1
0
mirror of https://bitbucket.org/anguist/ntpa synced 2025-10-06 02:51:23 +00:00

Improved thread safety

This commit is contained in:
2016-04-02 11:18:17 +02:00
parent ba91d7ed92
commit 6cc2caf62a
17 changed files with 261 additions and 109 deletions

View File

@ -1,5 +1,5 @@
//
// Main.cs
// Program.cs
//
// Author:
// Carsten Sonne Larsen <cs@innolan.dk>
@ -48,7 +48,7 @@ namespace Ntp.Analyzer.Cli
Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture;
string name = "default";
string tempDir = null;
string tempDir = Directory.GetCurrentDirectory();
string configFile = null;
string pidFile = null;
bool exit = false;
@ -82,9 +82,6 @@ namespace Ntp.Analyzer.Cli
} else if (configFile == null) {
Console.WriteLine ("Please specify configuration file with option --config");
return;
} else if (tempDir == null) {
Console.WriteLine ("Please specify temporary folder with option --temp");
return;
}
// Initialize system settings
@ -104,7 +101,8 @@ namespace Ntp.Analyzer.Cli
Main main = new Main (configFile, pid, name);
main.Run ();
} catch (Exception e) {
Console.WriteLine (e.Message);
Console.WriteLine ("Unexpected error: " + e.Message);
Console.WriteLine (e.StackTrace);
}
if (pidFile != null) {
@ -117,8 +115,8 @@ namespace Ntp.Analyzer.Cli
private static void ShowUsage()
{
Console.WriteLine("NTP Analyzer Daemon " + VersionInfo.Text);
Console.WriteLine("Usage: ntpa --config file --temp dir [--writepid file] [--daemon name]");
Console.WriteLine("NTP Analyzer " + VersionInfo.Text);
Console.WriteLine("Usage: ntpa --config file [--temp dir] [--writepid file] [--daemon name]");
}
}
}

View File

