split input stream by payload date

Category: sql server streaminsight


MAshworth on Fri, 20 Sep 2013 11:15:22

Hi, I've just started with StreamInsight and have a problem I'm struggling to solve.

I have a stream of data from different devices, read from a database, where the values are almost in chronological order: a few seconds to about a minute out of order at most. I set the CTI interval to 60 seconds and this has resolved the vast majority of issues. However, some devices are sending data with payload timestamps that are a couple of days old.

I'm creating groups from the stream based on the device ID but don't ever see data from the devices that are days behind because (I think) the event time for these devices is earlier than the CTI events that occur every 60 seconds, so they are discarded.

How should I handle data (that I need to keep and process) that is days behind 'now' while keeping the rest of the stream processing responsive?


DevBiker on Sun, 22 Sep 2013 13:23:40

I've actually run across this once before.

First, you are using declarative CTI generation; that's why you don't get any exceptions for CTI violations. Instead, the offending events are quietly dropped. When doing this, you do have an option of adding additional latency in there ... not just the duration but the delay. This, however, won't work for you because of the timestamps that are a couple of days old.
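To make the silent dropping concrete, here is a rough plain-Python simulation. It is not StreamInsight code. It assumes a simplified model in which a CTI is issued after every accepted event and trails the newest event time by a fixed delay; the function name `apply_cti` and the sample devices are made up for illustration.

```python
from datetime import datetime, timedelta

def apply_cti(events, delay):
    """Split events into (accepted, dropped) under a simplified CTI model.

    Assumption: the CTI only moves forward and trails the newest accepted
    event time by `delay`. Anything stamped before the current CTI is
    dropped without raising an error.
    """
    cti = datetime.min
    accepted, dropped = [], []
    for device, event_time in events:
        if event_time < cti:
            dropped.append((device, event_time))   # quietly discarded
        else:
            accepted.append((device, event_time))
            cti = max(cti, event_time - delay)     # CTI never moves backwards
    return accepted, dropped

now = datetime(2013, 9, 20, 11, 0, 0)
events = [
    ("A", now),
    ("A", now + timedelta(seconds=30)),
    ("B", now + timedelta(seconds=10)),   # 20 seconds late: inside the delay
    ("C", now - timedelta(days=2)),       # two days late: behind the CTI
    ("A", now + timedelta(seconds=90)),
]
accepted, dropped = apply_cti(events, delay=timedelta(seconds=60))
print("dropped:", [device for device, _ in dropped])
```

A delay large enough to keep device C would have to be over two days, which would hold back output for every other device as well.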

Taking a step back and looking at it, you actually have multiple independent streams with independent timelines. However, they are coming through an aggregator as a single stream into StreamInsight so your natural inclination is to enqueue them as a single stream, which doesn't work. So ... what you need is to be able to enqueue them as a single stream but then split them into separate timelines later. You can do this with a subject.

Here's how it would work:

1) Inbound events from the aggregating source. In the payload for these events is the source system time but the event time is the arrival time. This keeps all of the inbound events moving forward on the same timeline. You can also use this stream to detect offline sources and/or add metadata if you know the offset/error/latency from the source event.

2) Publish the inbound events to a subject.

3) Subscribe to the subject and create a new stream for each source. In these new streams, you use the source system time for the event time. Because each stream is separate, with its own timeline, you'll be good. Now you can do your analytics on the source events in their original timeline.
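The three steps above can be sketched roughly as follows. This is plain Python standing in for the pattern, not the StreamInsight API: `Subject`, `SourceTimeline`, `Reading` and `PointEvent` are hypothetical names, and the source IDs are assumed to be known up front to keep the example short.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass(frozen=True)
class Reading:
    source_id: str
    source_time: datetime   # timestamp assigned by the device
    value: float

@dataclass(frozen=True)
class PointEvent:
    event_time: datetime    # position on the stream's timeline
    payload: Reading

class Subject:
    """Minimal publish/subscribe hub (stand-in for a subject)."""
    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, event):
        for callback in self._subscribers:
            callback(event)

class SourceTimeline:
    """Step 3: one stream per source, re-stamped with the source time."""
    def __init__(self, source_id):
        self.source_id = source_id
        self.events = []

    def on_event(self, inbound):
        if inbound.payload.source_id == self.source_id:
            reading = inbound.payload
            self.events.append(PointEvent(reading.source_time, reading))

subject = Subject()
timelines = {sid: SourceTimeline(sid) for sid in ("well-1", "well-2")}
for timeline in timelines.values():
    subject.subscribe(timeline.on_event)

arrival = datetime(2013, 9, 22, 13, 0, 0)
readings = [
    Reading("well-1", arrival - timedelta(seconds=5), 10.0),
    Reading("well-2", arrival - timedelta(days=2), 7.5),   # days behind
    Reading("well-1", arrival + timedelta(seconds=1), 10.2),
]
# Steps 1 and 2: stamp each inbound event with its arrival time and publish.
for offset, reading in enumerate(readings):
    subject.publish(PointEvent(arrival + timedelta(seconds=offset), reading))
```

The inbound stream only ever moves forward because it uses arrival time, while the two-day-old reading from `well-2` lands on its own timeline where nothing is ahead of it.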

I'll see if I can put a sample together on my blog today or tomorrow.

DevBiker on Mon, 23 Sep 2013 02:38:36

Here you go: http://www.devbiker.net/post/Multiple-Timelines-From-a-Single-(Input)-Source.aspx.

Download the sample ... :-)

MAshworth on Mon, 23 Sep 2013 08:37:12

Thanks for the very comprehensive answer (and especially for the sample). I'd not looked at subjects to solve this and was going to try retrieving the out-of-date data separately into different streams, but your example looks far more elegant and flexible. I'll try to adopt this approach.

For the app I'm working on (getting data from vehicles that can be out of network coverage for long periods and all have different clocks), if I create a subject per vehicle there could be hundreds of independent streams active at once. Won't creating lots of streams have an impact on memory / performance? I'll have to create the streams dynamically, as I won't know what the IDs used to separate the source data will be until they arrive, so I guess I'll also need some sort of internal list of subjects and tidy up unused/dead ones periodically too?

DevBiker on Mon, 23 Sep 2013 10:38:52

Hmmmm ... the use case that I've run into out there was with an event aggregator that aggregated multiple wells, so there was a limited number of additional processes.

First, let's clarify one thing: you have only 1 subject. You then have additional processes for each (in your case) vehicle. And yes, if you create a process per vehicle, you could wind up with a lot of processes. How bad this would be depends on the event rates from the vehicles. It would mainly be a challenge with scheduling - scheduling is done at the process level - and it would be impacted by the edition of StreamInsight that you are using - Standard has 1 scheduler and Premium has 1 per core. There is some memory overhead involved with a process, but my initial concern would be the scheduling rather than the memory. So yes, some kind of dynamic mechanism to start (and probably stop) the processes would be necessary. You won't have to have any kind of internal list, however - since each "car" is a separate process, you can use the processes collection to separate them. You could possibly use the initial aggregation stream to do this - that, at least, is where I would start looking at doing it.
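As a rough idea of what that start/stop logic could look like, here is a plain-Python sketch. It is not StreamInsight code: the dictionary stands in for the processes collection, and `ProcessManager`, the `vehicle-<id>` naming scheme and the idle timeout are all assumptions made for illustration.

```python
from datetime import datetime, timedelta

class ProcessManager:
    """Tracks one named process per vehicle, driven by the inbound stream."""

    def __init__(self, idle_timeout):
        self.idle_timeout = idle_timeout
        self.last_seen = {}   # process name -> arrival time of latest event

    def on_arrival(self, vehicle_id, arrival_time):
        """Record an inbound event; return True if a process must be started."""
        name = f"vehicle-{vehicle_id}"
        is_new = name not in self.last_seen
        self.last_seen[name] = arrival_time
        return is_new

    def stop_idle(self, now):
        """Remove and return the processes that have been silent too long."""
        idle = [name for name, seen in self.last_seen.items()
                if now - seen > self.idle_timeout]
        for name in idle:
            del self.last_seen[name]
        return idle

manager = ProcessManager(idle_timeout=timedelta(hours=1))
start = datetime(2013, 9, 23, 10, 0, 0)
manager.on_arrival("42", start)                           # new: start a process
manager.on_arrival("7", start + timedelta(minutes=50))    # new: start a process
manager.on_arrival("42", start + timedelta(minutes=55))   # already running
stopped = manager.stop_idle(start + timedelta(hours=2))
```

Idle detection here uses arrival time, not the vehicle's own clock, so a vehicle that reconnects with days-old data still counts as active.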

You'll also want to think about how you handle the output ... you can use a subject here as well. In this case, you'd have a single subject for each output payload type that all of the relevant queries publish to, and then your sink would subscribe to that subject. For this, you'd convert back to system time for the stream's timeline/clock.

And Subjects are your friend. They provide solutions to soooo many different challenges. Know them ... love them ... cherish them forever. ;-)