/*- * Copyright (c) 2017-2021 Carsten Sonne Larsen * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
* */ #include "config.h" #include "global.h" #include "message.h" #include "text.h" #include "mem.h" #include #include "logmod.h" #define MODULENAME "Dispatch" struct MsgPort *volatile BrokerPort; struct MsgPort *volatile SyncerPort; struct MsgPort *volatile WindowPort; struct MsgPort *volatile MemoryPort; struct MsgPort *volatile ControlPort; struct MsgPort *volatile ARexxPort; int outstanding; static void PublishPort(long, char *); void InitMessages(void) { BrokerPort = NULL; SyncerPort = NULL; WindowPort = NULL; MemoryPort = NULL; ControlPort = NULL; ARexxPort = NULL; outstanding = 0; CreateMessagePort(MSGPORT_MEMORY); } bool SetupPorts(void) { CreateMessagePort(MSGPORT_BROKER); CreateMessagePort(MSGPORT_CONTROL); PublishPort(MSGPORT_CONTROL, CONTROL_PORT_NAME); if (RexxSysBase != NULL) { CreateMessagePort(MSGPORT_AREXX); PublishPort(MSGPORT_AREXX, AREXX_PORT_NAME); } if (MemoryPort == NULL || BrokerPort == NULL) { LogError("Message port initialization failed"); CleanupPorts(); return false; } LogNotice("Publishing log to message port: %s", LOGGER_PORT_NAME); LogTrace("Message port initialization complete"); return true; } void CleanupPorts(void) { DestroyMessagePort(MSGPORT_BROKER); if (ControlPort != NULL) { RemPort((struct MsgPort *)ControlPort); DestroyMessagePort(MSGPORT_CONTROL); } if (ARexxPort != NULL) { RemPort((struct MsgPort *)ARexxPort); DestroyMessagePort(MSGPORT_AREXX); } if (MemoryPort != NULL) { while (outstanding != 0) { WaitPort(MemoryPort); HandleMemoryMessages(); } } DestroyMessagePort(MSGPORT_MEMORY); } static struct MsgPort *FindMessagePort(long port) { switch (port) { case MSGPORT_BROKER: return (struct MsgPort *)BrokerPort; break; case MSGPORT_SYNCER: return (struct MsgPort *)SyncerPort; break; case MSGPORT_WINDOW: return (struct MsgPort *)WindowPort; break; case MSGPORT_MEMORY: return (struct MsgPort *)MemoryPort; break; case MSGPORT_CONTROL: return (struct MsgPort *)ControlPort; break; case MSGPORT_AREXX: return (struct MsgPort 
*)ARexxPort; break; default: return NULL; break; } return NULL; } static struct MsgPort *volatile *FindMessagePortAddress(long port, char **name) { struct MsgPort *volatile *p; char *s; switch (port) { case MSGPORT_BROKER: p = &BrokerPort; s = "Broker"; break; case MSGPORT_SYNCER: p = &SyncerPort; s = "Synchronizer"; break; case MSGPORT_WINDOW: p = &WindowPort; s = "Window"; break; case MSGPORT_MEMORY: p = &MemoryPort; s = "Memory"; break; case MSGPORT_CONTROL: p = &ControlPort; s = "Control"; break; case MSGPORT_AREXX: p = &ARexxPort; s = "ARexx"; break; default: p = NULL; s = "Unknown"; break; } *name = s; return p; } static void PublishPort(long port, char *name) { char *s; struct MsgPort *volatile *dest = FindMessagePortAddress(port, &s); if (dest == NULL || *dest == NULL) return; struct MsgPort *pub = *dest; pub->mp_Node.ln_Name = name; pub->mp_Node.ln_Pri = 0; Forbid(); struct MsgPort *msgPort = FindPort((CONST_STRPTR)name); if (msgPort == NULL) { AddPort((struct MsgPort *)pub); } Permit(); if (msgPort == NULL) LogNotice("%s message port published: %s", s, name); else LogWarn("%s message port is already registered: %s", s, name); } struct MsgPort *GetMessagePort(long port) { return FindMessagePort(port); } struct Message *GetNewMessage(long port) { struct MsgPort *p = FindMessagePort(port); if (p == NULL) return NULL; return GetMsg(p); } ULONG GetPortSignalMask(long port) { struct MsgPort *p = FindMessagePort(port); if (p == NULL) { LogError("Failed to get port signal mask: %ld", port); return 0; } return (1 << p->mp_SigBit); } bool CreateMessagePort(long port) { struct MsgPort *n = CreateMsgPort(); struct MsgPort *volatile *p; bool success = true; char *s; p = FindMessagePortAddress(port, &s); if (n != NULL && p != NULL && *p == NULL) { LogTrace(TextPortCreated, s); *p = n; } else { LogError(TextPortInitError, s); success = false; } return success; } void DestroyMessagePort(long port) { struct MsgPort *volatile *p; struct MsgPort *msgPort; bool exist = true; 
char *s; Forbid(); p = FindMessagePortAddress(port, &s); if (p == NULL || *p == NULL) { exist = false; } else { msgPort = *p; *p = NULL; } Permit(); if (exist) { struct Message *msg; while ((msg = GetMsg(msgPort))) { ReplyMsg(msg); } DeleteMsgPort(msgPort); LogTrace(TextPortCleaned, s); } else { LogError(TextPortNotFound, s); } } /* * Send an application message in a safe manner. */ static void SendAppMessage(long port, long type) { struct MsgPort *dest, *reply; bool sent = false; struct ApplicationMesage *msg = AllocStructSafe(struct ApplicationMesage); if (msg == NULL) return; msg->Msg.mn_Node.ln_Type = NT_MESSAGE; msg->Msg.mn_Length = sizeof(struct ApplicationMesage); msg->MsgType = MSGTYPE_APP; msg->MsgId = type; Forbid(); dest = FindMessagePort(port); reply = (struct MsgPort *)MemoryPort; if (dest != NULL && reply != NULL) { msg->Msg.mn_ReplyPort = reply; PutMsg(dest, (struct Message *)msg); outstanding++; sent = true; } Permit(); if (!sent) FreeMemSafe(msg); } /* * Send a message to the broker. */ void SendBrokerMessage(long type) { SendAppMessage(MSGPORT_BROKER, type); } /* * Send a message to the settings window. */ void SendWindowMessage(long type) { SendAppMessage(MSGPORT_WINDOW, type); } /* * Try to send a message to the logger message port. 
*/ void SendLogMessage(struct LogMessage *msg) { struct MsgPort *port, *reply; bool sent = false; msg->AppMsg.Msg.mn_Node.ln_Type = NT_MESSAGE; msg->AppMsg.Msg.mn_Length = sizeof(struct LogMessage); msg->AppMsg.MsgType = MSGTYPE_LOG; Forbid(); reply = (struct MsgPort *)MemoryPort; port = FindPort((CONST_STRPTR)LOGGER_PORT_NAME); if (port != NULL && reply != NULL) { msg->AppMsg.Msg.mn_ReplyPort = reply; PutMsg(port, (struct Message *)msg); outstanding++; sent = true; } Permit(); if (!sent && port != NULL) { // Try again using own reply port struct MsgPort *replyPort = CreateMsgPort(); if (replyPort != NULL) { Forbid(); port = FindPort((CONST_STRPTR)LOGGER_PORT_NAME); if (port != NULL) { msg->AppMsg.Msg.mn_ReplyPort = replyPort; PutMsg(port, (struct Message *)msg); sent = true; } Permit(); WaitPort(replyPort); GetMsg(replyPort); DeleteMsgPort(replyPort); } } if (!sent) FreeVec(msg); } /* * Free processed messages. */ void HandleMemoryMessages(void) { struct ApplicationMesage *msg; while ((msg = (struct ApplicationMesage *)GetMsg(MemoryPort))) { if (msg->MsgType == MSGTYPE_LOG) { FreeVec(msg); } else { FreeMemSafe(msg); } outstanding--; } } /* * Send a message and wait for the reply. */ void SendMessageWait(long port, long type) { struct MsgPort *replyPort = CreateMsgPort(); if (replyPort != NULL) { struct ApplicationMesage *message = AllocStructSafe(struct ApplicationMesage); if (message != NULL) { message->Msg.mn_Node.ln_Type = NT_MESSAGE; message->Msg.mn_Length = sizeof(struct ApplicationMesage); message->Msg.mn_ReplyPort = replyPort; message->MsgType = MSGTYPE_APP; message->MsgId = type; Forbid(); struct MsgPort *dest = FindMessagePort(port); if (dest != NULL) { PutMsg(dest, (struct Message *)message); } Permit(); if (dest != NULL) { WaitPort(replyPort); GetMsg(replyPort); } else { LogTrace("Message port is closed"); } FreeMemSafe(message); } DeleteMsgPort(replyPort); } } /* * Process any remaining messages and close port cleanly. 
*/
/*
 * port points to the caller's port pointer. When that pointer is NULL
 * nothing happens; otherwise it is set to NULL, every queued message is
 * replied, and the port is deleted.
 */
void CleanupMsgPort(struct MsgPort **port)
{
    struct MsgPort *doomed = *port;
    struct Message *pending;

    if (doomed == NULL)
    {
        return;
    }

    /* Clear the caller's pointer before the port is drained and deleted. */
    *port = NULL;

    for (pending = GetMsg(doomed); pending != NULL; pending = GetMsg(doomed))
    {
        ReplyMsg(pending);
    }

    DeleteMsgPort(doomed);
}