Yes, this all makes sense. The option untilWallClockTimeLimit would solve the 
problem for us. I can see the difficulty with delivering the "final" result. 
The problem here is assuming that something will be *complete* at some point in 
time and a final result can be delivered. The time for a window is not 
synchronized with the "event" that we want to detect. Hence, the application 
*has* to deal with state that may not be *complete*, i.e., some sort of 
cross-window aggregation has to be implemented outside the application. I am 
wondering how you can possibly avoid that.

Let me know how I can help.

Thanks
-mohan


On 6/28/19, 1:21 PM, "John Roesler" <j...@confluent.io> wrote:

    Ok, good, that's what I was hoping. I think that's a good strategy, at
    the end of the "real" data, just write a dummy record with the same
    keys with a high timestamp to flush everything else through.
    
    For the most part, I'd expect a production program to get a steady
    stream of traffic with increasing timestamps, so windows would
    constantly be getting flushed out as stream time moves forward.
    
    Some folks have reported that they don't get enough traffic through
    their program to flush out the suppressed results on a regular basis,
    though. Right now, the best solution is to have the same Producer that
    writes data to the input topics also write "heartbeat/dummy" records
    periodically when there is no data to send, just to keep stream time
    moving forward. But this isn't a perfect solution, as you have pointed
    out in this thread; you really want the heartbeat records to go to all
    partitions, and also go to all re-partitions if there are repartition
    topics in the topology.
    
    I agree that there seems to be a need for some first-class support for
    keeping stream time moving reliably. I think that the ideal would be
    to allow Producers to automatically send the heartbeats and to
    implement a variant of the Chandy-Lamport distributed snapshot
    algorithm to push them through the whole topology while skipping any
    actual computations (so your business logic wouldn't have to see
    them). I'd really love to see this feature in Streams; I just haven't
    written it up yet because I haven't had time.
    
    During the design for Suppress, we did consider having some kind of
    timer, but the problem is that it's not possible for this to be
    deterministic. If you want the "until window closes" version, you're
    supposed to get a guarantee that you'll really only see one, final,
    result for each window/key. If we were to use the system clock on the
    Streams machine to decide it's probably been long enough and emit the
    "final" result, but it turns out that we actually had just stalled
    (maybe waiting for quorum during a broker upgrade or something, or
    just a run-of-the-mill networking problem) and the next record we poll
    was supposed to be in the window, what can we do? We already emitted
    the "final" result, so we can say "oops, that wasn't the final result,
    _this_ one is the final result", but that seems to render the words
    "final result" kind of meaningless. On the other hand, we can just
    drop that record and ignore it, but that's a bummer because the only
    reason we couldn't include it in the result was some ephemeral
    environmental problem. If we run the same data through the same
    program again, we'd get a different result.
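
A minimal sketch of the non-determinism described above, written as a toy model in plain Java rather than as Kafka Streams code. The class, method, and field names are all hypothetical, and the "wall-clock flush" is an imagined policy, not an existing Streams feature. Two runs carry identical event timestamps; only the time at which the last record is polled differs.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of one window for one key; this is not Kafka Streams code. */
final class WallClockFlushSketch {

    /** A record's event timestamp plus the wall-clock time at which it was polled. */
    static final class Polled {
        final long eventTimeMs;
        final long polledAtMs;

        Polled(long eventTimeMs, long polledAtMs) {
            this.eventTimeMs = eventTimeMs;
            this.polledAtMs = polledAtMs;
        }
    }

    /** Hypothetical wall-clock flush: records polled after the deadline miss the "final" count. */
    static int countWithWallClockFlush(List<Polled> records, long windowEndMs, long deadlineMs) {
        int count = 0;
        for (Polled r : records) {
            if (r.eventTimeMs < windowEndMs && r.polledAtMs <= deadlineMs) {
                count++;
            }
        }
        return count;
    }

    /** Stream-time flush: window membership depends only on the event timestamps. */
    static int countWithStreamTimeFlush(List<Polled> records, long windowEndMs) {
        int count = 0;
        for (Polled r : records) {
            if (r.eventTimeMs < windowEndMs) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long windowEndMs = 10_000L;
        long deadlineMs = 15_000L;
        // Same event timestamps in both runs; the last record is polled late in the second.
        List<Polled> smoothRun = Arrays.asList(
                new Polled(1_000L, 1_100L), new Polled(5_000L, 5_100L), new Polled(9_000L, 9_100L));
        List<Polled> stalledRun = Arrays.asList(
                new Polled(1_000L, 1_100L), new Polled(5_000L, 5_100L), new Polled(9_000L, 20_000L));

        System.out.println(countWithWallClockFlush(smoothRun, windowEndMs, deadlineMs));
        System.out.println(countWithWallClockFlush(stalledRun, windowEndMs, deadlineMs));
        System.out.println(countWithStreamTimeFlush(smoothRun, windowEndMs));
        System.out.println(countWithStreamTimeFlush(stalledRun, windowEndMs));
    }
}
```

In this model the wall-clock policy counts 3 records in the smooth run and 2 in the stalled run, while the stream-time policy counts 3 in both, which is the reproducibility argument made in the paragraph above.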
    
    So for "until window closes" mode, where we guarantee you _only_ see
    the final results, we only offer stream time expiration. Anything else
    would violate correctness one way or another. On the other hand, you
    have "until time limit" mode. In that case, it's just "buffer for a
    while, but multiple results are still ok" semantics. For that case, we
    have KIP-424
    (https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time),
    to indeed use a timer to emit events if too much time passes on the
    Streams side. It's just not implemented yet.
    
    Does this all seem about right to you?
    -john
    
    On Wed, Jun 26, 2019 at 12:57 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >
    > Initially it started in testing. QA reported problems where "events" were not detected after they finished their testing. After this discussion, my proposal was to send a few more records to cause the windows to flush so that the suppressed event would show up. Now it looks to me as though these few dummy records have to match the "key" of the pending windows; only then would they be flushed.
    >
    > In practice, it may not always be a problem. But the real-time nature of the problem might require that there not be a huge delay between the processing of the event and the flush. How does one solve this issue in production? I am wondering why the design did not accommodate a timer to flush the windows.
    >
    > Thanks
    > Mohan
    >
    >
    > On 6/26/19, 8:18 AM, "John Roesler" <j...@confluent.io> wrote:
    >
    >     Hi Mohan,
    >
    >     I see where you're going with this, and it might indeed be a
    >     challenge. Even if you send a "dummy" message on all input topics, you
    >     won't have a guarantee that after the repartition, the dummy message
    >     is propagated to all partitions of the repartition topics. So it might
    >     be difficult to force the suppression buffer to flush if it's after a
    >     repartition.
    >
    >     Can we take a step back and discuss the motivation for forcing the
    >     records to flush out? Is this for testing your app, or is it to drive
    >     some production logic?
    >
    >     Thanks,
    >     -John
    >
    >
    >     On Mon, Jun 24, 2019 at 7:26 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >
    >     > John,
    >     >
    >     > Thanks for the nice explanation. When the repartitioning happens, does the window get associated with the new partition, i.e., does a message with a new timestamp now have to appear on the repartition topic for the window to expire? It is possible that there is a new stream of messages coming in but, after the map operation, the partitions in the repartition topic do not see the same thing.
    >     >
    >     > Thanks
    >     > Mohan
    >     >
    >     > On 6/24/19, 7:49 AM, "John Roesler" <j...@confluent.io> wrote:
    >     >
    >     >     Hey, this is a very apt question.
    >     >
    >     >     GroupByKey isn't a great example because it doesn't actually change
    >     >     the key, so all the aggregation results are actually on records from
    >     >     the same partition. But let's say you do a groupBy or a map (or any
    >     >     operation that can change the key), followed by an aggregation. Now
    >     >     it's possible that the aggregation would need to process records from
    >     >     two different partitions. In such a case (key-changing operation
    >     >     followed by a stateful operation), Streams actually round-trips the
    >     >     data through an intermediate topic, called a repartition topic, before
    >     >     the aggregation. This has the effect, similar to the "shuffle" phase
    >     >     of map-reduce, of putting all the data into its *new* right partition,
    >     >     so then the aggregation can still process each of its partitions
    >     >     independently.
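
As a rough illustration of the shuffle described above, the following plain-Java sketch maps post-map keys to partitions of a repartition topic. It is a toy model under stated assumptions: the class and method names are hypothetical, and `String.hashCode()` stands in for the murmur2 hash over serialized key bytes that Kafka's default partitioner uses, so the concrete partition numbers will not match a real cluster.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of how keys map to partitions of a repartition topic; not Kafka code. */
final class RepartitionSketch {

    /**
     * Stand-in partitioner. Kafka's default partitioner hashes the serialized key bytes
     * with murmur2; String.hashCode() is used here only to keep the sketch dependency-free.
     */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Partitions that receive at least one record for the given (post-map) keys. */
    static Set<Integer> partitionsReached(List<String> keys, int numPartitions) {
        Set<Integer> reached = new TreeSet<>();
        for (String key : keys) {
            reached.add(partitionFor(key, numPartitions));
        }
        return reached;
    }
}
```

With 4 partitions, a single dummy record reaches exactly one of them after the key-changing step, which is why one dummy record per input partition does not guarantee that every repartition-topic partition sees a new timestamp.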
    >     >
    >     >     Regarding the latter statement, even though you only have one
    >     >     instance, Streams _still_ processes each partition independently. The
    >     >     "unit of work" responsible for processing a partition is called a
    >     >     "task". So if you have 4 partitions, then your one instance actually
    >     >     has 4 state stores, one for each task, where each task only gets
    >     >     records from a single partition. The tasks can't see anything about
    >     >     each other, not their state nor other metadata like their current
    >     >     stream time. Otherwise, the results would depend on which tasks happen
    >     >     to be co-located with which other tasks. So, having to send your
    >     >     "purge" event to all partitions is a pain, but in the end, it buys you
    >     >     a lot, as you can add another instance to your cluster at any time,
    >     >     and Streams will scale up, and you'll know that the program is
    >     >     executing exactly the same way the whole time.
    >     >
    >     >     -John
    >     >
    >     >     On Sat, Jun 22, 2019 at 4:37 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >
    >     >     > I can see the issue. But it raised other questions. Pardon my ignorance. Even though partitions are processed independently, windows can be aggregating state from records read from many partitions. Let us say there is a groupByKey followed by aggregate. In this case, how is the state reconciled across all the application instances? Is there a designated instance for a particular key?
    >     >     >
    >     >     > In my case, there was only one instance processing records from all partitions, and it is kind of odd that windows did not expire, even though I understand why now.
    >     >     >
    >     >     > Thanks
    >     >     > Mohan
    >     >     >
    >     >     >
    >     >     > On 6/21/19, 2:25 PM, "John Roesler" <j...@confluent.io> wrote:
    >     >     >
    >     >     >     No problem. It's definitely a subtlety. It occurs because each
    >     >     >     partition is processed completely independently of the others, so
    >     >     >     "stream time" is tracked per partition, and there's no way to look
    >     >     >     across at the other partitions to find out what stream time they have.
    >     >     >
    >     >     >     In general, it's not a problem because you'd expect all partitions to
    >     >     >     receive updates over time, but if you're specifically trying to send
    >     >     >     events that cause stuff to get flushed from the buffers, it can mess
    >     >     >     with you. It's especially notable in tests. So, for most tests, I just
    >     >     >     configure the topics to have one partition.
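
The per-partition tracking described above can be sketched in plain Java as follows. This is a toy model, not how Kafka Streams is implemented internally; the class name, method names, and the `NO_RECORDS_YET` sentinel are assumptions made for illustration. Stream time for a partition is modeled as the highest timestamp observed on that partition so far.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-partition stream time; not Kafka Streams code. */
final class PerPartitionStreamTime {

    /** Arbitrary sentinel for a partition that has not received any record yet. */
    static final long NO_RECORDS_YET = -1L;

    private final Map<Integer, Long> streamTimeByPartition = new HashMap<>();

    /** Stream time only moves forward: keep the maximum timestamp seen on this partition. */
    void observe(int partition, long timestampMs) {
        streamTimeByPartition.merge(partition, timestampMs, Long::max);
    }

    /** Returns the stream time of one partition; other partitions have no influence on it. */
    long streamTime(int partition) {
        return streamTimeByPartition.getOrDefault(partition, NO_RECORDS_YET);
    }
}
```

In this model, a record with a high timestamp on partition 0 leaves the stream time of partition 1 untouched, so windows buffered for partition 1 stay open until partition 1 itself receives a newer record.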
    >     >     >
    >     >     >     -John
    >     >     >
    >     >     >     On Fri, Jun 21, 2019 at 3:56 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >
    >     >     >     > That change, "in the same partition", must explain what we are seeing. Unless you see one message per partition, not all windows will expire. That is an interesting twist. Thanks for the correction (I will go back and confirm this).
    >     >     >     >
    >     >     >     > -mohan
    >     >     >     >
    >     >     >     >
    >     >     >     > On 6/21/19, 12:40 PM, "John Roesler" <j...@confluent.io> wrote:
    >     >     >     >
    >     >     >     >     Sure, the record cache attempts to save downstream operators from
    >     >     >     >     unnecessary updates by also buffering for a short amount of time
    >     >     >     >     before forwarding. It forwards results whenever the cache fills up or
    >     >     >     >     whenever there is a commit. If you're happy to wait at least "commit
    >     >     >     >     interval" amount of time for updates, then you don't need to do
    >     >     >     >     anything, but if you're on the edge of your seat, waiting for these
    >     >     >     >     results, you can set cache.max.bytes.buffering to 0 to disable the
    >     >     >     >     record cache entirely. Note that this would hurt throughput in
    >     >     >     >     general, though.
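
A minimal sketch of the setting mentioned above, using only `java.util.Properties` so that it runs without the Kafka libraries. The class and method names are hypothetical; the only setting taken from the message is `cache.max.bytes.buffering`, and a real application would pass further required settings alongside it.

```java
import java.util.Properties;

/** Builds the one Streams setting discussed above; names here are illustrative only. */
final class LowLatencyConfigSketch {

    static Properties lowLatencyProps() {
        Properties props = new Properties();
        // Disable the record cache so updates are forwarded without waiting for
        // the cache to fill up or for the next commit (at the cost of throughput).
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }
}
```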
    >     >     >     >
    >     >     >     >     Just a slight modification:
    >     >     >     >     * a new record with new timestamp > (all the previous timestamps +
    >     >     >     >     grace period) will cause all the old windows *in the same partition*
    >     >     >     >     to close
    >     >     >     >     * yes, expiry of the window depends only on the event time
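
The closing rule in the two bullets above can be written out as a small check. This is a sketch under an assumption: it treats a window as covering [start, start + size) and as closed once the stream time of its partition reaches the window end plus the grace period. The class and method names are hypothetical, and the exact boundary condition inside Kafka Streams should be confirmed against its documentation.

```java
import java.time.Duration;

/** Toy model of when a time window counts as closed; not Kafka Streams code. */
final class WindowCloseSketch {

    /**
     * Assumes the window covers [windowStartMs, windowStartMs + size) and closes once
     * the partition's stream time reaches the window end plus the grace period.
     */
    static boolean isClosed(long windowStartMs, Duration size, Duration grace, long streamTimeMs) {
        long windowEndMs = windowStartMs + size.toMillis();
        return streamTimeMs >= windowEndMs + grace.toMillis();
    }
}
```

With a one-minute window starting at 0 and a grace period of zero, the window closes in this model as soon as stream time reaches 60,000 ms; with a 12-hour grace period, the same window stays open until stream time reaches 43,260,000 ms.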
    >     >     >     >
    >     >     >     >     Hope this helps!
    >     >     >     >     -John
    >     >     >     >
    >     >     >     >     On Thu, Jun 20, 2019 at 11:42 AM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >     >
    >     >     >     >     > Could you tell me a little more about the delays introduced by the record caches and how I can disable them?
    >     >     >     >     >
    >     >     >     >     > If I could summarize my problem:
    >     >     >     >     >
    >     >     >     >     > - When a new record arrives with a timestamp greater than that of all records sent before, I expect *all* of the old windows to close
    >     >     >     >     > - Expiry of the windows depends only on the event time and not on the key
    >     >     >     >     >
    >     >     >     >     > Are these two statements correct?
    >     >     >     >     >
    >     >     >     >     > Thanks
    >     >     >     >     > Mohan
    >     >     >     >     >
    >     >     >     >     > On 6/20/19, 9:17 AM, "John Roesler" <j...@confluent.io> wrote:
    >     >     >     >     >
    >     >     >     >     >     Hi!
    >     >     >     >     >
    >     >     >     >     >     In addition to setting the grace period to zero (or some small
    >     >     >     >     >     number), you should also consider the delays introduced by record
    >     >     >     >     >     caches upstream of the suppression. If you're closely watching the
    >     >     >     >     >     timing of records going into and coming out of the topology, this
    >     >     >     >     >     might also spoil your expectations. You could always disable the
    >     >     >     >     >     record cache to make the system more predictable (although this would
    >     >     >     >     >     hurt throughput in production).
    >     >     >     >     >
    >     >     >     >     >     Thanks,
    >     >     >     >     >     -John
    >     >     >     >     >
    >     >     >     >     >     On Wed, Jun 19, 2019 at 3:01 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >     >     >
    >     >     >     >     >     > We do explicitly set the grace period to zero. I am going to try the new version.
    >     >     >     >     >     >
    >     >     >     >     >     > -mohan
    >     >     >     >     >     >
    >     >     >     >     >     >
    >     >     >     >     >     > On 6/19/19, 12:50 PM, "Parthasarathy, Mohan" <mpart...@hpe.com> wrote:
    >     >     >     >     >     >
    >     >     >     >     >     >     Thanks. We will give it a shot.
    >     >     >     >     >     >
    >     >     >     >     >     >     On 6/19/19, 12:42 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
    >     >     >     >     >     >
    >     >     >     >     >     >         Hi Mohan,
    >     >     >     >     >     >
    >     >     >     >     >     >         I realized that my previous statement was not clear. With a grace
    >     >     >     >     >     >         period of 12 hours, suppress would wait for late events until stream
    >     >     >     >     >     >         time has advanced 12 hours before a result would be emitted.
    >     >     >     >     >     >
    >     >     >     >     >     >         Best,
    >     >     >     >     >     >         Bruno
    >     >     >     >     >     >
    >     >     >     >     >     >         On Wed, Jun 19, 2019 at 9:21 PM Bruno Cadonna <br...@confluent.io> wrote:
    >     >     >     >     >     >         >
    >     >     >     >     >     >         > Hi Mohan,
    >     >     >     >     >     >         >
    >     >     >     >     >     >         > If you do not set a grace period, the grace period defaults to 12
    >     >     >     >     >     >         > hours. Hence, suppress would wait for an event that occurs 12 hours
    >     >     >     >     >     >         > later before it outputs a result. Try to explicitly set the grace
    >     >     >     >     >     >         > period to 0 and let us know if it worked.
    >     >     >     >     >     >         >
    >     >     >     >     >     >         > If it still does not work, upgrade to version 2.2.1 if it is possible
    >     >     >     >     >     >         > for you. We had a couple of bugs in suppress recently that are fixed
    >     >     >     >     >     >         > in that version.
    >     >     >     >     >     >         >
    >     >     >     >     >     >         > Best,
    >     >     >     >     >     >         > Bruno
    >     >     >     >     >     >         >
    >     >     >     >     >     >         > On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >     >     >         > >
    >     >     >     >     >     >         > > No, I have not set any grace period. Is that mandatory? Have you seen problems with suppress and windows expiring?
    >     >     >     >     >     >         > >
    >     >     >     >     >     >         > > Thanks
    >     >     >     >     >     >         > > Mohan
    >     >     >     >     >     >         > >
    >     >     >     >     >     >         > > On 6/19/19, 12:41 AM, "Bruno Cadonna" <br...@confluent.io> wrote:
    >     >     >     >     >     >         > >
    >     >     >     >     >     >         > >     Hi Mohan,
    >     >     >     >     >     >         > >
    >     >     >     >     >     >         > >     Did you set a grace period on the window?
    >     >     >     >     >     >         > >
    >     >     >     >     >     >         > >     Best,
    >     >     >     >     >     >         > >     Bruno
    >     >     >     >     >     >         > >
    >     >     >     >     >     >         > >     On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >     >     >     >         > >     >
    >     >     >     >     >     >         > >     > On further debugging, what we are seeing is that windows are expiring rather randomly as new messages are being processed. We tested with a new key for every new message. We waited for the window time before replaying new messages. Sometimes a new message would come in and create state. It takes several messages to make some of the old windows close (go past suppress to the next stage). We have also seen cases where one of them never closed even though several other older ones expired. Then we explicitly sent a message with the same old key and then it showed up. Also, for every new message, only one of the previous windows expires even though there are several pending.
    >     >     >     >     >     >         > >     >
    >     >     >     >     >     >         > >     > If we don't use suppress, then there is never an issue. With suppress, the behavior we are seeing is weird. We are using version 2.1.0 in DSL mode. Any clues on what we could be missing? Why isn't there an order in the way windows are closed? As event time progresses with the new messages arriving, the older ones should expire. Is that the right understanding or not?
    >     >     >     >     >     >         > >     >
    >     >     >     >     >     >         > >     > Thanks
    >     >     >     >     >     >         > >     > Mohan
    >     >     >     >     >     >         > >     >
    >     >     >     >     >     >         > >     > On 6/17/19, 3:43 PM, "Parthasarathy, Mohan" <mpart...@hpe.com> wrote:
    >     >     >     >     >     >         > >     >
    >     >     >     >     >     >         > >     >     Hi,
    >     >     >     >     >     >         > >     >
    >     >     >     >     >     >         > >     >     We are using suppress in the application. We see some state being created at some point in time. Now there is no new data for a day or two. We send new data, but the old window of data (where we see the state being created) is not closing, i.e., we are not seeing it go through suppress and on to the next stage. It is as though the state created earlier was purged. Is this possible?
    >     >     >     >     >     >         > >     >
    >     >     >     >     >     >         > >     >     Thanks
    >     >     >     >     >     >         > >     >     Mohan