@ -62,6 +62,13 @@ namespace Ntp.Analyzer.Data.Cache
}
}
public void Clear ()
{
lock (locker) {
entries.Clear ();
}
}
private T GetImporter (string name)
{
T entry;

View File

@ -54,6 +54,15 @@ namespace Ntp.Analyzer.Data
public static void Reset()
{
if (instance != null) {
if (instance.ntpConfigCache != null) {
instance.ntpConfigCache.Clear ();
}
if (instance.ntpqCache != null) {
instance.ntpqCache.Clear ();
}
}
instance = new DataFace ();
}

View File

@ -23,6 +23,7 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
using System;
using System.Collections.Generic;
@ -43,15 +44,17 @@ namespace Ntp.Analyzer.Log
activity.Clear ();
}
public override void Refresh ()
{
}
public override void Close ()
{
activity.Clear ();
}
public override void Suspend()
{ }
public override void Resume()
{ }
public override void WriteLine (string text, Severity severity)
{
string severityText;

View File

@ -34,7 +34,9 @@ namespace Ntp.Analyzer.Log
public abstract void Close();
public abstract void Refresh();
public abstract void Suspend();
public abstract void Resume();
public abstract void WriteLine(string text, Severity severity);

View File

@ -4,7 +4,7 @@
// Author:
// Carsten Sonne Larsen <cs@innolan.dk>
//
// Copyright (c) 2013 Carsten Sonne Larsen
// Copyright (c) 2013-2016 Carsten Sonne Larsen
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@ -32,17 +32,27 @@ namespace Ntp.Analyzer.Log
{
private static List<LogBase> logs = new List<LogBase>();
public static void RefreshLogs()
public static void Suspend()
{
foreach (LogBase log in logs)
log.Refresh ();
log.Suspend ();
}
public static void Resume()
{
foreach (LogBase log in logs)
log.Resume ();
}
public static void Close()
{
foreach (LogBase log in logs)
log.Close ();
}
public static void Cleanup()
{
foreach (LogBase log in logs)
log.Close ();
Close ();
logs.Clear ();
}
@ -67,8 +77,7 @@ namespace Ntp.Analyzer.Log
{
LogGroup group = new LogGroup ();
foreach (LogConfiguration config in configs)
{
foreach (LogConfiguration config in configs) {
TextLog log = new TextLog (config.File, config.Treshold);
group.Add (log);
}

View File

@ -44,18 +44,24 @@ namespace Ntp.Analyzer.Log
log.Initialize();
}
public override void Refresh ()
{
foreach (LogBase log in logs)
log.Refresh();
}
public override void Close()
{
foreach (LogBase log in logs)
log.Close();
}
public override void Suspend()
{
foreach (LogBase log in logs)
log.Suspend ();
}
public override void Resume()
{
foreach (LogBase log in logs)
log.Resume ();
}
public override void WriteLine(string text, Severity severity)
{
foreach (LogBase log in logs)

View File

@ -27,24 +27,19 @@
using System;
using System.IO;
using System.Text;
using System.Collections.Generic;
namespace Ntp.Analyzer.Log
{
public sealed class TextLog : LogBase
{
private readonly string file;
private readonly Object locker = new Object();
private readonly string timeFormat;
private readonly Severity treshold;
private bool initialized;
private TextWriter writer;
internal TextLog(string file, Severity treshold, string timeFormat)
{
this.file = file;
this.treshold = treshold;
this.timeFormat = timeFormat;
initialized = false;
suspended = false;
}
internal TextLog(string file, Severity treshold)
@ -52,16 +47,43 @@ namespace Ntp.Analyzer.Log
{
}
private readonly List<String> cache = new List<String>();
private readonly string file;
private readonly Object locker = new Object();
private readonly string timeFormat;
private readonly Severity treshold;
private bool initialized;
private bool suspended;
private TextWriter writer;
public override void Suspend()
{
lock (locker) {
if (writer != null) {
writer.Close ();
writer.Dispose ();
writer = null;
}
initialized = false;
suspended = true;
}
}
public override void Resume()
{
lock (locker) {
suspended = false;
}
}
public override void Initialize()
{
lock (locker) {
if (!initialized && !suspended) {
writer = new StreamWriter (file, true, Encoding.ASCII);
initialized = true;
}
public override void Refresh ()
{
Close ();
Initialize ();
}
}
public override void WriteLine(string text, Severity severity)
@ -69,6 +91,16 @@ namespace Ntp.Analyzer.Log
if (!initialized)
Initialize();
lock (locker) {
if (initialized && !suspended && cache.Count != 0) {
foreach (string line in cache) {
writer.WriteLine (line);
}
writer.Flush ();
cache.Clear ();
}
}
if (severity < treshold)
return;
@ -107,10 +139,13 @@ namespace Ntp.Analyzer.Log
" [", severityText, "] ",
pad, text);
lock (locker)
{
lock (locker) {
if (initialized) {
writer.WriteLine (entry);
writer.Flush ();
} else {
cache.Add (entry);
}
}
}
@ -131,8 +166,14 @@ namespace Ntp.Analyzer.Log
public override void Close()
{
lock (locker) {
if (writer != null) {
writer.Close ();
writer.Dispose ();
writer = null;
}
initialized = false;
}
}
}
}

View File

@ -171,9 +171,9 @@ namespace Ntp.Analyzer.Process
// Initialize log.
if (firstrun)
log.WriteLine ("NTP Analyzer " + version + " started.", Severity.Notice);
log.WriteLine ("Using configuration: " + configFile, Severity.Notice);
log.WriteLine ("Running with pid: " + pid, Severity.Notice);
log.WriteLine ("Instance name: " + name, Severity.Notice);
log.WriteLine ("Using configuration " + configFile, Severity.Notice);
log.WriteLine ("Running with pid " + pid, Severity.Notice);
log.WriteLine ("Instance named " + name, Severity.Notice);
return true;
}
@ -308,6 +308,8 @@ namespace Ntp.Analyzer.Process
/// </summary>
private void InitializeScheduler ()
{
Job.Reset ();
try {
scheduler = new Scheduler (log);
log.WriteLine ("Initializing jobs.", Severity.Debug);

View File

@ -23,7 +23,9 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
using System;
using System.Threading;
using Ntp.Process;
using Ntp.Analyzer.Log;
using Ntp.Monitor.Server;

View File

@ -33,13 +33,13 @@ using Ntp.Analyzer.Objects.Live;
namespace Ntp.Analyzer.Statistics
{
public class SummeryBuilder
public class StatusBuilder
{
private readonly FileInfo configuration;
private readonly List<StatusLine> entries = new List<StatusLine>();
private readonly ServerName server;
public SummeryBuilder(ServerName server, FileInfo configuration)
public StatusBuilder(ServerName server, FileInfo configuration)
{
this.server = server;
this.configuration = configuration;
@ -52,7 +52,9 @@ namespace Ntp.Analyzer.Statistics
public void Build()
{
foreach (NtpConfEntry entry in DataFace.Instance.NtpConfigCache[configuration.FullName])
IEnumerable<NtpConfEntry> config = DataFace.Instance.NtpConfigCache [configuration.FullName];
foreach (NtpConfEntry entry in config)
{
AssociationEntry peerStatus = DataFace.Instance.NtpqCache[server.Name].
SingleOrDefault(p => p.Remote == entry.Address.ToString());

View File

@ -33,13 +33,13 @@ using Ntp.Analyzer.Objects.Live;
namespace Ntp.Analyzer.Statistics
{
public class StatusBuilder
public class SummeryBuilder
{
private readonly FileInfo configuration;
private readonly List<StatusLine> entries = new List<StatusLine>();
private readonly ServerName server;
public StatusBuilder(ServerName server, FileInfo configuration)
public SummeryBuilder(ServerName server, FileInfo configuration)
{
this.server = server;
this.configuration = configuration;
@ -52,9 +52,7 @@ namespace Ntp.Analyzer.Statistics
public void Build()
{
IEnumerable<NtpConfEntry> config = DataFace.Instance.NtpConfigCache [configuration.FullName];
foreach (NtpConfEntry entry in config)
foreach (NtpConfEntry entry in DataFace.Instance.NtpConfigCache[configuration.FullName])
{
AssociationEntry peerStatus = DataFace.Instance.NtpqCache[server.Name].
SingleOrDefault(p => p.Remote == entry.Address.ToString());

View File

@ -111,6 +111,8 @@ namespace Ntp.Process
// This node is now active and no longer "activated"
activated = false;
}
scheduler.Stop ();
}
public void SignalHandler ()
@ -136,8 +138,18 @@ namespace Ntp.Process
log.WriteLine ("Reloading configuration.", Severity.Notice);
break;
case InterProcess.Signal.Refresh:
LogFactory.RefreshLogs ();
log.WriteLine ("Restarting logging module.", Severity.Notice);
Thread thread = new Thread(() =>
{
LogFactory.Suspend ();
Thread.Sleep(30000);
LogFactory.Resume ();
});
thread.Start ();
log.WriteLine ("Refreshing logs.", Severity.Notice);
break;
case InterProcess.Signal.Notify:
// TODO
// log.WriteLine ("Sending notifications.", Severity.Notice);
break;
case InterProcess.Signal.Error:
log.WriteLine ("Error in inter process communication.", Severity.Error);
@ -149,8 +161,7 @@ namespace Ntp.Process
break;
}
} catch (Exception e) {
log.WriteLine ("Error in signal handler.", Severity.Error);
log.WriteLine (e.ToString (), Severity.Error);
log.WriteLine ("Unrecoverable error in SignalHandler.", Severity.Error);
log.WriteLine (e, Severity.Trace);
loop = false;
run = false;

View File

@ -30,9 +30,6 @@ namespace Ntp.Process
{
public sealed class Job
{
private static readonly Object nextJobLocker = new object ();
private static int nextJobId = 1;
/// <summary>
/// Initializes a new instance of the <see cref="Ntp.Process.Job"/> class.
/// </summary>
@ -52,6 +49,16 @@ namespace Ntp.Process
runCount = 0;
}
public static void Reset ()
{
lock (nextJobLocker) {
nextJobId = 1;
}
}
private static readonly Object nextJobLocker = new object ();
private static int nextJobId = 1;
private readonly int jobId;
private readonly JobDescription description;
private readonly JobScheduleDescription schedule;
@ -157,6 +164,10 @@ namespace Ntp.Process
public void Execute ()
{
started = DateTime.Now;
// Narrow scope to ensure threadsafety
DateTime threadStart = DateTime.Now;
Queued = false;
Postponed = false;
Running = true;
@ -176,7 +187,7 @@ namespace Ntp.Process
Running = false;
}
time += DateTime.Now.Subtract (started);
time += DateTime.Now.Subtract (threadStart);
if (error) {
log.WriteLine (String.Format ("{0} -> Failed", this), Severity.Debug);
} else {

View File

@ -49,6 +49,7 @@ namespace Ntp.Process
firstRun = true;
active = false;
runningThreads = new List<Thread> ();
waitHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
schedule = new List<ScheduledJob> ();
@ -62,6 +63,7 @@ namespace Ntp.Process
this.log = logGroup;
}
private readonly List<Thread> runningThreads;
private readonly EventWaitHandle waitHandle;
private readonly ActivityLog activityLog;
private readonly List<Job> jobs;
@ -182,6 +184,11 @@ namespace Ntp.Process
log.WriteLine (String.Format ("Starting scheduler with {0} jobs.", schedule.Count ()), Severity.Notice);
}
lock (runningThreads) {
foreach (Thread finished in runningThreads.Where (t => !t.IsAlive).ToList())
runningThreads.Remove (finished);
}
ScheduledJob next = schedule.OrderBy (j => j.Run).ThenBy (j => j.Job.Description.Priority).First ();
schedule.Remove (next);
next.Job.Queued = false;
@ -201,15 +208,20 @@ namespace Ntp.Process
}
bool signal = waitHandle.WaitOne (wait);
if (signal)
if (signal) {
QueueJob (next.Job, DateTime.Now.Subtract (TimeSpan.FromMilliseconds (wait)));
return;
}
if (next.Job.Description.SingleThread && schedule.Count (j => j.Job.Description.SingleThread && j.Job.Running) != 0) {
PostponeJob (next.Job);
return;
}
lock (runningThreads) {
runningThreads.Add (thread);
thread.Start ();
}
// Dont re-schedule "run-only-once" jobs.
if (next.Job.Schedule.Frequency == 0)
@ -260,5 +272,38 @@ namespace Ntp.Process
log.WriteLine (scheduledJob.ToString (), Severity.Debug);
}
}
public void Stop()
{
int count;
lock (runningThreads) {
foreach (Thread finished in runningThreads.Where (t => !t.IsAlive).ToList())
runningThreads.Remove (finished);
count = runningThreads.Count;
}
if (count != 0)
log.WriteLine (String.Format (
"Waiting for {0} threads to finish.", count),
Severity.Notice);
lock (runningThreads) {
foreach (Thread thread in runningThreads.ToList()) {
thread.Join (30000 / count);
if (thread.IsAlive) {
log.WriteLine (String.Format (
"Arborting thread {0}.", thread.Name),
Severity.Warn);
thread.Abort ();
}
}
}
log.WriteLine (String.Format (
"All threads finished.", runningThreads.Count),
Severity.Notice);
}
}
}

View File

@ -41,56 +41,62 @@ namespace Ntp.System
{
Error = 0,
Exit,
Reload,
Refresh,
Reload
Notify
}
#if __MonoCS__
public static Signal Wait (string name, LogBase log)
{
UnixSignal sigterm, sigquit, sigint, sighup, sigusr1, sigusr2;
UnixSignal[] signals = new UnixSignal[] {
new UnixSignal (Signum.SIGTERM),
new UnixSignal (Signum.SIGQUIT),
new UnixSignal (Signum.SIGINT),
new UnixSignal (Signum.SIGHUP),
new UnixSignal (Signum.SIGUSR1)
sigint = new UnixSignal (Signum.SIGINT),
sigterm = new UnixSignal (Signum.SIGTERM),
sigquit = new UnixSignal (Signum.SIGQUIT),
sighup = new UnixSignal (Signum.SIGHUP),
sigusr1 = new UnixSignal (Signum.SIGUSR1),
sigusr2 = new UnixSignal (Signum.SIGUSR2)
};
int sig = 0;
int i = 0;
while (true) {
try {
sig = UnixSignal.WaitAny (signals, -1);
i = UnixSignal.WaitAny (signals, -1);
} catch (Exception e) {
log.WriteLine ("Error in Signum handling.", Severity.Error);
log.WriteLine ("Unrecoverable error in UnixSignal.WaitAny()",
Severity.Error);
log.WriteLine (e, Severity.Trace);
return Signal.Error;
}
if (sig >= 0 && sig < signals.Length) {
Signum signal = signals [sig].Signum;
switch (signal) {
case Signum.SIGINT:
log.WriteLine ("Interrupted.", Severity.Notice);
return Signal.Exit;
case Signum.SIGTERM:
log.WriteLine ("Received kill signal.", Severity.Notice);
return Signal.Exit;
case Signum.SIGQUIT:
log.WriteLine ("Received quit signal.", Severity.Notice);
return Signal.Exit;
case Signum.SIGHUP:
log.WriteLine ("Received log restart signal.", Severity.Debug);
return Signal.Refresh;
case Signum.SIGUSR1:
log.WriteLine ("Received reload config signal.", Severity.Debug);
return Signal.Reload;
default:
log.WriteLine ("Unknown signal received.", Severity.Warn);
break;
if (i < 0 || i >= signals.Length) {
log.WriteLine (String.Format (
"Received unknown signal {0}",
signals [i].Signum.ToString ()),
Severity.Warn);
continue;
}
} else {
log.WriteLine ("Unknown signal received.", Severity.Warn);
log.WriteLine (String.Format (
"Received signal {0}",
signals [i].Signum.ToString ()),
Severity.Debug);
if (sigint.IsSet || sigterm.IsSet || sigquit.IsSet) {
sigint.Reset ();
sigterm.Reset ();
sigquit.Reset ();
return Signal.Exit;
} else if (sighup.IsSet) {
sighup.Reset ();
return Signal.Refresh;
} else if (sigusr1.IsSet) {
sigusr1.Reset ();
return Signal.Reload;
} else if (sigusr2.IsSet) {
sigusr2.Reset ();
return Signal.Notify;
}
}
}

View File

@ -29,7 +29,7 @@ namespace Ntp.System
{
public static class VersionInfo
{
public static readonly string Number = "0.4b";
public static readonly string Number = "0.4.2b";
public static readonly string Text = "v" + Number;
}