Last event is not aggregated

Category: sql server streaminsight

Question

Dgrospelier on Wed, 19 Dec 2012 16:25:14


Hi,

I have a list of events that are sent to StreamInsight via an input adapter (WCF). These events are tickets bought by people who plan to attend an event.

In StreamInsight, I want to count the number of people who will attend the event.

So I have these artifacts in StreamInsight :

  • A CepStream named inputTicket that receives events from the WCF input adapter (each event's StartTime is UtcNow, so events arrive in order)
  • A pass-through query named "tickets":

var ticketQuery = from i in inputTicket select i;

  • The previous query is published as a stream inside SI (named ticketStream) so I can reuse it to build new queries
  • A query that counts the tickets for a specific event (my stream contains tickets sold for many events). The query is as follows:
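var NumAttendedPeopleByGate = from e in ticketStream
                              group e by e.EventCode into eventGroup
                              // Stretch every ticket to "forever" so each snapshot counts all tickets seen so far
                              from window in eventGroup.AlterEventDuration(e => TimeSpan.MaxValue).SnapshotWindow()
                              select new
                              {
                                  EventCode = eventGroup.Key,   // was gateGroup.Key, which is not defined in this query
                                  TicketCount = window.Count()
                              };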

The query runs fine in SI, but the last event sent to SI (the most recently sold ticket) is not counted.

When I go to the Event Flow Debugger, and use the Event Propagation Analysis, I see that the last event published to SI disappears just after the Local_Cleanse_0 phase (just before the Aggregate phase).

When I test the query inside LinqPad, everything is fine...

Am I missing something in the process?

Thanks.

Replies

DevBiker on Thu, 20 Dec 2012 12:47:27


How are you handling your CTIs? Is there a CTI inserted after that last event?

Dgrospelier on Thu, 20 Dec 2012 16:05:30


Hello,

Yes, there is a CTI after each event, including the last one.

DevBiker on Thu, 20 Dec 2012 20:26:34


OK ... but how are you enqueuing your CTIs? Are you using IncreasingStartTime?

In LinqPad, if you are using IncreasingStartTime, once your enumerable is enqueued a CTI of DateTimeOffset.MaxValue is enqueued that flushes everything out. I have a feeling that there isn't a flush happening ... so there is something with the CTI that isn't "pushing" the value through the calculation engine.

And ... please clarify ... does the event appear in the Local cleanse phase? Can you post a screen capture of the Event Flow Debugger?

Dgrospelier on Fri, 21 Dec 2012 05:52:02


I'm enqueuing using IncreasingStartTime.

In the Event Flow Debugger, here is what I saw:

In the first screen capture, you can see the last event selected in the PassThrough query; the event is correctly propagated to the next queries (and there is a CTI after the event).

But in the second screen capture, you can see that the event appears in the Local_Cleanse phase but disappears in the Aggregate phase.

DevBiker on Fri, 21 Dec 2012 15:25:36


Try using StrictlyIncreasingStartTime instead of IncreasingStartTime. I think the problem is that the last CTI and the last event have the same timestamp. StrictlyIncreasingStartTime will enqueue a CTI 1 tick after each event. In your scenario, StrictlyIncreasingStartTime should work just fine.
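To illustrate why a CTI that shares the last event's timestamp does not release that event, here is a deliberately simplified model in plain C#. It is not the StreamInsight engine: the class and method names (CtiModel, FinalizedCount) are hypothetical, and the only rule it encodes is that a CTI stamped T promises that no further event will start before T, so only events starting strictly before T can be treated as final.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified, hypothetical model of CTI-driven release; not the StreamInsight engine.
public static class CtiModel
{
    // Each event is assumed to be followed by a CTI stamped at
    // (event start + ctiOffsetTicks): 0 models IncreasingStartTime,
    // 1 models StrictlyIncreasingStartTime.
    // Returns how many events are final once the last CTI has been seen.
    public static int FinalizedCount(IEnumerable<long> eventStartTicks, long ctiOffsetTicks)
    {
        var starts = eventStartTicks.ToList();
        if (starts.Count == 0)
        {
            return 0;
        }

        // The most recent CTI is the one enqueued after the latest event.
        long lastCti = starts.Max() + ctiOffsetTicks;

        // A CTI at T leaves events starting exactly at T still open,
        // because another event with the same start time could still arrive.
        return starts.Count(s => s < lastCti);
    }
}
```

With three tickets starting at ticks 100, 200 and 300, an offset of 0 finalizes only the first two, while an offset of 1 finalizes all three. That matches the symptom in this thread: the last ticket stays pending until some later CTI moves past its start time.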

DevBiker on Fri, 21 Dec 2012 15:27:56


Also, for debugging and to compare how events flow through LinqPad versus your StreamInsight app, turn on tracing before running the test query in LinqPad (use the command-line trace.cmd), then compare the two traces. I think LinqPad works and the app doesn't because of the final CTI that a LinqPad query produces (DateTimeOffset.MaxValue, from AdvanceToInfinityOnShutdown). Your adapter never emits it because it isn't stopping (calling Stopped()), and it shouldn't be.
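For reference, the StrictlyIncreasingStartTime suggestion could be expressed roughly as follows. This is a sketch, not the original poster's code: it assumes a reference to the StreamInsight assembly (Microsoft.ComplexEventProcessing), the wrapper class TicketAdvanceTime is hypothetical, and the explicit variant reflects my understanding that a delay of -1 tick places each CTI one tick after the event's start time.

```csharp
using System;
using Microsoft.ComplexEventProcessing;

// Hypothetical helper; only the AdvanceTime* types come from StreamInsight.
public static class TicketAdvanceTime
{
    // Built-in setting: a CTI after every event, one tick past its start time.
    public static AdvanceTimeSettings Strict()
    {
        return AdvanceTimeSettings.StrictlyIncreasingStartTime;
    }

    // Explicit form, assumed to behave the same way:
    // one CTI per event, with a delay of -1 tick.
    public static AdvanceTimeSettings ExplicitStrict()
    {
        var generation = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1));
        // No import settings; Adjust is the policy applied to events that violate a CTI.
        return new AdvanceTimeSettings(generation, null, AdvanceTimePolicy.Adjust);
    }
}
```

Either value would be passed wherever the input stream is currently created with IncreasingStartTime. How that is wired up depends on the adapter setup, which the thread does not show.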