Reactive programming is a "programming paradigm oriented around data flows and the propagation of change." Relationships can be established between producers (observables), consumers (observers), and transformers of data (operators), with updates occurring asynchronously rather than on request. This aids application scalability and makes better use of hardware resources, as threads are not blocked waiting on potentially long-running operations. Part I of this article showed a method to convert samples obtained by OpenDDS into observable sequences, as well as a method to simulate these sequences for test purposes. This article describes the operators that can be applied to observable sequences to filter, transform and otherwise manipulate them. We shall use the Reactive Extensions for .NET (Rx.NET) for our examples.
The list of operators that work on observables is considerable, and although a core set is available in all reactive frameworks, individual frameworks may also add operators of their own. For example, RxJava provides a parallel() operator which doesn't have a clear analogue in Rx.NET or RxCpp. Also, the name of a given operator can differ from framework to framework. In Part I, we saw Select() and map(), which both apply a function to each element of an observable sequence and, in our case, can be used to convert the sequence type into a different one. We also saw the Take() operator, which limits the sequence to a given number of elements. The Visual Studio Unit Testing Framework is an easy way to demonstrate the behavior of a number of other operators.
In the testing framework, individual tests are implemented as public class methods that return void, have no parameters, and are marked with the [TestMethod] attribute. The public class that contains them is marked with the [TestClass] attribute. Methods marked with additional attributes are used to initialize tests and to clean up after them. For example, a method marked with the [TestInitialize] attribute is executed before each test method. When a project containing these attributes is compiled in Visual Studio, Visual Studio identifies these methods as tests and allows them to be run from the Test Explorer pane, or from the command line by MSTest.exe.
A scheduler is a mechanism, part of the reactive framework, that controls when subscriptions start and when notifications are published, and that provides a notion of time. Although the default is a real-time scheduler, unit tests for observables can use a virtual time-based scheduler, TestScheduler, as introduced in Part I of this article. The method CreateColdObservable() creates a cold observable (an observable that publishes only once an observer has subscribed, in contrast to hot observables, which publish regardless) by specifying the individual notifications produced by the observable: OnNext() to produce a value, and OnCompleted() to indicate that the observable will no longer publish values. The first parameter to both notification methods is the time, in ticks (one ten-millionth of a second), at which the notification is produced; for a cold observable, these times are relative to the moment of subscription. TestScheduler uses these virtual times, rather than wall-clock time, to sequence the output of observables, so tests run at normal unit test speed.
For the examples below, we shall define three cold observables, xs, ys and zs, that are recreated before each test, as follows. These particular observables demonstrate sequences with notifications at irregular intervals, as well as duplicated values.
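// ReactiveCS\ReactiveCS.cs
using System;
using System.Collections.Generic;
using System.Reactive;               // Unit, Notification<T>, Timestamped<T>
using System.Reactive.Linq;          // Rx.NET operators
using Microsoft.Reactive.Testing;    // TestScheduler, ReactiveTest, Recorded<T>
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class Tests : ReactiveTest
{
    TestScheduler scheduler;
    ITestableObservable<int> xs, ys, zs;

    // runs before each [TestMethod], so every test starts from fresh observables
    [TestInitialize]
    public void TestInit()
    {
        scheduler = new TestScheduler();

        xs = scheduler.CreateColdObservable(
            OnNext(10, 1),
            OnNext(20, 2),
            OnNext(40, 3),
            OnNext(41, 3),   // duplicate value, 1 tick after the first 3
            OnNext(60, 4),
            OnCompleted<int>(70)
        );

        ys = scheduler.CreateColdObservable(
            OnNext(5, 10),
            OnNext(15, 20),
            OnNext(45, 30),
            OnCompleted<int>(50)
        );

        zs = scheduler.CreateColdObservable(
            OnNext(10, 1),
            OnNext(20, 2),
            OnNext(30, 3),
            OnNext(40, 3),
            OnNext(50, 1),
            OnNext(60, 2),
            OnNext(70, 3),
            OnCompleted<int>(80)
        );
    }

    // The [TestMethod]s shown in the rest of this article are members of this class.
}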
For each of the operators described in this article, we present a demonstration of its output when applied to xs, ys and/or zs, as appropriate, as well as a suggestion of how the operator could be applied to an OpenDDS observable.
The Amb() operator, given multiple observables, chooses the observable that is first to produce a notification. From then on, Amb() relays only the notifications of that observable and ignores the others. One use of this operator is to select the quickest-responding observable from a set of redundant ones. While the original inspiration for Amb(), John McCarthy's ambiguous operator, would arbitrarily choose one of the provided values (or even roll back computation to select an alternative value if the first led to an error), the Amb() operator always selects the first-responding observable.
OpenDDS: Consider multiple stock ticker feeds, or redundant sensors. The Amb() operator can be used to obtain the quickest-responding feed, improving application responsiveness. While similar to the OWNERSHIP DDS quality of service policy, Amb(), once an observable is selected, will use that observable from then on. In contrast, with exclusive ownership in DDS, the OWNERSHIP_STRENGTH of data writers can change dynamically, so samples may come to be delivered from a different data writer than the one initially selected.
Unit tests for observables based on the TestScheduler can look like the following.

// ReactiveCS\ReactiveCS.cs
[TestMethod]
public void Amb()
{
    // records every notification it receives, with its virtual time
    var results = scheduler.CreateObserver<int>();

    xs.Amb(ys)
        .Subscribe(results);

    scheduler.Start();

    // ys wins: its first notification (tick 5) precedes that of xs (tick 10)
    results.Messages.AssertEqual(
        OnNext(5, 10),
        OnNext(15, 20),
        OnNext(45, 30),
        OnCompleted<int>(50));
}

First, create an observer, results, for the element type produced by the sequence (int, in our case). Next, apply the operator, and subscribe results to the observable that was created by the application of the operator. Start() the scheduler, and, when it completes, results.Messages contains the sequence that was produced by the observable that was subscribed to. By using AssertEqual(), the generated sequence can be compared against an expected sequence to determine whether the test passes or fails. In the test above, xs.Amb(ys) selects the sequence produced by ys because the first sample of ys is at time 5, which is earlier than the first sample of xs at time 10.
As a side note, AssertEqual() compares elements in the sequence using the default equality comparison of the underlying element type; in this case, that of int. If the sequence produces a class or structure, it may be necessary to implement a custom equality comparison (for instance, by overriding Equals()), as classes otherwise compare by reference. In particular, if the sequence element contains floating point values, an exact comparison may fail because of rounding unless a custom equality comparison with a tolerance is implemented. One such comparison is described here; it handles special cases for infinity and near-zero values, but otherwise compares the relative error against a supplied tolerance.
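// Smallest positive normal double (2^-1022). Double.MinValue is the most
// negative double and Double.Epsilon is the smallest subnormal, so neither
// is the right constant for the near-zero case below.
const double MinNormal = 2.2250738585072014E-308;

public static bool NearlyEqual(double a, double b, double epsilon)
{
    double absA = Math.Abs(a);
    double absB = Math.Abs(b);
    double diff = Math.Abs(a - b);

    if (a == b)
    {
        // shortcut, handles infinities
        return true;
    }
    else if (a == 0 || b == 0 || diff < MinNormal)
    {
        // a or b is zero or both are extremely close to it;
        // relative error is less meaningful here
        return diff < (epsilon * MinNormal);
    }
    else
    {
        // use relative error
        return diff / (absA + absB) < epsilon;
    }
}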
The Merge() operator merges the sequences from multiple observables into a single stream, ordered by time. That is, xs.Merge(ys) produces OnNext(5, 10), OnNext(10, 1), OnNext(15, 20), OnNext(20, 2), OnNext(40, 3), OnNext(41, 3), OnNext(45, 30), OnNext(60, 4), and OnCompleted<int>(70). The sequence terminates when the last of the merged sequences terminates. Here, xs completes at 70 and ys at 50, so the sequence produced by Merge() completes at 70.
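The ordering and completion rules above can be checked without Rx.NET at all. The sketch below is a plain C# model (top-level statements, no Rx.NET dependency) that represents each test sequence as a list of (Time, Value) tuples plus a completion time; this representation is an assumption made for illustration, and the code models what Merge() produces, not how Rx.NET implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model (not Rx.NET): each OnNext notification is a (Time, Value) pair in ticks.
var xs = new List<(long Time, int Value)> { (10, 1), (20, 2), (40, 3), (41, 3), (60, 4) };
var ys = new List<(long Time, int Value)> { (5, 10), (15, 20), (45, 30) };
long xsCompleted = 70, ysCompleted = 50;

// Merge(): every notification from both inputs, interleaved in time order...
var merged = xs.Concat(ys).OrderBy(n => n.Time).ToList();

// ...and the merged sequence completes only when the last input has completed.
long mergedCompleted = Math.Max(xsCompleted, ysCompleted);

Console.WriteLine(string.Join(", ", merged) + $"; completed at {mergedCompleted}");
```

The printed pairs correspond one-to-one with the OnNext() notifications listed above.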
OpenDDS: Suppose sensors that publish on different topics have similar data, say topics "Temperature1" through "Temperature20", which are to be processed together, as they are all of the same data type. The Merge() operator can combine the data streams into a single stream for analysis.
The Where() operator, also called filter() in some of the reactive frameworks for other languages, takes a function that accepts an element of the sequence type as a parameter, and returns true if the element should be present in the resulting sequence, or false if it should not. For instance, xs.Where(x => x > 2) yields OnNext(40, 3), OnNext(41, 3), OnNext(60, 4), and OnCompleted<int>(70), as these are the only values produced by xs greater than two. The filtered sequence completes at the same moment that the source sequence completes.
OpenDDS: OpenDDS provides the PARTITION quality of service policy and content filtering to control which data samples are received, and Where() provides similar behavior. If the same OpenDDS data stream is to be filtered in multiple ways, though, it may be more efficient to receive a single stream from OpenDDS, subscribe to the resulting observable multiple times, and filter each subscription differently, rather than create multiple OpenDDS subscribers with differing PARTITION and content filtering expressions. Then again, OpenDDS partitions and content filters can be applied at the publisher, so unwanted samples need not cross the network at all; filtering with Where() may reduce subscriber development complexity, but every sample is still transmitted, so network performance is still impacted.
The Distinct() operator ensures that no duplicated values are produced by the sequence. zs.Distinct() yields OnNext(10, 1), OnNext(20, 2), OnNext(30, 3), and OnCompleted<int>(80), as the values 1, 2 and 3 are emitted only the first time they are seen. The emitted sequence terminates when the original sequence does, at time 80.
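OpenDDS: If an OpenDDS sample stream should be, say, monotonically increasing but is subject to jitter, the Distinct() operator can ensure that only unique values are processed. Note that Distinct() must remember every value it has seen, so its memory use grows with the number of distinct values in a long-running stream.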
The DistinctUntilChanged() operator differs from Distinct() in that it only drops duplicated values if they appear next to each other in the original sequence. That is, zs.DistinctUntilChanged() yields OnNext(10, 1), OnNext(20, 2), OnNext(30, 3), OnNext(50, 1), OnNext(60, 2), OnNext(70, 3), and OnCompleted<int>(80), and only the value 3 produced at time 40 is dropped, because the previous sample, at time 30, was also 3.
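The difference between the two operators comes down to what each one remembers. The following plain C# model (no Rx.NET; the (Time, Value) tuple representation of zs is an assumption for illustration) keeps a set of every value seen for Distinct(), but only the previous value for DistinctUntilChanged().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// zs from the test fixture as hypothetical (Time, Value) pairs; it completes at tick 80.
var zs = new List<(long Time, int Value)>
    { (10, 1), (20, 2), (30, 3), (40, 3), (50, 1), (60, 2), (70, 3) };

// Distinct(): remember every value ever seen; HashSet.Add returns false for repeats.
var seen = new HashSet<int>();
var distinct = zs.Where(n => seen.Add(n.Value)).ToList();

// DistinctUntilChanged(): compare only against the immediately preceding value.
int? previous = null;
var untilChanged = new List<(long Time, int Value)>();
foreach (var n in zs)
{
    if (previous != n.Value)
        untilChanged.Add(n);
    previous = n.Value;
}

Console.WriteLine("Distinct:             " + string.Join(", ", distinct));
Console.WriteLine("DistinctUntilChanged: " + string.Join(", ", untilChanged));
```

The set in the first model is why Distinct() can grow without bound on an infinite stream, while DistinctUntilChanged() needs constant memory.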
OpenDDS: Since DistinctUntilChanged() only removes duplicates that arrive consecutively, it allows data samples to be processed only when they have changed. For example, a sequence of stock values may only need attention if the price has moved, and can otherwise be ignored while the value remains stable.
The Concat() operator concatenates sequences: the second sequence begins when the first completes. xs.Concat(ys) yields OnNext(10, 1), OnNext(20, 2), OnNext(40, 3), OnNext(41, 3), OnNext(60, 4), OnNext(75, 10), OnNext(85, 20), OnNext(115, 30), and OnCompleted<int>(120), where the elements from ys immediately follow those of xs. The sequence produced by xs ends at time 70, and ys, being a cold observable, is only subscribed to at that point; as the first element of ys is produced 5 ticks after subscription, it is emitted in the concatenation at time 70 + 5 = 75.
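The time shift can be reproduced with a plain C# model (no Rx.NET). It assumes, as the text does, that ys is cold, so its times are relative to the moment it is subscribed to; the (Time, Value) tuples are an illustrative representation, not an Rx.NET type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model: cold sequences as (Time, Value) pairs, times relative to subscription.
var xs = new List<(long Time, int Value)> { (10, 1), (20, 2), (40, 3), (41, 3), (60, 4) };
var ys = new List<(long Time, int Value)> { (5, 10), (15, 20), (45, 30) };
long xsCompleted = 70, ysCompleted = 50;

// Concat(): ys is only subscribed to when xs completes, so each of its
// relative times is offset by the completion time of xs.
var concatenated = xs
    .Concat(ys.Select(n => (Time: xsCompleted + n.Time, n.Value)))
    .ToList();
long concatCompleted = xsCompleted + ysCompleted;

Console.WriteLine(string.Join(", ", concatenated) + $"; completed at {concatCompleted}");
```

For a hot source the offset would not apply: anything it published before Concat() reached it would simply be missed.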
OpenDDS: Suppose two OpenDDS data streams exist that represent work items to service, where one stream is high priority and the other low priority. Suppose also that work items must be serviced at the relative time intervals at which they arrive; say, if a robot on a factory floor needs time to move its manipulator arm to a starting position, the assembly of the part that the work item represents cannot begin until the arm is ready. By concatenating the low priority work observable onto the end of the high priority work observable, all high priority work is completed first, and all work, regardless of when it arrives, is still executed with the appropriate time intervals between items. Note that this requires the high priority observable to complete and, since Concat() does not subscribe to the second observable until the first completes, low priority items published before then are only seen if that observable is cold or replays them.
The Zip() operator combines two sequences into one using a supplied function, and the number of elements produced by the combination is equal to that of the shorter of the sequences being combined. Elements are taken pairwise, so both original sequences must have an element available before a combined element can be emitted in the resulting sequence. The observable created by xs.Zip(ys, (x, y) => x + y) yields OnNext(10, 11), OnNext(20, 22), OnNext(45, 33), and OnCompleted<int>(60), pairing the 1, 2 and 3 of xs with the 10, 20 and 30 of ys. The time of each emitted element is the time at which an element from each of the zipped sequences became available. That is, while an element of ys is available at time 5, it isn't until time 10 that an element of xs is available to pair with it, so the result of zipping the two is produced at time 10. The completion time of the emitted sequence is documented to be the end of the shorter sequence, but that is not the case in the version of Rx.NET used in this article. In this example, the zipped sequence completes at time 60, when xs produces the value 4, which can never be paired because ys has already completed (at time 50).
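The pairing rule can be modeled in a few lines of plain C# (no Rx.NET): pair elements by position and stamp each pair with the later of its two times. The (Time, Value) representation is an assumption for illustration, and the sketch deliberately does not model the completion time, which, as noted above, depends on the Rx.NET version.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of xs.Zip(ys, (x, y) => x + y) over (Time, Value) pairs.
var xs = new List<(long Time, int Value)> { (10, 1), (20, 2), (40, 3), (41, 3), (60, 4) };
var ys = new List<(long Time, int Value)> { (5, 10), (15, 20), (45, 30) };

// Elements are paired by position; a pair can only be emitted once its later
// element has arrived, so the pair's time is the larger of the two times.
// Enumerable.Zip stops at the end of the shorter input.
var zipped = xs
    .Zip(ys, (x, y) => (Time: Math.Max(x.Time, y.Time), Value: x.Value + y.Value))
    .ToList();

Console.WriteLine(string.Join(", ", zipped));
```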
OpenDDS: Consider a calculation that can only be performed when a data value has arrived from each of three different OpenDDS topics. By using the Zip() operator, the resulting observable wouldn't contain an item to process unless values from all three topics had already arrived. The Zip() operator is related to OpenDDS's implementation of the MultiTopic content subscription feature, as it can be used to unify samples produced by disparate observables.
The Sample() operator, given a sampling interval, produces, at the end of each interval, the most recent data sample received within that interval (if any). Unlike the time-independent operators above, Sample() must be given the scheduler as an argument in order for the sampling interval to apply to the virtual time scheduler. If the scheduler is left at its default value, the sampling interval is interpreted as real time, producing incorrect test results. Sampling xs every 25 ticks can be done by:
// ReactiveCS\ReactiveCS.cs
var results = scheduler.CreateObserver<int>();

// pass the TestScheduler so the 25 tick interval is measured in virtual time
xs.Sample(TimeSpan.FromTicks(25), scheduler)
    .Subscribe(results);
The sequence produced is OnNext(25, 2), OnNext(50, 3), OnNext(75, 4), and OnCompleted<int>(75). That is, at the end of each sample interval in which at least one value arrived, one notification is generated, containing the most recent value of the observable. For example, at the 25 tick mark, the most recent sequence value from xs is 2, produced at time 20. The sequence completes at the end of the last sample interval (tick 75), not at the point at which the sampled sequence completes (tick 70).
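The sampling rule described above can be modeled in plain C# (no Rx.NET). The sketch assumes half-open intervals (previous tick, current tick]; real Rx.NET behavior for a value arriving exactly on a tick depends on scheduling order, which this model ignores. The (Time, Value) representation is likewise an illustrative assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of xs.Sample(TimeSpan.FromTicks(25), scheduler) over (Time, Value) pairs.
var xs = new List<(long Time, int Value)> { (10, 1), (20, 2), (40, 3), (41, 3), (60, 4) };
long xsCompleted = 70;
const long period = 25;

// The sampled sequence completes on the first tick at or after the source's completion.
long sampleCompleted = (xsCompleted + period - 1) / period * period;   // 75

var sampled = new List<(long Time, int Value)>();
for (long tick = period; tick <= sampleCompleted; tick += period)
{
    // values that arrived since the previous tick
    var arrivedThisInterval = xs
        .Where(n => n.Time > tick - period && n.Time <= tick)
        .ToList();

    // emit only the most recent one, and nothing if the interval was empty
    if (arrivedThisInterval.Count > 0)
        sampled.Add((tick, arrivedThisInterval.Last().Value));
}

Console.WriteLine(string.Join(", ", sampled) + $"; completed at {sampleCompleted}");
```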
OpenDDS: The Sample() operator can be used to reduce the data rate of an OpenDDS data stream. For instance, a data sample stream containing time updates may arrive much more quickly than needed by a clock display that is updated only once per second. The DDS TIME_BASED_FILTER quality of service policy behaves similarly to the Sample() operator, although the sample yielded by TIME_BASED_FILTER will be the first in the sampling interval window, while the sample yielded by the Sample() operator will be the last sample in the window.
From its name, one may think that the Throttle() operator limits the sequence rate to below a threshold. Instead, it produces an element only if a throttling interval passes without any further elements being generated; other frameworks, such as RxJava, call this operator debounce(). An example used in this presentation uses Throttle() to limit requests made to a web service that returns words that complete the text that the user is typing. Rather than querying the web service on each character typed, the web service is queried only when the user has stopped typing for a period of time.
As Throttle() is also based on a time interval, the scheduler must be supplied. The observable created by xs.Throttle(TimeSpan.FromTicks(15), scheduler) produces the sequence OnNext(35, 2), OnNext(56, 3), OnNext(70, 4), and OnCompleted<int>(70). The sample times are explained as follows. The first value of xs, 1, is produced at time 10, but it is dropped because the next value, 2, arrives at time 20, only 10 ticks later, before the 15 tick throttle interval has elapsed. The value following 2, which is 3, isn't produced until time 40; this 20 tick gap is greater than the throttle interval, so the value 2 produced at time 20 is emitted from the throttled sequence at time 35 (20 plus the throttle interval). Similarly, the first 3 from xs is dropped because the second 3 follows it only 1 tick later, but the second 3 is emitted at time 56, as the gap between the second 3 and 4, at times 41 and 60 respectively, is greater than the 15 tick throttle interval. No values follow 4, and xs completes at time 70, before the throttle interval for 4 would have elapsed (at time 75); on completion, the pending value 4 is emitted immediately, and the throttled sequence terminates at the same moment that the source sequence does.
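The timing walk-through above can be reproduced with a plain C# model of throttling (no Rx.NET; the (Time, Value) representation is an assumption for illustration). Each value is emitted one interval after it arrives unless a newer value or the completion pre-empts it; a value arriving exactly when the interval ends is treated as pre-empting, a tie this sketch does not try to match to Rx.NET's scheduling order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of xs.Throttle(TimeSpan.FromTicks(15), scheduler) over (Time, Value) pairs.
var xs = new List<(long Time, int Value)> { (10, 1), (20, 2), (40, 3), (41, 3), (60, 4) };
long xsCompleted = 70;
const long due = 15;

var throttled = new List<(long Time, int Value)>();
for (int i = 0; i < xs.Count; i++)
{
    var (time, value) = xs[i];
    bool isLast = i + 1 == xs.Count;

    // the next event that can pre-empt this value: the next value, or completion
    long nextEvent = isLast ? xsCompleted : xs[i + 1].Time;

    if (time + due < nextEvent)
        throttled.Add((time + due, value));    // quiet for a full interval: emit after the delay
    else if (isLast)
        throttled.Add((xsCompleted, value));   // source completed first: flush the pending value
    // otherwise a newer value arrived within the interval, and this one is dropped
}

Console.WriteLine(string.Join(", ", throttled) + $"; completed at {xsCompleted}");
```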
OpenDDS: Suppose an OpenDDS data stream normally produces values continually, but occasionally stops and restarts, perhaps due to a mechanical fault. As Throttle() emits a value only after the stream has been silent for the throttle interval, it can be used to signal the application that the stream has stalled, and that the device producing the data stream may require maintenance.
Unlike the previous operators, which produce an observable sequence of elements, the GroupBy() operator yields an observable sequence of observable sequences. The original sequence that GroupBy() is applied to is divided into separate observable sequences based on a supplied key selector function. As these new observables are created, they are produced as notifications in the sequence returned by GroupBy(), and, as they appear, they can be treated as any other observable: operators may be applied to them, or they may be subscribed to.
Testing the result of GroupBy() is a bit convoluted, but is described here. As new grouped observables are produced, they are added to a list for later examination. As an example, consider the division of the elements of an observable into two groups, one group containing values less than or equal to 2, and the other group containing values greater than 2. We can set up the test as follows:
// ReactiveCS\ReactiveCS.cs
[TestMethod]
public void GroupBy()
{
    // as each group appears, add it to the groups list as
    // (creation time, group key, observer recording that group)
    var groups = new List<Tuple<long, bool, ITestableObserver<int>>>();

    xs.GroupBy(x => x > 2)
        .Subscribe(g =>
        {
            var observer = scheduler.CreateObserver<int>();
            g.Subscribe(observer);
            groups.Add(Tuple.Create(
                scheduler.Clock, g.Key, observer));
        });

    scheduler.Start();
The variable groups contains a list of tuples, where each tuple stores three pieces of information: the time at which the group was created, the group key (here, just true or false, the return value of the grouping function, to identify the two groups), and an observer of the group represented by the tuple. That is, each time a new group is created by the grouping operation, a new observer is built, it subscribes to the new group, and an item is added to the groups list. We then start the scheduler as before.
Next, we create a helper function that validates the contents of a tuple in the groups list by comparing the values in the tuple to ones supplied as arguments to it.
    // checks the creation time, key and recorded notifications of groups[index]
    var assertGroup = new Action<int, long, bool, Recorded<Notification<int>>[]>(
        (index, clock, key, messages) =>
        {
            var g = groups[index];
            Assert.AreEqual(clock, g.Item1);
            Assert.AreEqual(key, g.Item2);
            g.Item3.Messages.AssertEqual(messages);
        });
To test that GroupBy() operated as expected, we first confirm that two groups were created. Only two groups should exist, as the grouping function can only return true or false.
Assert.AreEqual(2, groups.Count);
Next, we check the first group (group 0). The first element in xs has the value 1, produced at time 10. It is not greater than 2, so the value 1 is added to the "false" group. As the "false" group doesn't yet exist, it is created at time 10. The only other element in the sequence that fails the "greater than 2" test is 2 itself, at time 20, so the "false" group should only contain two elements, and terminate when the grouped sequence terminates.
    // at time 10, the "false" group appears
    assertGroup(0, 10, false, new[] {
        OnNext(10, 1),
        OnNext(20, 2),
        OnCompleted<int>(70)
    });
We then check the second group (group 1). The "true" group is created when the first element that is greater than 2 is seen in the sequence (time 40), contains all elements greater than 2 from the original sequence, and also completes at the same time that the original sequence does.
    // at time 40, the "true" group appears
    assertGroup(1, 40, true, new[] {
        OnNext(40, 3),
        OnNext(41, 3),
        OnNext(60, 4),
        OnCompleted<int>(70)
    });
}
OpenDDS: Grouping can be used not only to arrange a sequence of OpenDDS data samples into groups by the data values themselves, but also to group samples based on other OpenDDS properties, such as instance or sample state, transforming a topic-based observable sequence into an instance-based one.
The Window() operator breaks a sequence into time slices, creating a new observable (as GroupBy() did) for each time slice (window). Splitting xs into 25 tick windows is done by xs.Window(TimeSpan.FromTicks(25), scheduler), and three windows are created. The first window, starting at time 0, contains OnNext(10, 1), OnNext(20, 2), and OnCompleted<int>(25). That is, only two samples from xs are produced within the first 25 ticks, and the window closes at tick 25. The second window ranges from tick 25 to tick 50, and contains OnNext(40, 3), OnNext(41, 3), and OnCompleted<int>(50), and the third window starts at tick 50 and ends when the original sequence ends, at tick 70. It contains OnNext(60, 4) and OnCompleted<int>(70).
OpenDDS: As with Sample(), Window() can be used as another form of rate limiting. Suppose a process can only operate at the rate of 10 samples a second. Dividing the incoming data into one-second windows and processing at most the first 10 samples that arrive in each window (by using the Take() operator on each window) will ensure that the process is never overloaded if the OpenDDS data sample rate increases.
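In Rx.NET, this composition would look something like source.Window(TimeSpan.FromSeconds(1)).SelectMany(w => w.Take(10)), where source is a hypothetical OpenDDS observable. The plain C# model below (no Rx.NET) shows the effect on a made-up burst of 20 samples per second. Its windows are aligned to time 0, which matches Rx.NET only when the subscription also starts at time 0.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical burst: one sample every 50 ms (20 per second) for 1.25 seconds.
var arrivals = Enumerable.Range(0, 25)
    .Select(i => (Time: i * 50L, Value: i))
    .ToList();

const long windowMs = 1000;   // one-second windows
const int maxPerWindow = 10;  // the process can handle at most 10 samples per window

// Slice the samples into windows, and keep at most the first 10 of each window.
var admitted = arrivals
    .GroupBy(n => n.Time / windowMs)
    .SelectMany(window => window.Take(maxPerWindow))
    .ToList();

Console.WriteLine($"{arrivals.Count} arrived, {admitted.Count} admitted");
```

The first second's 20 samples are cut to 10, while the 5 samples of the partial second window all pass.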
The Buffer() operator is similar to the Window() operator in that it divides the buffered sequence into slices, but the slices themselves are different. Slices can be created either by a time interval (as with Window()) or by a count of elements, and, rather than an observable that stays open over a span of time, each buffer is a list of elements (an IList<T> in Rx.NET) delivered at a single instant. In the descriptions below, each buffer is therefore shown as a sequence whose creation time, completion time, and element times are all the same.
A 25 tick-sized buffer can be created with xs.Buffer(TimeSpan.FromTicks(25), scheduler), and, as with Window(), three buffers are created. The first, at time 25 (the end of the interval, unlike Window(), which created the window at the start of the interval), contains OnNext(25, 1), OnNext(25, 2), and OnCompleted<int>(25). It contains the same values as the first window, but both values, and the completion, are at the buffer creation time, tick 25. The second buffer, at time 50, contains OnNext(50, 3), OnNext(50, 3), and OnCompleted<int>(50). Again, these are the same values as in the second window, but all at the end time of the buffer. Lastly, following the same pattern, the third buffer contains OnNext(70, 4) and OnCompleted<int>(70); it is at time 70 because the original sequence completed before the interval ended.
A buffer of 3 elements can be created with xs.Buffer(3). Here, a buffer is produced every time three elements have arrived, or when the original sequence completes. So, for xs, two buffers are created. The first one contains OnNext(40, 1), OnNext(40, 2), OnNext(40, 3), and OnCompleted<int>(40), and is at time 40 because that was the time at which the third element arrived. The second buffer contains OnNext(70, 3), OnNext(70, 4), and OnCompleted<int>(70), and is at time 70 because the sequence completed before a third element was received.
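The count-based rule can be modeled in plain C# (no Rx.NET; the (Time, Value) representation is an illustrative assumption): collect values until three have accumulated, stamp the buffer with the arrival time of the value that filled it, and flush any partial buffer at completion.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of xs.Buffer(3) over (Time, Value) pairs.
var xs = new List<(long Time, int Value)> { (10, 1), (20, 2), (40, 3), (41, 3), (60, 4) };
long xsCompleted = 70;
const int count = 3;

var buffers = new List<(long Time, List<int> Values)>();
var current = new List<int>();
foreach (var (time, value) in xs)
{
    current.Add(value);
    if (current.Count == count)
    {
        // full: emit at the arrival time of the element that filled it
        buffers.Add((time, current));
        current = new List<int>();
    }
}

// the source has completed: emit any partially filled buffer
if (current.Count > 0)
    buffers.Add((xsCompleted, current));

foreach (var (time, values) in buffers)
    Console.WriteLine($"{time}: [{string.Join(", ", values)}]");
```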
OpenDDS: One use of the Buffer() operator is data smoothing. Consider a noisy analog-to-digital converter that is sending its raw readings over OpenDDS. The application could use the Buffer() operator to produce a buffer after, say, 7 samples have arrived. Out of the 7 samples in the buffer, the highest and lowest samples could be discarded, the remaining 5 averaged, and the result then used for calculation.
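The smoothing step itself is an ordinary function over one buffer, so it can be sketched without Rx.NET. The TrimmedMean() helper and the readings below are hypothetical; in an application, the function might be applied as something like adcReadings.Buffer(7).Select(TrimmedMean), assuming adcReadings is an IObservable<double> built from the OpenDDS samples (Buffer(int) yields IList<double> buffers).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical smoothing step for one buffer of raw readings: discard the
// highest and lowest reading, then average the rest (a trimmed mean).
double TrimmedMean(IList<double> readings)
{
    if (readings.Count == 0) return double.NaN;          // nothing to average
    if (readings.Count < 3) return readings.Average();   // too few readings to trim
    var sorted = readings.OrderBy(r => r).ToList();
    return sorted.Skip(1).Take(sorted.Count - 2).Average();
}

// One buffer of 7 made-up readings containing two outliers (55.0 and 0.0).
var buffer = new List<double> { 10.0, 10.2, 9.9, 55.0, 10.1, 9.8, 0.0 };
Console.WriteLine(TrimmedMean(buffer));
```

A partial final buffer (when the stream completes with fewer than 7 readings) is still handled, just with less trimming.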
The GroupJoin() operator combines two observables. For each element of each observable, a duration (itself expressed as an observable) is provided, defining the window of time during which that element remains open; for elements from the two observables whose windows overlap, a joining function is applied, producing a new element that is a combination of the matching ones. In GroupJoin(), each left element is combined with an observable of the right elements that overlap it. A demonstration of GroupJoin() is shown below, as provided in the collection of 101 Rx Samples here. The code is identical to the Rx sample, save a small modification to convert it into a unit test.
Previously, we created observables based on an adapted event, a subject, or a time-based generated sequence. Another way to create an observable is to convert a standard data structure, such as a list, into an observable via a call to ToObservable().
// ReactiveCS\ReactiveCS.cs
[TestMethod]
public void GroupJoin()
{
    // each entry is { timestamp, description }
    var leftList = new List<string[]>();
    leftList.Add(
        new string[] { "2013-01-01 02:00:00", "Batch1" });
    leftList.Add(
        new string[] { "2013-01-01 03:00:00", "Batch2" });
    leftList.Add(
        new string[] { "2013-01-01 04:00:00", "Batch3" });

    var rightList = new List<string[]>();
    rightList.Add(
        new string[] { "2013-01-01 01:00:00", "Production=2" });
    rightList.Add(
        new string[] { "2013-01-01 02:00:00", "Production=0" });
    rightList.Add(
        new string[] { "2013-01-01 03:00:00", "Production=3" });

    // convert the lists into observable sequences
    var l = leftList.ToObservable();
    var r = rightList.ToObservable();
A number of predefined special observables are also available. In this example, the window of every element of each observable is defined by Observable.Never<Unit>(), a special observable that never produces any elements, but also never terminates, so each window stays open forever. The joining function pairs each element from the left observable with an observable of the elements of the right observable whose windows overlap that left element's window.
    var q = l.GroupJoin(r,
        // windows from each left event going on forever
        _ => Observable.Never<Unit>(),
        // windows from each right event going on forever
        _ => Observable.Never<Unit>(),
        // create tuple of left event with observable of right events
        (left, obsOfRight) => Tuple.Create(left, obsOfRight));
For each tuple, the observable of right elements can then be subscribed to, and each element pushed to it is compared against the left element of the tuple. If the element matches specified criteria (here, the same timestamp), a message is generated.
    var messages = new List<string>();

    // e is a tuple with two items, left and obsOfRight
    using (q.Subscribe(e =>
    {
        var xs = e.Item2;   // note: this local shadows the xs field
        xs.Where(
            // filter only when datetime matches
            x => x[0] == e.Item1[0])
            .Subscribe(v =>
            {
                messages.Add(string.Format(
                    string.Format(
                        "{0},{1} and {2},{3} occur at the same time",
                        e.Item1[0], e.Item1[1], v[0], v[1]
                    )));
            });
    }))
    {
        Assert.AreEqual(2, messages.Count);
        Assert.AreEqual(
            "2013-01-01 02:00:00,Batch1 and " +
            "2013-01-01 02:00:00,Production=0 " +
            "occur at the same time",
            messages[0]);
        Assert.AreEqual(
            "2013-01-01 03:00:00,Batch2 and " +
            "2013-01-01 03:00:00,Production=3 " +
            "occur at the same time",
            messages[1]);
    }
}
OpenDDS: Similar to OpenDDS's implementation of MultiTopic, GroupJoin() can be used to combine multiple OpenDDS data streams based on common characteristics, such as the data sample reading time, sensor ID, or other criteria.
A number of operators are available that collect basic statistics from an observable. When an observable completes, each of these operators pushes a single value, yielding the statistical result across all values produced by the observable. As such, these operators are not suitable for observables that produce infinite sequences: since such an observable never terminates, no values will ever be emitted by these operators.
The Count(), Sum(), Min(), Max(), and Average() operators do as their names suggest. The Aggregate() operator provides a means to accumulate values into a single result, and is supplied with an accumulator function and an optional seed value as parameters. We use ToObservable() as before to convert a list into an observable sequence in order to demonstrate these operators.
The test below also shows that a single observable can be subscribed to multiple times. This can be advantageous when used with OpenDDS as, rather than creating multiple subscribers for a given topic that filter the data in various ways, a single subscriber can be used, and the resulting observable can be subscribed to as many times as needed to create modified observables for use by the application.
// ReactiveCS\ReactiveCS.cs
[TestMethod]
public void CountSumMinMaxAverageAggregate()
{
    var o = new List<int>() { 3, 10, 8 }.ToObservable();

    // each subscription receives a single value when the sequence completes
    o.Count().Subscribe(e => Assert.AreEqual(3, e));
    o.Sum().Subscribe(e => Assert.AreEqual(21, e));
    o.Min().Subscribe(e => Assert.AreEqual(3, e));
    o.Max().Subscribe(e => Assert.AreEqual(10, e));
    o.Average().Subscribe(e => Assert.AreEqual(7, e));   // Average() yields a double
    // seed 6, then 6 + 3*2 + 10*2 + 8*2 = 48
    o.Aggregate(6, (acc, i) => acc + i*2)
        .Subscribe(e => Assert.AreEqual(48, e));
}
OpenDDS: Provided that an OpenDDS observable terminates, these operators can be used to obtain statistical information from OpenDDS sample data.
While Aggregate()
applies an accumulator function over the elements of a sequence and generates a single
value when the sequence completes, the Scan()
operator does the same, but produces a value corresponding to
each value of the observed sequence.
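The difference can be shown with ordinary enumerables. The sketch below defines a hypothetical Scan() helper over IEnumerable<T> (LINQ to Objects has Aggregate() but no Scan()) and runs it with the same seed and accumulator as the Aggregate() test above. Like Rx.NET's Scan(), it yields one result per element and does not yield the seed itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical enumerable analogue of Scan(): apply the accumulator to each
// element and yield every intermediate result (the seed is not yielded).
IEnumerable<TAcc> Scan<TSrc, TAcc>(IEnumerable<TSrc> source, TAcc seed, Func<TAcc, TSrc, TAcc> accumulator)
{
    var acc = seed;
    foreach (var item in source)
    {
        acc = accumulator(acc, item);
        yield return acc;
    }
}

var values = new List<int> { 3, 10, 8 };

var running = Scan(values, 6, (acc, i) => acc + i * 2).ToList();   // one result per element
var total = values.Aggregate(6, (acc, i) => acc + i * 2);            // only the final result

Console.WriteLine($"Scan: {string.Join(", ", running)}; Aggregate: {total}");
```

The last value produced by Scan() always equals the single value produced by Aggregate().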
Scott Weinstein uses the Scan() operator to create a new operator, ToCommonAggregates(), to collect statistics on an observable that are pushed out as each element in the source observable arrives. As such, unlike Count() and the other standard operators, ToCommonAggregates() is suitable for use with infinite sequences.
Weinstein's code is shown here. As we did above, Weinstein created a class which will be used as the element type of a new observable that will be created.
// ReactiveCS\ReactiveCS.cs
public class StatInfoItem<T>
{
    public T Item { get; set; }        // label, such as a sensor ID
    public double Sum { get; set; }
    public int Count { get; set; }
    public double Mean { get; set; }
    public double M2 { get; set; }     // running sum of squared deviations from the mean
    public double StdDev { get; set; }
    public double Min { get; set; }
    public double Max { get; set; }

    public override string ToString()
    {
        return "[" + Item + ": Sum=" + Sum + ", Count=" + Count +
            ", Mean=" + Mean + ", StdDev=" + StdDev +
            ", Min=" + Min + ", Max=" + Max + "]";
    }
}
The .NET Framework allows methods to appear to be added to an existing type without actually modifying that type. As these methods extend an existing type, they are called extension methods. To create an extension method, first create a static class to hold them. In it, create public static methods that take this T as the first parameter, where T is the type being extended. As shown in this James Michael Hare blog post, the int type can be extended to have a Half() method with code like:
static class Extensions
{
    // "this int source" makes Half() callable as if it were a method of int
    public static int Half(this int source)
    {
        return source / 2;
    }
}
which now allows the syntax 4.Half() to be legal, returning the value 2. Weinstein creates an extension method that extends IObservable<T>, the interface of observables in Rx.NET. Two functions must be provided as parameters to ToCommonAggregates(). The first function identifies which component of the observable's elements should be used for statistical analysis (such as a sensor data reading). The second function provides a name to associate with the accumulated statistics (such as a sensor ID).
// ReactiveCS\ReactiveCS.cs
static class Extensions
{
    public static IObservable<StatInfoItem<T>> ToCommonAggregates<T, TSrc>(
        this IObservable<TSrc> source,
        Func<TSrc, double> dataSelector,   // value to analyze, e.g. a sensor reading
        Func<TSrc, T> itemSelector)        // label for the aggregate, e.g. a sensor ID
    {
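ToCommonAggregates() uses the Scan() operator to accumulate its values. The Scan() operator is provided a starting (seed) value and a function to accumulate values, and each time the accumulator function produces a new value, that value is also pushed to subscribers. Here, the accumulator calculates various statistical values (the mean and the sum of squared deviations, M2, are updated incrementally using Welford's method), and each time a new element appears in the sequence to which ToCommonAggregates() is applied, a statistical sample is generated. The Skip() operator is also applied, with the intent of dropping the seed value, which is not fully initialized. Note, however, that in Rx.NET, Scan() does not emit its seed, so Skip(1) actually drops the aggregate computed for the first element of the source sequence. Also, as the seed's Min and Max are 0, the reported Min is never greater than 0 and the reported Max is never less than 0.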
        return source.Scan(new StatInfoItem<T>(), (cur, next) =>
        {
            double data = dataSelector(next);
            T itemp = itemSelector(next);
            var n = cur.Count + 1;
            // Welford's online update of the mean and of M2
            var delta = data - cur.Mean;
            var meanp = cur.Mean + delta / n;
            var m2 = cur.M2 + delta * (data - meanp);
            // population standard deviation
            var stdDevp = Math.Sqrt(m2 / n);
            return new StatInfoItem<T>()
            {
                Item = itemp,
                Sum = data + cur.Sum,
                Count = n,
                Mean = meanp,
                M2 = m2,
                StdDev = stdDevp,
                // the seed's Min and Max are 0, so Min stays <= 0 and Max >= 0
                Min = Math.Min(data, cur.Min),
                Max = Math.Max(data, cur.Max),
            };
        })
        // intended to drop the seed value; Rx.NET's Scan() does not emit
        // its seed, so this actually drops the first element's aggregate
        .Skip(1);
    }
}
We can test it as follows. We create an observable by transforming ys, which is a sequence of integers. We apply the Timestamp() operator, which associates a timestamp with each element in the sequence, resulting in a sequence of type Timestamped<int>. The Timestamped<T> type is a structure with two properties: Timestamp, giving the time of the sample, and Value, which stores the value that is timestamped (in our case, the original int). We then transform that using the Select() operator to generate a SensorEventArgs from the timestamp and value, so, at this point, we have created another simulated OpenDDS sample. Finally, we apply the ToCommonAggregates() operator, identifying the OpenDDS data reading as the data value on which to perform statistical analysis, and the name of the sensor as the label of the data aggregate.
// ReactiveCS\ReactiveCS.cs
[TestMethod]
public void ToCommonAggregates()
{
    var results = scheduler.CreateObserver<StatInfoItem<string>>();

    var obs =
        // start with IObservable<int>
        ys
        // change to IObservable<Timestamped<int>>
        .Timestamp(scheduler)
        // change to IObservable<SensorEventArgs>
        .Select(i => new SensorEventArgs("Temp7",
            i.Timestamp.DateTime, i.Value))
        // change to IObservable<StatInfoItem<string>>
        .ToCommonAggregates(i => i.Reading, i => i.SensorID);
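The test looks as before: subscribe, run the scheduler, and examine the results. The observable ys contains three data samples, causing three statistical aggregates to be generated. The first aggregate (for the value 10 at time 5) is dropped due to the call to Skip(), leaving two. The time of each remaining aggregate corresponds to the original time of the corresponding element in ys, and the aggregate observable completes when ys does. Min is reported as 0.0 because of the seed value, as noted above. Also, as StatInfoItem<T> is a class, AssertEqual() compares expected and actual items by reference unless StatInfoItem<T> overrides Equals(); for this test to pass, such an override (for instance, one comparing the double fields with NearlyEqual()) is needed, and it is not shown in the listing above.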
    obs.Subscribe(results);
    scheduler.Start();

    results.Messages.AssertEqual(
        OnNext(15, new StatInfoItem<string>()
        {
            Item = "Temp7",
            Sum = 30.0,
            Count = 2,
            Mean = 15.0,
            M2 = 50.0,
            StdDev = 5.0,
            Min = 0.0,
            Max = 20.0,
        }),
        OnNext(45, new StatInfoItem<string>()
        {
            Item = "Temp7",
            Sum = 60.0,
            Count = 3,
            Mean = 20.0,
            M2 = 200.0,
            StdDev = 8.16496580927726,   // sqrt(200 / 3)
            Min = 0.0,
            Max = 30.0,
        }),
        OnCompleted<StatInfoItem<string>>(50)
    );
}
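If every aggregate, including the first, is wanted, and Min should reflect the data, one option (an assumption of this sketch, not Weinstein's code) is to seed Min and Max with positive and negative infinity and to drop the Skip(1). The plain C# sketch below, with no Rx.NET dependency, applies the same Welford update step to the values of ys in Scan() fashion; in Rx.NET, a step function like this could be passed to Scan() together with the modified seed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Running statistics state; Min/Max start at +/- infinity so the first reading sets both.
var seed = (Count: 0, Sum: 0.0, Mean: 0.0, M2: 0.0,
            Min: double.PositiveInfinity, Max: double.NegativeInfinity);

// The same Welford update used by ToCommonAggregates().
(int Count, double Sum, double Mean, double M2, double Min, double Max) Step(
    (int Count, double Sum, double Mean, double M2, double Min, double Max) cur, double data)
{
    int n = cur.Count + 1;
    double delta = data - cur.Mean;
    double mean = cur.Mean + delta / n;
    double m2 = cur.M2 + delta * (data - mean);
    return (n, cur.Sum + data, mean, m2, Math.Min(data, cur.Min), Math.Max(data, cur.Max));
}

// One aggregate per reading (Scan semantics): the seed is never emitted, so no Skip(1).
var running = new List<(int Count, double Sum, double Mean, double M2, double Min, double Max)>();
var state = seed;
foreach (var reading in new[] { 10.0, 20.0, 30.0 })   // the values of ys
{
    state = Step(state, reading);
    running.Add(state);
}

foreach (var s in running)
    Console.WriteLine($"Count={s.Count} Mean={s.Mean} StdDev={Math.Sqrt(s.M2 / s.Count)} Min={s.Min} Max={s.Max}");
```

The second and third aggregates match the Count, Sum, Mean and M2 values expected in the test above; only Min differs (10.0 instead of 0.0), and an extra aggregate for the first reading is produced.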
OpenDDS: As OpenDDS sample streams are potentially infinite (that is, a sensor generates values essentially forever), the use of Scan() for element-wise statistics gathering is likely more useful than operators such as Count() and Average(), which only produce a value when the stream has terminated.
A number of other operators exist, and sites such as Introduction to Rx, 101 Rx Samples and the list of RxJava operators will provide greater detail, but the above gives a flavor of what operators are available in reactive frameworks.
Part I of this article showed how to convert an OpenDDS stream of data samples into an observable, but this article showed that the real power of reactive programming lies in using operator composition to manipulate observable sequences. Reactive programming can lead to more responsive applications and better use of hardware resources. With these articles, we've seen how to integrate OpenDDS into a reactive application, and operators that are particularly useful for manipulating OpenDDS sample data.