I have a task that takes a serial stream of analog samples and has to find all positive and negative peaks in real time. The good news is that the algorithm itself does not need to be implemented as a hardware circuit, so let’s try the real power of ReactiveX for processing data streams.
Pull Input Analog Signals
All inputs are generated by a data acquisition system (NI DAQ). The method provided by the NI SDK is:
public IAsyncResult BeginMemoryOptimizedReadWaveform(
    int samplesPerChannel,
    AsyncCallback callback,
    object state,
    AnalogWaveform<double>[] data)
The first point is that this SDK is designed for .NET 4.5 but still uses the Asynchronous Programming Model (APM), so it must be converted to the Task-based Asynchronous Pattern (TAP). The second, stranger point is that the method does not follow the usual APM parameter order. The static Task factory requires the last parameter of the given async method to be state: object, but this method puts data after it. After swapping the order, I got:
var runningTask = new NI_Task();
var data = new[]
{
    new AnalogWaveform<double>(Context.SamplesPerChannel),
    new AnalogWaveform<double>(Context.SamplesPerChannel)
};
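The parameter swap described above can be sketched with a small lambda passed to Task.Factory.FromAsync. This is only an illustration under assumptions: FakeReader, BeginRead, EndRead and ReadAsync are hypothetical stand-ins, not NI types, and the shape of the End method is assumed. Only the parameter order of the Begin method mirrors the signature shown earlier.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the NI reader. Only the parameter ORDER of
// BeginRead mirrors the SDK method (callback and state before the buffer);
// everything else is an assumption made for illustration.
public sealed class FakeReader
{
    public IAsyncResult BeginRead(int samplesPerChannel, AsyncCallback callback,
                                  object state, double[] data)
    {
        var tcs = new TaskCompletionSource<double[]>(state);
        Task.Run(() =>
        {
            // Pretend to acquire samples on a background thread.
            for (int i = 0; i < samplesPerChannel; i++) data[i] = i * 0.5;
            tcs.SetResult(data);
            callback?.Invoke(tcs.Task); // Task implements IAsyncResult
        });
        return tcs.Task;
    }

    public double[] EndRead(IAsyncResult asyncResult) =>
        ((Task<double[]>)asyncResult).Result;
}

public static class ApmToTap
{
    // FromAsync expects begin(arg1, arg2, callback, state), so the lambda
    // reorders the arguments before calling the APM method.
    public static Task<double[]> ReadAsync(FakeReader reader, int samples, double[] buffer) =>
        Task.Factory.FromAsync<int, double[], double[]>(
            (n, buf, callback, state) => reader.BeginRead(n, callback, state, buf),
            reader.EndRead,
            samples, buffer, null);
}
```

The resulting Task can then be awaited, or turned into an observable sequence, like any other TAP method.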
In the offline case, we can easily observe the complete curve.
For example, a peak has two characteristics, height and width, and a part of the curve is treated as a peak only if a bottom follows a crest. Therefore, to make this decision, the data that comes after the crest in question must be known.
Of course, raw data may contain noise and unexpected peaks; we can filter these out directly with a threshold.
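As a rough illustration of the offline case, the sketch below scans a complete buffer, looks for a crest that is followed by a bottom, and keeps it only if the drop from crest to bottom reaches a threshold. The names OfflinePeaks, FindCrests and minHeight are hypothetical, and applying the threshold to the crest-to-bottom height is my assumption about how the filter would be used.

```csharp
using System;
using System.Collections.Generic;

public static class OfflinePeaks
{
    // Returns the indices of crests that are followed by a confirmed bottom
    // and whose crest-to-bottom drop is at least minHeight (assumed > 0).
    public static List<int> FindCrests(IReadOnlyList<double> samples, double minHeight)
    {
        var crests = new List<int>();
        for (int i = 1; i < samples.Count - 1; i++)
        {
            bool isCrest = samples[i - 1] < samples[i] && samples[i] >= samples[i + 1];
            if (!isCrest) continue;

            // Walk downhill to the bottom that follows this crest.
            int bottom = i;
            while (bottom + 1 < samples.Count && samples[bottom + 1] <= samples[bottom])
                bottom++;

            // The bottom only counts if the curve rises again after it,
            // which is exactly the "future data" the offline case has.
            bool bottomConfirmed = bottom < samples.Count - 1;
            if (bottomConfirmed && samples[i] - samples[bottom] >= minHeight)
                crests.Add(i);
        }
        return crests;
    }
}
```

For the buffer { 0, 1, 0.9, 1.1, 3, 0.5, 1, 0 } and a minHeight of 0.5, only index 4 (the value 3) is reported: the bump at index 1 is too shallow, and the one at index 6 has no confirmed bottom after it.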
However, the task is real-time detection, so I cannot know future data in advance. Hence I try to use the difference (i.e. the trend) of the curve to find peaks. By the property of the first derivative, the original function reaches a vertex at the point where the sign of the derivative changes.
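The sign-change idea can be sketched as a small stateful detector that is fed one sample at a time. This is a minimal sketch, not the final pipeline: TrendPeakDetector and Push are hypothetical names, flat segments (zero difference) are simply skipped, and no noise threshold is applied here.

```csharp
using System;

public sealed class TrendPeakDetector
{
    private double? _previous; // last sample seen, if any
    private int _lastSign;     // sign of the last non-zero difference

    // Returns +1 if the PREVIOUS sample was a positive peak, -1 if it was a
    // negative peak, and 0 otherwise. Detection lags by one sample because
    // the sign change is only visible once the next sample arrives.
    public int Push(double sample)
    {
        int result = 0;
        if (_previous.HasValue)
        {
            int sign = Math.Sign(sample - _previous.Value);
            if (sign != 0)
            {
                if (_lastSign != 0 && sign != _lastSign)
                    result = _lastSign > 0 ? +1 : -1; // rising then falling = positive peak
                _lastSign = sign;
            }
        }
        _previous = sample;
        return result;
    }
}
```

Feeding the samples 0, 1, 2, 1, 0, 1 yields 0, 0, 0, +1, 0, -1: the positive peak at 2 is reported when the following 1 arrives, and the negative peak at 0 when the final 1 arrives. In an Rx pipeline the same Push call could sit inside a Select over the sample stream.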
The following is a visualization of the process (normalized).