Keep your time right
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

473 lines
11 KiB

/*-
* Copyright (c) 2017-2021 Carsten Sonne Larsen <cs@innolan.net>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "config.h"
#include "global.h"
#include "message.h"
#include "text.h"
#include "mem.h"
#include <proto/rexxsyslib.h>
#include "logmod.h"
#define MODULENAME "Dispatch"
/*
 * Port slots owned by this module. Each slot is NULL while the port does
 * not exist; InitMessages() clears them all, CreateMessagePort() fills a
 * slot and DestroyMessagePort() empties it again.
 */
struct MsgPort *volatile BrokerPort;
/* Not created anywhere in this part of the file; only looked up by id. */
struct MsgPort *volatile SyncerPort;
/* Not created anywhere in this part of the file; only looked up by id. */
struct MsgPort *volatile WindowPort;
/* Reply port for asynchronously sent messages, see HandleMemoryMessages(). */
struct MsgPort *volatile MemoryPort;
/* Published under CONTROL_PORT_NAME by SetupPorts(). */
struct MsgPort *volatile ControlPort;
/* Published under AREXX_PORT_NAME by SetupPorts() when RexxSysBase is set. */
struct MsgPort *volatile ARexxPort;
/*
 * Number of messages sent with MemoryPort as reply port that have not yet
 * been freed by HandleMemoryMessages().
 */
int outstanding;
/* Forward declaration: make one of the ports above public under a name. */
static void PublishPort(long, char *);
/*
 * Reset the dispatcher state and create the memory (reply) port.
 *
 * All port slots are cleared and the outstanding message counter is
 * zeroed before the memory port is created. The result of the port
 * creation is not checked here.
 */
void InitMessages(void)
{
    /* No message is in flight yet. */
    outstanding = 0;

    /* Mark every port slot as empty. */
    BrokerPort = NULL;
    SyncerPort = NULL;
    WindowPort = NULL;
    MemoryPort = NULL;
    ControlPort = NULL;
    ARexxPort = NULL;

    /* The memory port receives the replies of asynchronous messages. */
    CreateMessagePort(MSGPORT_MEMORY);
}
/*
 * Create the broker, control and (optional) ARexx ports and publish the
 * named ones.
 *
 * The ARexx port is only set up when RexxSysBase is non-NULL. Only the
 * memory and broker ports are mandatory: when either one is missing, all
 * ports are cleaned up again.
 *
 * Returns true when the memory and broker ports both exist, false otherwise.
 */
bool SetupPorts(void)
{
    CreateMessagePort(MSGPORT_BROKER);

    CreateMessagePort(MSGPORT_CONTROL);
    PublishPort(MSGPORT_CONTROL, CONTROL_PORT_NAME);

    if (RexxSysBase != NULL)
    {
        CreateMessagePort(MSGPORT_AREXX);
        PublishPort(MSGPORT_AREXX, AREXX_PORT_NAME);
    }

    if (MemoryPort != NULL && BrokerPort != NULL)
    {
        LogNotice("Publishing log to message port: %s", LOGGER_PORT_NAME);
        LogTrace("Message port initialization complete");
        return true;
    }

    /* A mandatory port is missing: undo whatever was created. */
    LogError("Message port initialization failed");
    CleanupPorts();
    return false;
}
void CleanupPorts(void)
{
DestroyMessagePort(MSGPORT_BROKER);
if (ControlPort != NULL)
{
RemPort((struct MsgPort *)ControlPort);
DestroyMessagePort(MSGPORT_CONTROL);
}
if (ARexxPort != NULL)
{
RemPort((struct MsgPort *)ARexxPort);
DestroyMessagePort(MSGPORT_AREXX);
}
if (MemoryPort != NULL)
{
while (outstanding != 0)
{
WaitPort(MemoryPort);
HandleMemoryMessages();
}
}
DestroyMessagePort(MSGPORT_MEMORY);
}
/*
 * Translate a MSGPORT_* identifier into the current value of the
 * corresponding port slot.
 *
 * Returns NULL for an unknown identifier or when the slot is empty.
 */
static struct MsgPort *FindMessagePort(long port)
{
    struct MsgPort *found = NULL;

