This first came up in testing. QA reported that "events" were not detected
after they finished their testing. After this discussion, my proposal was to
send a few more records to make the windows flush so that the suppressed event
would show up. It now looks to me as though these few dummy records have to
match the "key" of the pending windows before those windows are flushed.
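
To make the flush condition concrete, here is a minimal plain-Java sketch. It is
not Kafka Streams code. It assumes, as explained further down this thread, that
stream time is tracked per partition and that a suppressed window is released
once that partition's stream time reaches the window end plus the grace period.
All class and method names are made up for illustration, and late-record
handling is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "suppress until window closes"; NOT the Kafka Streams API.
final class PerPartitionSuppressSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream time = highest record timestamp seen so far, kept per partition.
    private final Map<Integer, Long> streamTime = new HashMap<>();
    // Pending windows per partition: "key@windowStart" -> window end.
    private final Map<Integer, Map<String, Long>> pending = new HashMap<>();

    PerPartitionSuppressSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Processes one record and returns the windows released by it.
    List<String> process(int partition, String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        Map<String, Long> buffer =
                pending.computeIfAbsent(partition, p -> new LinkedHashMap<>());
        buffer.put(key + "@" + windowStart, windowStart + windowSizeMs);

        // Only this partition's stream time advances.
        long now = Math.max(streamTime.getOrDefault(partition, Long.MIN_VALUE), timestampMs);
        streamTime.put(partition, now);

        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now >= e.getValue() + graceMs) { // window end + grace has passed
                released.add(e.getKey());
                it.remove();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        PerPartitionSuppressSketch s = new PerPartitionSuppressSketch(1000, 0);
        System.out.println(s.process(0, "a", 100));  // window a@0 is buffered
        System.out.println(s.process(1, "b", 5000)); // other partition: a@0 stays buffered
        System.out.println(s.process(0, "c", 1500)); // same partition: a@0 is released
    }
}
```

In this model the dummy record does not need the same key as the pending
window, but it does need to arrive on the same partition with a late enough
timestamp.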

In practice, this may not always be a problem. But the real-time nature of the
problem may require that there is no large delay between the processing of the
event and the flush. How does one solve this issue in production? I am
wondering why the design did not accommodate a timer to flush the windows.

Thanks
Mohan


On 6/26/19, 8:18 AM, "John Roesler" <j...@confluent.io> wrote:

    Hi Mohan,
    
    I see where you're going with this, and it might indeed be a
    challenge. Even if you send a "dummy" message on all input topics, you
    won't have a guarantee that after the repartition, the dummy message
    is propagated to all partitions of the repartition topics. So it might
    be difficult to force the suppression buffer to flush if it's after a
    repartition.
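
A rough way to picture this: after a key-changing operation, each dummy record
lands on whichever repartition-topic partition its new key maps to, so a handful
of dummy keys may leave some partitions untouched. The sketch below is plain
Java with a simplified stand-in partitioner (hash of the key modulo the
partition count). The real default partitioner hashes the serialized key bytes
differently, and all names here are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; NOT part of Kafka or Kafka Streams.
final class DummyKeyCoverageSketch {
    // Stand-in partitioner (assumption): key hash modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns the partitions that receive none of the dummy keys.
    static Set<Integer> uncoveredPartitions(List<String> dummyKeys, int numPartitions) {
        Set<Integer> uncovered = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++) {
            uncovered.add(p);
        }
        for (String key : dummyKeys) {
            uncovered.remove(partitionFor(key, numPartitions));
        }
        return uncovered;
    }

    public static void main(String[] args) {
        // Two dummy keys cannot reach all four partitions.
        System.out.println(uncoveredPartitions(List.of("a", "b"), 4));
        // Four keys that happen to map to distinct partitions cover everything.
        System.out.println(uncoveredPartitions(List.of("a", "b", "c", "d"), 4));
    }
}
```

Any partition left in the returned set would keep its suppression buffer
unflushed in this model.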
    
    Can we take a step back and discuss the motivation for forcing the
    records to flush out? Is this for testing your app, or is it to drive
    some production logic?
    
    Thanks,
    -John
    
    
    On Mon, Jun 24, 2019 at 7:26 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >
    > John,
    >
    > Thanks for the nice explanation. When the repartitioning happens, does
    > the window get associated with the new partition, i.e., does a message
    > with a new timestamp now have to appear on the repartition topic for the
    > window to expire? It is possible that a new stream of messages is coming
    > in but, after the map operation, the partitions of the repartition topic
    > do not see the same thing.
    >
    > Thanks
    > Mohan
    >
    > On 6/24/19, 7:49 AM, "John Roesler" <j...@confluent.io> wrote:
    >
    >     Hey, this is a very apt question.
    >
    >     GroupByKey isn't a great example because it doesn't actually change
    >     the key, so all the aggregation results are actually on records from
    >     the same partition. But let's say you do a groupBy or a map (or any
    >     operation that can change the key), followed by an aggregation. Now
    >     it's possible that the aggregation would need to process records from
    >     two different partitions. In such a case (key-changing operation
    >     followed by a stateful operation), Streams actually round-trips the
    >     data through an intermediate topic, called a repartition topic, before
    >     the aggregation. This has the effect, similar to the "shuffle" phase
    >     of map-reduce, of putting all the data into its *new* right partition,
    >     so then the aggregation can still process each of its partitions
    >     independently.
    >
    >     Regarding the latter statement, even though you only have one
    >     instance, Streams _still_ processes each partition independently. The
    >     "unit of work" responsible for processing a partition is called a
    >     "task". So if you have 4 partitions, then your one instance actually
    >     has 4 state stores, one for each task, where each task only gets
    >     records from a single partition. The tasks can't see anything about
    >     each other, not their state nor other metadata like their current
    >     stream time. Otherwise, the results would depend on which tasks happen
    >     to be co-located with which other tasks. So, having to send your
    >     "purge" event to all partitions is a pain, but in the end, it buys you
    >     a lot, as you can add another instance to your cluster at any time,
    >     and Streams will scale up, and you'll know that the program is
    >     executing exactly the same way the whole time.
    >
    >     -John
    >
    >     On Sat, Jun 22, 2019 at 4:37 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >
    >     > I can see the issue. But it raised other questions. Pardon my
    >     > ignorance. Even though partitions are processed independently,
    >     > windows can be aggregating state from records read from many
    >     > partitions. Let us say there is a groupByKey followed by aggregate.
    >     > In this case, how is the state reconciled across all the application
    >     > instances? Is there a designated instance for a particular key?
    >     >
    >     > In my case, there was only one instance processing records from all
    >     > partitions, and it is kind of odd that the windows did not expire,
    >     > even though I understand why now.
    >     >
    >     > Thanks
    >     > Mohan
    >     >
    >     >
    >     > On 6/21/19, 2:25 PM, "John Roesler" <j...@confluent.io> wrote:
    >     >
    >     >     No problem. It's definitely a subtlety. It occurs because each
    >     >     partition is processed completely independently of the others, so
    >     >     "stream time" is tracked per partition, and there's no way to look
    >     >     across at the other partitions to find out what stream time they have.
    >     >
    >     >     In general, it's not a problem because you'd expect all partitions to
    >     >     receive updates over time, but if you're specifically trying to send
    >     >     events that cause stuff to get flushed from the buffers, it can mess
    >     >     with you. It's especially notable in tests. So, for most tests, I just
    >     >     configure the topics to have one partition.
    >     >
    >     >     -John
    >     >
    >     >     On Fri, Jun 21, 2019 at 3:56 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >
    >     >     > That change, "in the same partition", must explain what we are
    >     >     > seeing. Unless there is a message on every partition, not all
    >     >     > windows will expire. That is an interesting twist. Thanks for the
    >     >     > correction (I will go back and confirm this).
    >     >     >
    >     >     > -mohan
    >     >     >
    >     >     >
    >     >     > On 6/21/19, 12:40 PM, "John Roesler" <j...@confluent.io> wrote:
    >     >     >
    >     >     >     Sure, the record cache attempts to save downstream operators from
    >     >     >     unnecessary updates by also buffering for a short amount of time
    >     >     >     before forwarding. It forwards results whenever the cache fills up or
    >     >     >     whenever there is a commit. If you're happy to wait at least "commit
    >     >     >     interval" amount of time for updates, then you don't need to do
    >     >     >     anything, but if you're on the edge of your seat, waiting for these
    >     >     >     results, you can set cache.max.bytes.buffering to 0 to disable the
    >     >     >     record cache entirely. Note that this would hurt throughput in
    >     >     >     general, though.
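
As a sketch of the two settings mentioned here, the snippet below uses plain
java.util.Properties so it runs without Kafka on the classpath.
`cache.max.bytes.buffering` and `commit.interval.ms` are the Kafka Streams
config keys; the 1000 ms commit interval is only an example value, and the
class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical holder for the overrides; these would be merged into the
// Properties passed to the Streams application alongside its other settings.
final class LowLatencyStreamsConfigSketch {
    static Properties lowLatencyOverrides() {
        Properties props = new Properties();
        // 0 disables the record cache, so every update is forwarded downstream.
        props.setProperty("cache.max.bytes.buffering", "0");
        // Alternative knob: keep the cache but commit (and flush it) more often.
        // 1000 ms is an illustrative value, not a recommendation.
        props.setProperty("commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(lowLatencyOverrides());
    }
}
```

Disabling the cache trades throughput for latency, so it is probably best
limited to tests or to topologies where the extra downstream traffic is
acceptable.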
    >     >     >
    >     >     >     Just a slight modification:
    >     >     >     * a new record with new timestamp > (all the previous timestamps +
    >     >     >     grace period) will cause all the old windows *in the same partition*
    >     >     >     to close
    >     >     >     * yes, expiry of the window depends only on the event time
    >     >     >
    >     >     >     Hope this helps!
    >     >     >     -John
    >     >     >
    >     >     >     On Thu, Jun 20, 2019 at 11:42 AM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >
    >     >     >     > Could you tell me a little more about the delays introduced by
    >     >     >     > the record caches and how I can disable them?
    >     >     >     >
    >     >     >     > If I could summarize my problem:
    >     >     >     >
    >     >     >     > - When a new record arrives with a timestamp greater than that of
    >     >     >     >   all records sent before, I expect *all* of the old windows to close
    >     >     >     > - Expiry of the windows depends only on the event time and not on
    >     >     >     >   the key
    >     >     >     >
    >     >     >     > Are these two statements correct?
    >     >     >     >
    >     >     >     > Thanks
    >     >     >     > Mohan
    >     >     >     >
    >     >     >     > On 6/20/19, 9:17 AM, "John Roesler" <j...@confluent.io> wrote:
    >     >     >     >
    >     >     >     >     Hi!
    >     >     >     >
    >     >     >     >     In addition to setting the grace period to zero (or some small
    >     >     >     >     number), you should also consider the delays introduced by record
    >     >     >     >     caches upstream of the suppression. If you're closely watching the
    >     >     >     >     timing of records going into and coming out of the topology, this
    >     >     >     >     might also spoil your expectations. You could always disable the
    >     >     >     >     record cache to make the system more predictable (although this would
    >     >     >     >     hurt throughput in production).
    >     >     >     >
    >     >     >     >     Thanks,
    >     >     >     >     -John
    >     >     >     >
    >     >     >     >     On Wed, Jun 19, 2019 at 3:01 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >     >
    >     >     >     >     > We do explicitly set the grace period to zero. I am going to try the new version.
    >     >     >     >     >
    >     >     >     >     > -mohan
    >     >     >     >     >
    >     >     >     >     >
    >     >     >     >     > On 6/19/19, 12:50 PM, "Parthasarathy, Mohan" <mpart...@hpe.com> wrote:
    >     >     >     >     >
    >     >     >     >     >     Thanks. We will give it a shot.
    >     >     >     >     >
    >     >     >     >     >     On 6/19/19, 12:42 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
    >     >     >     >     >
    >     >     >     >     >         Hi Mohan,
    >     >     >     >     >
    >     >     >     >     >         I realized that my previous statement was not clear. With a grace
    >     >     >     >     >         period of 12 hours, suppress would wait for late events until stream
    >     >     >     >     >         time has advanced 12 hours before a result would be emitted.
    >     >     >     >     >
    >     >     >     >     >         Best,
    >     >     >     >     >         Bruno
    >     >     >     >     >
    >     >     >     >     >         On Wed, Jun 19, 2019 at 9:21 PM Bruno Cadonna <br...@confluent.io> wrote:
    >     >     >     >     >         >
    >     >     >     >     >         > Hi Mohan,
    >     >     >     >     >         >
    >     >     >     >     >         > If you do not set a grace period, the grace period defaults to 12
    >     >     >     >     >         > hours. Hence, suppress would wait for an event that occurs 12 hours
    >     >     >     >     >         > later before it outputs a result. Try to explicitly set the grace
    >     >     >     >     >         > period to 0 and let us know if it worked.
    >     >     >     >     >         >
    >     >     >     >     >         > If it still does not work, upgrade to version 2.2.1 if it is possible
    >     >     >     >     >         > for you. We had a couple of bugs in suppress recently that are fixed
    >     >     >     >     >         > in that version.
    >     >     >     >     >         >
    >     >     >     >     >         > Best,
    >     >     >     >     >         > Bruno
    >     >     >     >     >         >
    >     >     >     >     >         > On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >     >         > >
    >     >     >     >     >         > > No, I have not set any grace period. Is that mandatory? Have you
    >     >     >     >     >         > > seen problems with suppress and windows expiring?
    >     >     >     >     >         > >
    >     >     >     >     >         > > Thanks
    >     >     >     >     >         > > Mohan
    >     >     >     >     >         > >
    >     >     >     >     >         > > On 6/19/19, 12:41 AM, "Bruno Cadonna" <br...@confluent.io> wrote:
    >     >     >     >     >         > >
    >     >     >     >     >         > >     Hi Mohan,
    >     >     >     >     >         > >
    >     >     >     >     >         > >     Did you set a grace period on the window?
    >     >     >     >     >         > >
    >     >     >     >     >         > >     Best,
    >     >     >     >     >         > >     Bruno
    >     >     >     >     >         > >
    >     >     >     >     >         > >     On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     > On further debugging, what we are seeing is that windows are expiring
    >     >     >     >     >         > >     > rather randomly as new messages are being processed. We tested with a
    >     >     >     >     >         > >     > new key for every new message. We waited for the window time before
    >     >     >     >     >         > >     > replaying new messages. Sometimes a new message would come in and
    >     >     >     >     >         > >     > create state. It takes several messages to make some of the old
    >     >     >     >     >         > >     > windows close (go past suppress to the next stage). We have also seen
    >     >     >     >     >         > >     > cases where one of them never closed even though several other older
    >     >     >     >     >         > >     > ones expired. Then we explicitly sent a message with the same old key
    >     >     >     >     >         > >     > and then it showed up. Also, for every new message, only one of the
    >     >     >     >     >         > >     > previous windows expires even though there are several pending.
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     > If we don't use suppress, then there is never an issue. With suppress,
    >     >     >     >     >         > >     > the behavior we are seeing is weird. We are using version 2.1.0 in DSL
    >     >     >     >     >         > >     > mode. Any clues on what we could be missing? Why isn't there an order
    >     >     >     >     >         > >     > in the way windows are closed? As event time progresses with the new
    >     >     >     >     >         > >     > messages arriving, the older ones should expire. Is that the right
    >     >     >     >     >         > >     > understanding or not?
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     > Thanks
    >     >     >     >     >         > >     > Mohan
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     > On 6/17/19, 3:43 PM, "Parthasarathy, Mohan" <mpart...@hpe.com> wrote:
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     >     Hi,
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     >     We are using suppress in the application. We see some state being
    >     >     >     >     >         > >     >     created at some point in time. Then there is no new data for a day
    >     >     >     >     >         > >     >     or two. We send new data, but the old window of data (where we see
    >     >     >     >     >         > >     >     the state being created) is not closing, i.e., we do not see it go
    >     >     >     >     >         > >     >     through suppress and on to the next stage. It is as though the
    >     >     >     >     >         > >     >     state created earlier was purged. Is this possible?
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     >     Thanks
    >     >     >     >     >         > >     >     Mohan
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >     >
    >     >     >     >     >         > >
    >     >     >     >     >         > >
    >     >     >     >     >
    >     >     >     >     >
    >     >     >     >     >
    >     >     >     >     >
    >     >     >     >
    >     >     >     >
    >     >     >
    >     >     >
    >     >
    >     >
    >
    >
    
