Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-24 Thread Bruno Cadonna
Hi Jiri,

Thank you for the follow up.

I guess it can happen that during start-up and the respective
rebalances some partitions are read more often than others, and that
consequently the timestamps in the repartition topic get mixed up more
than during normal operation. Unfortunately, I do not know how to
resolve this other than the way you did.

Best,
Bruno

On Mon, Feb 24, 2020 at 10:58 AM Samek, Jiří  wrote:
>
> Hi all,
>
> I am writing to describe where I got with the issue.
>
> The next thing I wanted to do was to check whether the topics contain
> records with mixed timestamps in a single partition that could cause the
> warning "Skipping record for expired segment" - meaning, the timestamp of an
> incoming record is behind the observed stream time and outside the join
> window. I did the check. They do have mixed timestamps. The input topics of
> the streaming app have timestamps in order, but I need to repartition to
> introduce the key needed for the join, and the repartition topics have mixed
> timestamps within a single partition.
>
> It doesn't happen during a continuous run of the streaming application. It
> happens when I stop the streaming application for several minutes or more.
> The stream app is deployed in 10 instances. My theory is that the stream
> tasks don't start at the same time or don't process records at the same
> speed, so it can happen that one task is writing records with different
> timestamps than another task is writing to a given partition of the
> repartition topic, and so they get mixed. I am not aware of a mechanism in
> Kafka Streams that could prevent mixing timestamps in repartition topics.
> If there is one, or if there is a configuration or something that could
> mitigate it, please let me know.
>
> So, in light of it, I think the warning is definitely a good thing. I have
> increased the join-window duration to handle 2-hour pauses in stream
> processing (2 hours of out-of-order records), which should do it for most
> cases. Increasing the window duration has memory and CPU impact, so I still
> wonder if there is a more efficient way to resolve it.
>
> Best Regards,
> Jiri
>
>
>
>
> On Tue, Feb 11, 2020 at 6:45 PM John Roesler  wrote:
>
> > Hi Jiří,
> >
> > Hmm, that is a mystery indeed. To inspect the log, you could try
> > kafka-dump-log (I've never used it).
> >
> > What I have done before is use kafka-console-consumer, with the
> > following option specified:
> >
> > --property print.timestamp=true
> >
> > Which version of Streams are you running? This is bringing up a
> > vague memory of when I refactored the retention time logic a while
> > back, and added logic to skip writing changelog records upon restore
> > when we detect that they would already be expired according to the
> > current stream time. Previously, we would go ahead and write them
> > and then have to rotate store segments later on during the restoration
> > when we reach the current stream time. This is a pretty heavy and
> > completely avoidable I/O operation. If this is what's happening, then
> > it's just an unforeseen consequence of the new log level. We might
> > need to follow up with a change to suppress the warnings specifically
> > in this circumstance.
> >
> > Feel free to open a bug ticket with all the relevant version info, repro,
> > logs etc., you've collected if you feel like the above might be what's
> > happening.
> >
> > For clarity, this wouldn't be a correctness problem at all, just a
> > misleading
> > and troubling log message we shouldn't be producing.
> >
> > Thanks,
> > -John
> >
> > On Tue, Feb 11, 2020, at 11:21, Samek, Jiří wrote:
> > > Hi Bruno, John and Sophie,
> > >
> > > thank you very much for the quick responses, you are the best. After
> > > thinking about it a little bit more, it seems fishy.
> > >
> > > From the logs, I see that it is not happening when the application is
> > > running normally.
> > >
> > > I have checked the timestamps (windowStartTimestamp) by connecting a local
> > > instance in debug mode to the Kafka cluster. And they are mixed up - not
> > > always; there can be a day with a good sequence and then a time interval
> > > with mixed-up timestamps, like these (store retention is 20.6 minutes):
> > > StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
> > > StreamTh

Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-24 Thread Samek, Jiří
Hi all,

I am writing to describe where I got with the issue.

The next thing I wanted to do was to check whether the topics contain
records with mixed timestamps in a single partition that could cause the
warning "Skipping record for expired segment" - meaning, the timestamp of an
incoming record is behind the observed stream time and outside the join
window. I did the check. They do have mixed timestamps. The input topics of
the streaming app have timestamps in order, but I need to repartition to
introduce the key needed for the join, and the repartition topics have mixed
timestamps within a single partition.
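
For reference, the relevant part of the topology has roughly this shape
(topic names, value types, and the key extraction are made up, and serdes
are omitted; it is only a sketch of the selectKey-then-join pattern that
creates the repartition topics):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class RepartitionJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // The input topics carry in-order timestamps but are not keyed by the
        // attribute the join needs, so both sides are re-keyed first.
        KStream<String, String> left = builder.stream("input-left");
        KStream<String, String> right = builder.stream("input-right");

        // selectKey() marks the streams for repartitioning; the windowed join
        // then forces Streams to create repartition topics, and that is where
        // records written by different upstream tasks interleave.
        left.selectKey((key, value) -> extractJoinKey(value))
            .join(right.selectKey((key, value) -> extractJoinKey(value)),
                  (l, r) -> l + "|" + r,
                  JoinWindows.of(Duration.ofMinutes(10)))
            .to("joined-output");

        System.out.println(builder.build().describe());
    }

    private static String extractJoinKey(String value) {
        return value.split(",")[0]; // placeholder key extraction
    }
}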

It doesn't happen during a continuous run of the streaming application. It
happens when I stop the streaming application for several minutes or more.
The stream app is deployed in 10 instances. My theory is that the stream
tasks don't start at the same time or don't process records at the same
speed, so it can happen that one task is writing records with different
timestamps than another task is writing to a given partition of the
repartition topic, and so they get mixed. I am not aware of a mechanism in
Kafka Streams that could prevent mixing timestamps in repartition topics.
If there is one, or if there is a configuration or something that could
mitigate it, please let me know.

So, in light of it, I think the warning is definitely a good thing. I have
increased the join-window duration to handle 2-hour pauses in stream
processing (2 hours of out-of-order records), which should do it for most
cases. Increasing the window duration has memory and CPU impact, so I still
wonder if there is a more efficient way to resolve it.
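
If it helps anyone else, the change amounts to something like this (the
durations are placeholders; extending either the window size or the grace
period increases how much out-of-order data is tolerated, at the cost of a
larger store retention):

import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public class JoinWindowSketch {
    // A 10-minute join window plus a 2-hour grace period, so records arriving
    // up to 2 hours out of order are still joined instead of being dropped as
    // expired. The backing window store's retention - and therefore its memory
    // footprint - grows with size + grace.
    static final JoinWindows WINDOWS =
            JoinWindows.of(Duration.ofMinutes(10)).grace(Duration.ofHours(2));
}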

Best Regards,
Jiri




On Tue, Feb 11, 2020 at 6:45 PM John Roesler  wrote:

> Hi Jiří,
>
> Hmm, that is a mystery indeed. To inspect the log, you could try
> kafka-dump-log (I've never used it).
>
> What I have done before is use kafka-console-consumer, with the
> following option specified:
>
> --property print.timestamp=true
>
> Which version of Streams are you running? This is bringing up a
> vague memory of when I refactored the retention time logic a while
> back, and added logic to skip writing changelog records upon restore
> when we detect that they would already be expired according to the
> current stream time. Previously, we would go ahead and write them
> and then have to rotate store segments later on during the restoration
> when we reach the current stream time. This is a pretty heavy and
> completely avoidable I/O operation. If this is what's happening, then
> it's just an unforeseen consequence of the new log level. We might
> need to follow up with a change to suppress the warnings specifically
> in this circumstance.
>
> Feel free to open a bug ticket with all the relevant version info, repro,
> logs etc., you've collected if you feel like the above might be what's
> happening.
>
> For clarity, this wouldn't be a correctness problem at all, just a
> misleading
> and troubling log message we shouldn't be producing.
>
> Thanks,
> -John
>
> On Tue, Feb 11, 2020, at 11:21, Samek, Jiří wrote:
> > Hi Bruno, John and Sophie,
> >
> > thank you very much for the quick responses, you are the best. After
> > thinking about it a little bit more, it seems fishy.
> >
> > From the logs, I see that it is not happening when the application is
> > running normally.
> >
> > I have checked the timestamps (windowStartTimestamp) by connecting a local
> > instance in debug mode to the Kafka cluster. And they are mixed up - not
> > always; there can be a day with a good sequence and then a time interval
> > with mixed-up timestamps, like these (store retention is 20.6 minutes):
> > StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
> > StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
> > StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
> > StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
> > StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
> > StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
> > StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
> > StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
> > StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
> > StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
> > StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
> > StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
> > StreamThread-1.task.1_57, 2020-02-07T12:59:27.364Z
> > StreamThread-1.task.1_57, 2020-02-07T13:01:34.313Z
> > StreamThread-1.task.1_57, 2020-02-07T13:07:56.379Z
> > StreamThread-1.task.1_57, 2020-02-07T12:45:32.984Z
> > StreamThread-1.task.1_57, 2020-02-07T12:45:44.232Z
> > StreamThread-1.task.1_57, 2020-02-07T12:45:59.594Z
> > StreamThread-1.task.1_57, 2020-02-07T12:46:02.860Z
> > StreamThread-1.task.1_57, 2020-02-07T13:02:17.658Z
> > StreamThread-1.task.1_57, 2020-02-07T12:46:25.125Z
> > StreamThread-1.task.1_57, 2020-02-07T12:46:44.864Z
> > StreamThread-1.task.1_57, 2020-02-07T12:44:44.074Z
> > Str

Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-11 Thread John Roesler
Hi Jiří,

Hmm, that is a mystery indeed. To inspect the log, you could try
kafka-dump-log (I've never used it).

What I have done before is use kafka-console-consumer, with the
following option specified:

--property print.timestamp=true
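
A full invocation would look something like this (the broker address,
changelog topic name, and partition below are placeholders for your setup):

bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-app-my-store-changelog \
  --partition 57 \
  --offset earliest \
  --property print.timestamp=true \
  --max-messages 200

That should prefix each record with its timestamp, so you can see whether
the partition really contains out-of-order timestamps.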

Which version of Streams are you running? This is bringing up a
vague memory of when I refactored the retention time logic a while
back, and added logic to skip writing changelog records upon restore
when we detect that they would already be expired according to the
current stream time. Previously, we would go ahead and write them
and then have to rotate store segments later on during the restoration
when we reach the current stream time. This is a pretty heavy and
completely avoidable I/O operation. If this is what's happening, then
it's just an unforeseen consequence of the new log level. We might
need to follow up with a change to suppress the warnings specifically
in this circumstance.

Feel free to open a bug ticket with all the relevant version info, repro,
logs etc., you've collected if you feel like the above might be what's
happening.

For clarity, this wouldn't be a correctness problem at all, just a misleading
and troubling log message we shouldn't be producing.

Thanks,
-John

On Tue, Feb 11, 2020, at 11:21, Samek, Jiří wrote:
> Hi Bruno, John and Sophie,
> 
> thank you very much for the quick responses, you are the best. After
> thinking about it a little bit more, it seems fishy.
> 
> From the logs, I see that it is not happening when the application is
> running normally.
> 
> I have checked the timestamps (windowStartTimestamp) by connecting a local
> instance in debug mode to the Kafka cluster. And they are mixed up - not
> always; there can be a day with a good sequence and then a time interval
> with mixed-up timestamps, like these (store retention is 20.6 minutes):
> StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
> StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
> StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
> StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
> StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
> StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
> StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
> StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
> StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
> StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
> StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
> StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
> StreamThread-1.task.1_57, 2020-02-07T12:59:27.364Z
> StreamThread-1.task.1_57, 2020-02-07T13:01:34.313Z
> StreamThread-1.task.1_57, 2020-02-07T13:07:56.379Z
> StreamThread-1.task.1_57, 2020-02-07T12:45:32.984Z
> StreamThread-1.task.1_57, 2020-02-07T12:45:44.232Z
> StreamThread-1.task.1_57, 2020-02-07T12:45:59.594Z
> StreamThread-1.task.1_57, 2020-02-07T12:46:02.860Z
> StreamThread-1.task.1_57, 2020-02-07T13:02:17.658Z
> StreamThread-1.task.1_57, 2020-02-07T12:46:25.125Z
> StreamThread-1.task.1_57, 2020-02-07T12:46:44.864Z
> StreamThread-1.task.1_57, 2020-02-07T12:44:44.074Z
> StreamThread-1.task.1_57, 2020-02-07T13:03:36.221Z
> StreamThread-1.task.1_57, 2020-02-07T13:12:16.691Z
> StreamThread-1.task.1_57, 2020-02-07T12:56:55.214Z
> 
> Picking a few of these, the stack trace was like:
> put:134, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
> lambda$init$0:112, InMemoryWindowStore
> (org.apache.kafka.streams.state.internals)
> restore:-1, 69348804
> (org.apache.kafka.streams.state.internals.InMemoryWindowStore$$Lambda$270)
> lambda$adapt$1:47, StateRestoreCallbackAdapter
> (org.apache.kafka.streams.processor.internals)
> restoreBatch:-1, 791473363
> (org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$269)
> restoreBatch:89, CompositeRestoreListener
> (org.apache.kafka.streams.processor.internals)
> restore:92, StateRestorer (org.apache.kafka.streams.processor.internals)
> processNext:349, StoreChangelogReader
> (org.apache.kafka.streams.processor.internals)
> restore:93, StoreChangelogReader
> (org.apache.kafka.streams.processor.internals)
> updateNewAndRestoringTasks:389, TaskManager
> (org.apache.kafka.streams.processor.internals)
> runOnce:769, StreamThread (org.apache.kafka.streams.processor.internals)
> runLoop:698, StreamThread (org.apache.kafka.streams.processor.internals)
> run:671, StreamThread (org.apache.kafka.streams.processor.internals)
> 
> So I believe it happens during the state restoration phase. And it's
> restoring state from the internal changelog topic. It's all task.1_57, so I
> expect that it is a single partition.
> 
> Thinking about it, I don't understand how such a case can even
> theoretically happen. I expect that a window, in order to be written to the
> changelog topic, first needs to go through "put"; so even if it's mixed on
> the input side, it should be skipped if expired at the moment of "p

Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-11 Thread Sophie Blee-Goldman
Is it possible that during normal processing these records were actually
dropped (e.g. due to a deserialization exception)? During restoration we
actually just copy plain bytes from the changelog directly, so if this was
an optimized source table, it's possible that records which were supposed
to be dropped ended up getting copied into the store during the restoration
phase.

It's a known issue, but tricky to solve due to the performance implications
of deserializing during restoration. Could this explain what you're seeing?
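
(For context, the source-topic-as-changelog reuse is tied to topology
optimization; a config sketch, assuming the 2.x config names:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class OptimizationConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        // With optimization enabled, a source KTable may reuse its input topic
        // as the changelog, so restoration copies the raw input bytes verbatim
        // and records that would normally be dropped at deserialization time
        // can still land in the store.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        return props;
    }
}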

On Tue, Feb 11, 2020 at 9:21 AM Samek, Jiří  wrote:

> Hi Bruno, John and Sophie,
>
> thank you very much for the quick responses, you are the best. After
> thinking about it a little bit more, it seems fishy.
>
> From the logs, I see that it is not happening when the application is
> running normally.
>
> I have checked the timestamps (windowStartTimestamp) by connecting a local
> instance in debug mode to the Kafka cluster. And they are mixed up - not
> always; there can be a day with a good sequence and then a time interval
> with mixed-up timestamps, like these (store retention is 20.6 minutes):
> StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
> StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
> StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
> StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
> StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
> StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
> StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
> StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
> StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
> StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
> StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
> StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
> StreamThread-1.task.1_57, 2020-02-07T12:59:27.364Z
> StreamThread-1.task.1_57, 2020-02-07T13:01:34.313Z
> StreamThread-1.task.1_57, 2020-02-07T13:07:56.379Z
> StreamThread-1.task.1_57, 2020-02-07T12:45:32.984Z
> StreamThread-1.task.1_57, 2020-02-07T12:45:44.232Z
> StreamThread-1.task.1_57, 2020-02-07T12:45:59.594Z
> StreamThread-1.task.1_57, 2020-02-07T12:46:02.860Z
> StreamThread-1.task.1_57, 2020-02-07T13:02:17.658Z
> StreamThread-1.task.1_57, 2020-02-07T12:46:25.125Z
> StreamThread-1.task.1_57, 2020-02-07T12:46:44.864Z
> StreamThread-1.task.1_57, 2020-02-07T12:44:44.074Z
> StreamThread-1.task.1_57, 2020-02-07T13:03:36.221Z
> StreamThread-1.task.1_57, 2020-02-07T13:12:16.691Z
> StreamThread-1.task.1_57, 2020-02-07T12:56:55.214Z
>
> Picking a few of these, the stack trace was like:
> put:134, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
> lambda$init$0:112, InMemoryWindowStore
> (org.apache.kafka.streams.state.internals)
> restore:-1, 69348804
> (org.apache.kafka.streams.state.internals.InMemoryWindowStore$$Lambda$270)
> lambda$adapt$1:47, StateRestoreCallbackAdapter
> (org.apache.kafka.streams.processor.internals)
> restoreBatch:-1, 791473363
>
> (org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$269)
> restoreBatch:89, CompositeRestoreListener
> (org.apache.kafka.streams.processor.internals)
> restore:92, StateRestorer (org.apache.kafka.streams.processor.internals)
> processNext:349, StoreChangelogReader
> (org.apache.kafka.streams.processor.internals)
> restore:93, StoreChangelogReader
> (org.apache.kafka.streams.processor.internals)
> updateNewAndRestoringTasks:389, TaskManager
> (org.apache.kafka.streams.processor.internals)
> runOnce:769, StreamThread (org.apache.kafka.streams.processor.internals)
> runLoop:698, StreamThread (org.apache.kafka.streams.processor.internals)
> run:671, StreamThread (org.apache.kafka.streams.processor.internals)
>
> So I believe it happens during the state restoration phase. And it's
> restoring state from the internal changelog topic. It's all task.1_57, so I
> expect that it is a single partition.
>
> Thinking about it, I don't understand how such a case can even
> theoretically happen. I expect that a window, in order to be written to the
> changelog topic, first needs to go through "put"; so even if it's mixed on
> the input side, it should be skipped if expired at the moment of "put"
> (relative to observedStreamTime) and on restoration everything should be
> fine.
>
> As the next step, I would like to list/inspect records and their timestamps
> from a given partition of the changelog topic via a command-line tool (or in
> some other way) - to confirm whether they are really stored this way. If you
> have a tip on how to do it, please let me know.
>
> That is all I have for now. I would like to resolve it. I will post it here
> if I come up with something new.
>
> Thank you
> Jiri
>
>
>
> On Mon, Feb 10, 2020 at 10:14 PM John Roesler  wrote:
> >
> > Hey all,
> >
> > Sorry for the confusion. Bruno set me straight offline.
> >
> > Previously, we had metrics for each reason for skipping records, and the
> > rationale was that you would monitor the metrics and only turn to the logs
> > if you needed to *debug* unexpected r

Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-11 Thread Samek, Jiří
Hi Bruno, John and Sophie,

thank you very much for the quick responses, you are the best. After
thinking about it a little bit more, it seems fishy.

From the logs, I see that it is not happening when the application is
running normally.

I have checked the timestamps (windowStartTimestamp) by connecting a local
instance in debug mode to the Kafka cluster. And they are mixed up - not
always; there can be a day with a good sequence and then a time interval
with mixed-up timestamps, like these (store retention is 20.6 minutes):
StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
StreamThread-1.task.1_57, 2020-02-07T12:59:27.364Z
StreamThread-1.task.1_57, 2020-02-07T13:01:34.313Z
StreamThread-1.task.1_57, 2020-02-07T13:07:56.379Z
StreamThread-1.task.1_57, 2020-02-07T12:45:32.984Z
StreamThread-1.task.1_57, 2020-02-07T12:45:44.232Z
StreamThread-1.task.1_57, 2020-02-07T12:45:59.594Z
StreamThread-1.task.1_57, 2020-02-07T12:46:02.860Z
StreamThread-1.task.1_57, 2020-02-07T13:02:17.658Z
StreamThread-1.task.1_57, 2020-02-07T12:46:25.125Z
StreamThread-1.task.1_57, 2020-02-07T12:46:44.864Z
StreamThread-1.task.1_57, 2020-02-07T12:44:44.074Z
StreamThread-1.task.1_57, 2020-02-07T13:03:36.221Z
StreamThread-1.task.1_57, 2020-02-07T13:12:16.691Z
StreamThread-1.task.1_57, 2020-02-07T12:56:55.214Z
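
(For what it's worth: once stream time in this partition has reached
2020-02-07T13:13:08.651Z, the cutoff with 20.6 minutes of retention is
roughly 12:52:32.651Z, so the 12:44-12:46 entries above are already outside
retention by the time they are put.)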

Picking a few of these, the stack trace was like:
put:134, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
lambda$init$0:112, InMemoryWindowStore
(org.apache.kafka.streams.state.internals)
restore:-1, 69348804
(org.apache.kafka.streams.state.internals.InMemoryWindowStore$$Lambda$270)
lambda$adapt$1:47, StateRestoreCallbackAdapter
(org.apache.kafka.streams.processor.internals)
restoreBatch:-1, 791473363
(org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$269)
restoreBatch:89, CompositeRestoreListener
(org.apache.kafka.streams.processor.internals)
restore:92, StateRestorer (org.apache.kafka.streams.processor.internals)
processNext:349, StoreChangelogReader
(org.apache.kafka.streams.processor.internals)
restore:93, StoreChangelogReader
(org.apache.kafka.streams.processor.internals)
updateNewAndRestoringTasks:389, TaskManager
(org.apache.kafka.streams.processor.internals)
runOnce:769, StreamThread (org.apache.kafka.streams.processor.internals)
runLoop:698, StreamThread (org.apache.kafka.streams.processor.internals)
run:671, StreamThread (org.apache.kafka.streams.processor.internals)

So I believe it happens during the state restoration phase. And it's
restoring state from the internal changelog topic. It's all task.1_57, so I
expect that it is a single partition.

Thinking about it, I don't understand how such a case can even
theoretically happen. I expect that a window, in order to be written to the
changelog topic, first needs to go through "put"; so even if it's mixed on
the input side, it should be skipped if expired at the moment of "put"
(relative to observedStreamTime) and on restoration everything should be
fine.
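
For my own understanding, the check I mean is roughly this - a simplified
sketch of the put-time expiry logic, not the actual Kafka source:

class ExpiryCheckSketch {
    private final long retentionPeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;

    ExpiryCheckSketch(long retentionPeriodMs) {
        this.retentionPeriodMs = retentionPeriodMs;
    }

    // Returns false when the record would be skipped with the
    // "Skipping record for expired segment" warning. The same put() path is
    // used for live processing and for changelog restoration (see the stack
    // trace above), so replaying mixed-timestamp changelog records can trip
    // it during restore as well.
    boolean put(long windowStartTimestampMs) {
        observedStreamTime = Math.max(observedStreamTime, windowStartTimestampMs);
        if (windowStartTimestampMs <= observedStreamTime - retentionPeriodMs) {
            return false; // logged at WARN and counted in a drop metric
        }
        // ... otherwise the window would be inserted into the segment map ...
        return true;
    }
}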

As the next step, I would like to list/inspect records and their timestamps
from a given partition of the changelog topic via a command-line tool (or in
some other way) - to confirm whether they are really stored this way. If you
have a tip on how to do it, please let me know.
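
In case it is useful, my current idea for the "some other way" is a plain
consumer pinned to that partition, roughly like this (the broker address and
changelog topic name are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChangelogTimestampDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-inspector");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        // Placeholder changelog topic name; partition 57 matches task 1_57.
        TopicPartition tp = new TopicPartition("my-app-my-store-changelog", 57);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    break; // reached the current end of the partition
                }
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Prints the record (producer-assigned) timestamp of each
                    // changelog entry in offset order.
                    System.out.printf("offset=%d timestamp=%d (%s)%n",
                            record.offset(), record.timestamp(), record.timestampType());
                }
            }
        }
    }
}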

That is all I have for now. I would like to resolve it. I will post it here
if I come up with something new.

Thank you
Jiri



On Mon, Feb 10, 2020 at 10:14 PM John Roesler  wrote:
>
> Hey all,
>
> Sorry for the confusion. Bruno set me straight offline.
>
> Previously, we had metrics for each reason for skipping records, and the
> rationale was that you would monitor the metrics and only turn to the logs
> if you needed to *debug* unexpected record skipping. Note that skipping
> records by itself isn't a cause for concern, since this is exactly what
> Streams is designed to do in a number of situations.
>
> However, during the KIP-444 discussion, the decision was reversed, and we
> decided to just log one "roll-up" metric for all skips and increase the log
> messages to warning level for debuggability. This particularly makes sense
> because you otherwise would have to restart the application to change the
> log level if you needed to figure out why the single skipped-record metric
> is non-zero. And then you may not even observe it again.
>
> I either missed the memo on that discussion, or participated in it and then
> forgot it even happened. I'm not sure I want to look back at the thread 

Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread John Roesler
Hey all,

Sorry for the confusion. Bruno set me straight offline.

Previously, we had metrics for each reason for skipping records, and the
rationale was that you would monitor the metrics and only turn to the logs
if you needed to *debug* unexpected record skipping. Note that skipping
records by itself isn't a cause for concern, since this is exactly what Streams
is designed to do in a number of situations.

However, during the KIP-444 discussion, the decision was reversed, and we
decided to just log one "roll-up" metric for all skips and increase the log
messages to warning level for debuggability. This particularly makes sense
because you otherwise would have to restart the application to change the
log level if you needed to figure out why the single skipped-record metric
is non-zero. And then you may not even observe it again.

I either missed the memo on that discussion, or participated in it and then
forgot it even happened. I'm not sure I want to look back at the thread to
find out.

Anyway, I've closed the PR I opened to move it back to debug. We should
still try to help figure out the root cause of this particular email thread,
though.

Thanks,
-John

On Mon, Feb 10, 2020, at 12:20, Sophie Blee-Goldman wrote:
> While I agree that seems like it was probably a refactoring mistake, I'm not
> convinced it isn't the right thing to do. John, can you reiterate the
> argument for setting it to debug way back when?
> 
> I would actually present this exact situation as an argument for keeping it
> as warn, since something indeed seems fishy here that was only surfaced
> through this warning. That said, maybe the metric is the more appropriate
> way to bring attention to this: not sure if it's info or debug level though,
> or how likely it is that anyone really pays attention to it?
> 
> On Mon, Feb 10, 2020 at 9:53 AM John Roesler  wrote:
> 
> > Hi,
> >
> > I’m sorry for the trouble. It looks like it was a mistake during
> >
> > https://github.com/apache/kafka/pull/6521
> >
> > Specifically, while addressing code review comments to change a bunch of
> > other logs from debugs to warnings, that one seems to have been included by
> > accident:
> > https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
> >
> > I’ll see if I can fix it today.
> >
> > Regarding Bruno's thoughts, there was a pretty old decision to capture the
> > "skipped records" as a metric for visibility and log it at the debug level
> > for debuggability. We decided that "warning" wasn't the right level because
> > Streams is operating completely as specified.
> >
> > However, I do agree that it doesn't seem right to see more skipped records
> > during start-up; I would expect to see exactly the same records skipped
> > during start-up as during regular processing, since the skipping logic is
> > completely deterministic and based on the sequence of timestamps your
> > records have in the topic.  Maybe you just notice it more during startup?
> > I.e., if there are 1000 warning logs spread over a few months, then you
> > don't notice it, but when you see them all together at start-up, it's more
> > concerning?
> >
> > Thanks,
> > -John
> >
> >
> > On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> > > Hi,
> > >
> > > I am pretty sure this was intentional. All skipped records log
> > > messages are on WARN level.
> > >
> > > If a lot of your records are skipped on app restart with this log
> > > message on WARN-level, they were also skipped with the log message on
> > > DEBUG-level. You simply did not know about it before. With an
> > > in-memory window store, this message is logged when a window with a
> > > start time older than the current stream time minus the retention
> > > period is put into the window store, i.e., the window is NOT inserted
> > > into the window store. If you get a lot of them on app restart, you
> > > should have a look at the timestamps of your records and the retention
> > > of your window store. If those values do not explain the behavior,
> > > please try to find a minimal example that shows the issue and post it
> > > here on the mailing list.
> > >
> > > On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří 
> > wrote:
> > > >
> > > > Hi,
> > > >
> > > > in
> > > >
> > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> > > >
> > > > the log level of "Skipping record for expired segment" was changed
> > > > from debug to warn. Was it an intentional change? Should it somehow be
> > > > handled by the user? How can the user handle it? I am getting a lot of
> > > > these on app restart.
> > >
> >
>


Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread Sophie Blee-Goldman
While I agree that seems like it was probably a refactoring mistake, I'm not
convinced it isn't the right thing to do. John, can you reiterate the
argument for setting it to debug way back when?

I would actually present this exact situation as an argument for keeping it
as warn, since something indeed seems fishy here that was only surfaced
through this warning. That said, maybe the metric is the more appropriate
way to bring attention to this: not sure if it's info or debug level though,
or how likely it is that anyone really pays attention to it?
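
(Something along these lines would dump the relevant drop metrics from a
running application; the KafkaStreams instance is assumed to be yours, and
the exact metric names vary by version:)

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class DropMetricsDump {
    static void printDropMetrics(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            MetricName name = entry.getKey();
            // Match loosely, since the skip/drop metric names differ across
            // versions (e.g. expired-window-record-drop vs. dropped-records).
            if (name.name().contains("expired") || name.name().contains("drop")) {
                System.out.println(name.group() + "/" + name.name() + " "
                        + name.tags() + " = " + entry.getValue().metricValue());
            }
        }
    }
}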

On Mon, Feb 10, 2020 at 9:53 AM John Roesler  wrote:

> Hi,
>
> I’m sorry for the trouble. It looks like it was a mistake during
>
> https://github.com/apache/kafka/pull/6521
>
> Specifically, while addressing code review comments to change a bunch of
> other logs from debugs to warnings, that one seems to have been included by
> accident:
> https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
>
> I’ll see if I can fix it today.
>
> Regarding Bruno's thoughts, there was a pretty old decision to capture the
> "skipped records" as a metric for visibility and log it at the debug level
> for debuggability. We decided that "warning" wasn't the right level because
> Streams is operating completely as specified.
>
> However, I do agree that it doesn't seem right to see more skipped records
> during start-up; I would expect to see exactly the same records skipped
> during start-up as during regular processing, since the skipping logic is
> completely deterministic and based on the sequence of timestamps your
> records have in the topic.  Maybe you just notice it more during startup?
> I.e., if there are 1000 warning logs spread over a few months, then you
> don't notice it, but when you see them all together at start-up, it's more
> concerning?
>
> Thanks,
> -John
>
>
> On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> > Hi,
> >
> > I am pretty sure this was intentional. All skipped records log
> > messages are on WARN level.
> >
> > If a lot of your records are skipped on app restart with this log
> > message on WARN-level, they were also skipped with the log message on
> > DEBUG-level. You simply did not know about it before. With an
> > in-memory window store, this message is logged when a window with a
> > start time older than the current stream time minus the retention
> > period is put into the window store, i.e., the window is NOT inserted
> > into the window store. If you get a lot of them on app restart, you
> > should have a look at the timestamps of your records and the retention
> > of your window store. If those values do not explain the behavior,
> > please try to find a minimal example that shows the issue and post it
> > here on the mailing list.
> >
> > On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří 
> wrote:
> > >
> > > Hi,
> > >
> > > in
> > >
> https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> > >
> > > the log level of "Skipping record for expired segment" was changed from
> > > debug to warn. Was it an intentional change? Should it somehow be handled
> > > by the user? How can the user handle it? I am getting a lot of these on
> > > app restart.
> >
>


Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread John Roesler
Hi,

I’m sorry for the trouble. It looks like it was a mistake during

https://github.com/apache/kafka/pull/6521

Specifically, while addressing code review comments to change a bunch of other 
logs from debugs to warnings, that one seems to have been included by accident: 
https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c

I’ll see if I can fix it today.

Regarding Bruno's thoughts, there was a pretty old decision to capture the 
"skipped records" as a metric for visibility and log it at the debug level for 
debuggability. We decided that "warning" wasn't the right level because Streams 
is operating completely as specified.

However, I do agree that it doesn't seem right to see more skipped records 
during start-up; I would expect to see exactly the same records skipped during 
start-up as during regular processing, since the skipping logic is completely 
deterministic and based on the sequence of timestamps your records have in the 
topic.  Maybe you just notice it more during startup? I.e., if there are 1000 
warning logs spread over a few months, then you don't notice it, but when you 
see them all together at start-up, it's more concerning?

Thanks,
-John


On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> Hi,
> 
> I am pretty sure this was intentional. All skipped records log
> messages are on WARN level.
> 
> If a lot of your records are skipped on app restart with this log
> message on WARN-level, they were also skipped with the log message on
> DEBUG-level. You simply did not know about it before. With an
> in-memory window store, this message is logged when a window with a
> start time older than the current stream time minus the retention
> period is put into the window store, i.e., the window is NOT inserted
> into the window store. If you get a lot of them on app restart, you
> should have a look at the timestamps of your records and the retention
> of your window store. If those values do not explain the behavior,
> please try to find a minimal example that shows the issue and post it
> here on the mailing list.
> 
> On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří  wrote:
> >
> > Hi,
> >
> > in
> > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> >
> > the log level of "Skipping record for expired segment" was changed from
> > debug to warn. Was it an intentional change? Should it somehow be handled
> > by the user? How can the user handle it? I am getting a lot of these on
> > app restart.
>


Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread Bruno Cadonna
Hi,

I am pretty sure this was intentional. All skipped records log
messages are on WARN level.

If a lot of your records are skipped on app restart with this log
message on WARN-level, they were also skipped with the log message on
DEBUG-level. You simply did not know about it before. With an
in-memory window store, this message is logged when a window with a
start time older than the current stream time minus the retention
period is put into the window store, i.e., the window is NOT inserted
into the window store. If you get a lot of them on app restart, you
should have a look at the timestamps of your records and the retention
of your window store. If those values do not explain the behavior,
please try to find a minimal example that shows the issue and post it
here on the mailing list.
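
For example, with a retention period of 20 minutes and stream time already
advanced to 13:13, a put() of a window starting at 12:45 is more than 20
minutes behind stream time, so it is logged and skipped rather than inserted.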

On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří  wrote:
>
> Hi,
>
> in
> https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
>
> the log level of "Skipping record for expired segment" was changed from
> debug to warn. Was it an intentional change? Should it somehow be handled by
> the user? How can the user handle it? I am getting a lot of these on app
> restart.


"Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread Samek, Jiří
Hi,

in
https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134

the log level of "Skipping record for expired segment" was changed from debug
to warn. Was it an intentional change? Should it somehow be handled by the
user? How can the user handle it? I am getting a lot of these on app restart.