    switch (port)
    {
    case MSGPORT_BROKER:
        found = BrokerPort;
        break;
    case MSGPORT_SYNCER:
        found = SyncerPort;
        break;
    case MSGPORT_WINDOW:
        found = WindowPort;
        break;
    case MSGPORT_MEMORY:
        found = MemoryPort;
        break;
    case MSGPORT_CONTROL:
        found = ControlPort;
        break;
    case MSGPORT_AREXX:
        found = ARexxPort;
        break;
    default:
        break;
    }

    return found;
}
/*
 * Translate a MSGPORT_* identifier into the address of its port slot.
 *
 * port: MSGPORT_* identifier.
 * name: receives a human readable label for the port; always written,
 *       "Unknown" when the identifier is not recognised.
 *
 * Returns the address of the slot, or NULL for an unknown identifier.
 */
static struct MsgPort *volatile *FindMessagePortAddress(long port, char **name)
{
    struct MsgPort *volatile *slot = NULL;
    char *label = "Unknown";

    switch (port)
    {
    case MSGPORT_BROKER:
        slot = &BrokerPort;
        label = "Broker";
        break;
    case MSGPORT_SYNCER:
        slot = &SyncerPort;
        label = "Synchronizer";
        break;
    case MSGPORT_WINDOW:
        slot = &WindowPort;
        label = "Window";
        break;
    case MSGPORT_MEMORY:
        slot = &MemoryPort;
        label = "Memory";
        break;
    case MSGPORT_CONTROL:
        slot = &ControlPort;
        label = "Control";
        break;
    case MSGPORT_AREXX:
        slot = &ARexxPort;
        label = "ARexx";
        break;
    default:
        break;
    }

    *name = label;
    return slot;
}
/*
 * Make an existing port public under the given name.
 *
 * Does nothing when the identifier is unknown or the port has not been
 * created. The name and priority are stored in the port node, then the
 * port is added to the public list unless a port with the same name is
 * already registered. The lookup and the add happen inside one
 * Forbid()/Permit() pair; logging is done after Permit().
 */
static void PublishPort(long port, char *name)
{
    char *label;
    struct MsgPort *volatile *slot = FindMessagePortAddress(port, &label);
    struct MsgPort *target;
    bool taken;

    if (slot == NULL || *slot == NULL)
    {
        return;
    }

    target = *slot;
    target->mp_Node.ln_Name = name;
    target->mp_Node.ln_Pri = 0;

    Forbid();
    taken = (FindPort((CONST_STRPTR)name) != NULL);
    if (!taken)
    {
        AddPort(target);
    }
    Permit();

    if (taken)
    {
        LogWarn("%s message port is already registered: %s", label, name);
    }
    else
    {
        LogNotice("%s message port published: %s", label, name);
    }
}
/*
 * Public accessor for a port slot.
 *
 * Returns the port for the given MSGPORT_* identifier, or NULL when the
 * identifier is unknown or the port does not exist.
 */
struct MsgPort *GetMessagePort(long port)
{
    struct MsgPort *resolved = FindMessagePort(port);

    return resolved;
}
/*
 * Fetch the next queued message from one of the module's ports.
 *
 * Returns NULL when the port does not exist; otherwise whatever GetMsg()
 * returns for that port.
 */
struct Message *GetNewMessage(long port)
{
    struct MsgPort *source = FindMessagePort(port);

    return (source != NULL) ? GetMsg(source) : NULL;
}
ULONG GetPortSignalMask(long port)
{
struct MsgPort *p = FindMessagePort(port);
if (p == NULL)
{
LogError("Failed to get port signal mask: %ld", port);
return 0;
}
return (1 << p->mp_SigBit);
}
/*
 * Create a message port and store it in the slot for the given
 * MSGPORT_* identifier.
 *
 * The slot is validated before anything is allocated, so an unknown
 * identifier or an already occupied slot no longer leaves a freshly
 * created port (and its signal bit) behind with no owner.
 *
 * Returns true when the port was created and stored. Returns false, after
 * logging an error, when the identifier is unknown, the slot is already
 * in use or CreateMsgPort() fails.
 */
bool CreateMessagePort(long port)
{
    char *s;
    struct MsgPort *volatile *p = FindMessagePortAddress(port, &s);

    if (p != NULL && *p == NULL)
    {
        struct MsgPort *n = CreateMsgPort();
        if (n != NULL)
        {
            LogTrace(TextPortCreated, s);
            *p = n;
            return true;
        }
    }

    LogError(TextPortInitError, s);
    return false;
}
/*
 * Destroy the port stored in the slot for the given MSGPORT_* identifier.
 *
 * The slot is emptied inside a Forbid()/Permit() pair. Afterwards every
 * message still queued on the port is replied to and the port is deleted.
 * An error is logged when the identifier is unknown or the slot was
 * already empty.
 */
void DestroyMessagePort(long port)
{
    struct MsgPort *volatile *slot;
    struct MsgPort *detached = NULL;
    struct Message *pending;
    char *label;

    /* Detach the port from its slot. */
    Forbid();
    slot = FindMessagePortAddress(port, &label);
    if (slot != NULL && *slot != NULL)
    {
        detached = *slot;
        *slot = NULL;
    }
    Permit();

    if (detached == NULL)
    {
        LogError(TextPortNotFound, label);
        return;
    }

    /* Hand back everything still queued before the port disappears. */
    while ((pending = GetMsg(detached)) != NULL)
    {
        ReplyMsg(pending);
    }

    DeleteMsgPort(detached);
    LogTrace(TextPortCleaned, label);
}
/*
 * Send an application message asynchronously.
 *
 * A message of the given type is allocated and posted to the port with
 * the given MSGPORT_* identifier, using the memory port as reply port.
 * Port lookup, posting and the update of the outstanding counter happen
 * inside one Forbid()/Permit() pair. When either the destination or the
 * memory port is missing, the message is freed again and nothing is sent.
 */
static void SendAppMessage(long port, long type)
{
    struct MsgPort *target;
    struct MsgPort *replyTo;
    bool delivered = false;
    struct ApplicationMesage *appMsg = AllocStructSafe(struct ApplicationMesage);

    if (appMsg == NULL)
    {
        return;
    }

    appMsg->Msg.mn_Node.ln_Type = NT_MESSAGE;
    appMsg->Msg.mn_Length = sizeof(struct ApplicationMesage);
    appMsg->MsgType = MSGTYPE_APP;
    appMsg->MsgId = type;

    Forbid();
    target = FindMessagePort(port);
    replyTo = (struct MsgPort *)MemoryPort;
    if (target != NULL && replyTo != NULL)
    {
        appMsg->Msg.mn_ReplyPort = replyTo;
        PutMsg(target, (struct Message *)appMsg);
        /* Freed by HandleMemoryMessages() once the reply arrives. */
        outstanding++;
        delivered = true;
    }
    Permit();

    if (!delivered)
    {
        FreeMemSafe(appMsg);
    }
}
/*
 * Post an asynchronous application message of the given type to the
 * broker port.
 */
void SendBrokerMessage(long type)
{
    const long target = MSGPORT_BROKER;

    SendAppMessage(target, type);
}
/*
 * Post an asynchronous application message of the given type to the
 * settings window port.
 */
void SendWindowMessage(long type)
{
    const long target = MSGPORT_WINDOW;

    SendAppMessage(target, type);
}
/*
 * Try to send a message to the logger message port.
 *
 * The message is first sent asynchronously with the memory port as reply
 * port; in that case HandleMemoryMessages() frees it when the reply
 * arrives. When the logger port exists but the memory port does not, the
 * message is sent again through a private reply port and this function
 * waits for the reply.
 *
 * On every path where the memory port does not take over the message
 * (never sent, or replied to the private port) the message is freed here
 * with FreeVec(). The caller must not touch msg after this call.
 */
void SendLogMessage(struct LogMessage *msg)
{
    struct MsgPort *port, *reply;
    bool queued = false;

    if (msg == NULL)
    {
        return;
    }

    msg->AppMsg.Msg.mn_Node.ln_Type = NT_MESSAGE;
    msg->AppMsg.Msg.mn_Length = sizeof(struct LogMessage);
    msg->AppMsg.MsgType = MSGTYPE_LOG;

    Forbid();
    reply = (struct MsgPort *)MemoryPort;
    port = FindPort((CONST_STRPTR)LOGGER_PORT_NAME);
    if (port != NULL && reply != NULL)
    {
        msg->AppMsg.Msg.mn_ReplyPort = reply;
        PutMsg(port, (struct Message *)msg);
        outstanding++;
        queued = true;
    }
    Permit();

    if (queued)
    {
        /* Ownership passed on; freed when the reply reaches the memory port. */
        return;
    }

    if (port != NULL)
    {
        // Try again using own reply port
        struct MsgPort *replyPort = CreateMsgPort();
        if (replyPort != NULL)
        {
            bool sent = false;

            Forbid();
            port = FindPort((CONST_STRPTR)LOGGER_PORT_NAME);
            if (port != NULL)
            {
                msg->AppMsg.Msg.mn_ReplyPort = replyPort;
                PutMsg(port, (struct Message *)msg);
                sent = true;
            }
            Permit();

            /*
             * Only wait when something was actually sent; the logger port
             * may have vanished since the first lookup and waiting for a
             * reply that can never arrive would block forever.
             */
            if (sent)
            {
                WaitPort(replyPort);
                GetMsg(replyPort);
            }
            DeleteMsgPort(replyPort);
        }
    }

    /* Never sent, or already replied to the private port: ours to free. */
    FreeVec(msg);
}
/*
 * Free processed messages.
 *
 * Drains the memory port: every replied message is released with the
 * function matching its type (FreeVec() for log messages, FreeMemSafe()
 * for everything else) and the outstanding counter is decremented once
 * per message.
 */
void HandleMemoryMessages(void)
{
    for (;;)
    {
        struct ApplicationMesage *done =
            (struct ApplicationMesage *)GetMsg(MemoryPort);

        if (done == NULL)
        {
            break;
        }

        if (done->MsgType != MSGTYPE_LOG)
        {
            FreeMemSafe(done);
        }
        else
        {
            FreeVec(done);
        }

        outstanding--;
    }
}
/*
 * Send a message and wait for the reply.
 *
 * A private reply port and an application message of the given type are
 * allocated; nothing happens when either allocation fails. The message is
 * posted to the port with the given MSGPORT_* identifier inside a
 * Forbid()/Permit() pair. When the destination exists the function blocks
 * until the reply arrives, otherwise a trace entry is logged. The message
 * and the reply port are released before returning.
 */
void SendMessageWait(long port, long type)
{
    struct MsgPort *target;
    struct ApplicationMesage *request;
    struct MsgPort *replyPort = CreateMsgPort();

    if (replyPort == NULL)
    {
        return;
    }

    request = AllocStructSafe(struct ApplicationMesage);
    if (request == NULL)
    {
        DeleteMsgPort(replyPort);
        return;
    }

    request->Msg.mn_Node.ln_Type = NT_MESSAGE;
    request->Msg.mn_Length = sizeof(struct ApplicationMesage);
    request->Msg.mn_ReplyPort = replyPort;
    request->MsgType = MSGTYPE_APP;
    request->MsgId = type;

    Forbid();
    target = FindMessagePort(port);
    if (target != NULL)
    {
        PutMsg(target, (struct Message *)request);
    }
    Permit();

    if (target == NULL)
    {
        LogTrace("Message port is closed");
    }
    else
    {
        /* Block until the receiver has replied, then dequeue the reply. */
        WaitPort(replyPort);
        GetMsg(replyPort);
    }

    FreeMemSafe(request);
    DeleteMsgPort(replyPort);
}
/*
 * Process any remaining messages and close the port cleanly.
 *
 * port: address of a port pointer. When the pointer is NULL nothing
 *       happens. Otherwise it is set to NULL, every message still queued
 *       on the port is replied to and the port is deleted.
 */
void CleanupMsgPort(struct MsgPort **port)
{
    struct MsgPort *doomed = *port;
    struct Message *pending;

    if (doomed == NULL)
    {
        return;
    }

    *port = NULL;

    for (pending = GetMsg(doomed); pending != NULL; pending = GetMsg(doomed))
    {
        ReplyMsg(pending);
    }

    DeleteMsgPort(doomed);
}