Thanks, Guozhang and Bruno!

2)
I had a similar thought to both of you about the metrics, but I ultimately
came to a conclusion like Bruno's. These aren't dropped invalid
records, they're intentionally dropped, valid, but unnecessary, updates.
A "warning" for this case definitely seems wrong, and I'd also not recommend
counting these events along with "dropped-records", because those are
all dropped invalid records, e.g., late or null-keyed or couldn't be 
deserialized.

Like Bruno pointed out, an operator should be concerned to see
non-zero "dropped-records", and would then consult the logs for warnings.
But that same person should be happy to see "dropped-idempotent-updates"
increasing, since it means they're saving time and money. Maybe the name
of the metric could be different, but I couldn't think of a better one. OTOH,
maybe it just stands out to us because we recently discussed those other
metrics in KIP-444?
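
To make the distinction concrete, here is a minimal sketch of keeping the two
counts apart. It is a toy model, not the Kafka Streams metrics API: the class
DropMetricsModel, the DropReason enum, and the warning text are all
hypothetical. Only the two metric names come from this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a hypothetical recorder, not the Kafka Streams metrics API.
final class DropMetricsModel {

    // Why a record was not forwarded (hypothetical categories for this sketch).
    enum DropReason { LATE, NULL_KEY, DESERIALIZATION_ERROR, IDEMPOTENT_UPDATE }

    long droppedRecords;            // models "dropped-records"
    long droppedIdempotentUpdates;  // models "dropped-idempotent-updates"
    final List<String> warnings = new ArrayList<>();

    void recordDrop(DropReason reason, String key) {
        if (reason == DropReason.IDEMPOTENT_UPDATE) {
            // Valid but unnecessary update: count it, emit no warning.
            droppedIdempotentUpdates++;
        } else {
            // Invalid record: count it and leave a warning for the operator.
            droppedRecords++;
            warnings.add("Dropping record with key " + key + ": " + reason);
        }
    }

    public static void main(String[] args) {
        DropMetricsModel m = new DropMetricsModel();
        m.recordDrop(DropReason.IDEMPOTENT_UPDATE, "a");
        m.recordDrop(DropReason.IDEMPOTENT_UPDATE, "b");
        m.recordDrop(DropReason.NULL_KEY, null);
        System.out.println("dropped-records=" + m.droppedRecords);
        System.out.println("dropped-idempotent-updates=" + m.droppedIdempotentUpdates);
        System.out.println("warnings=" + m.warnings.size());
    }
}
```

With separate counters, an alert on "dropped-records" stays meaningful even
when idempotent drops are frequent.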

1)
Maybe we should discuss this point more. It seems like we should maintain
an invariant that the following three objects always have exactly the same
state (modulo flush boundaries):
1. The internal state store
2. The changelog
3. The operation's result view

That is, if I have a materialized Filter, then it seems like I _must_ store
exactly the same record in the store and the changelog, and also forward
the exact same record, including the timestamp, to the downstream
operations. 

If we store something different in the internal state store than the 
changelog, we can get a situation where the state is actually different after
restoration than it is during processing, and queries against standbys would
return different results than queries against the active tasks.

Regarding storing something different in the store+changelog than we
forward downstream, consider the following topology:
sourceTable
  .filter(someFilter, Materialized.as("f1"))
  .filter((key, value) -> true, Materialized.as("f2"));

If we didn't forward exactly the same data we store, then querying f2
would return different results than querying f1, which is clearly not
correct, given the topology.
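
The invariant can be sketched with a small toy model of two chained
materialized filters. Nothing here is Kafka Streams code: FilterNode, Stamped,
LogEntry, and restore() are hypothetical names, and the idempotence rule used
(same value, timestamps ignored) is an assumption made only for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model only: these classes are hypothetical and are not Kafka Streams APIs.
final class MaterializedFilterModel {

    // A value plus its record timestamp, as kept in a store or changelog entry.
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Stamped
                && ((Stamped) o).value.equals(value)
                && ((Stamped) o).timestamp == timestamp;
        }

        @Override
        public int hashCode() {
            return value.hashCode() * 31 + Long.hashCode(timestamp);
        }
    }

    // One changelog entry: a key and its update (null models a deletion).
    static final class LogEntry {
        final String key;
        final Stamped update;

        LogEntry(String key, Stamped update) {
            this.key = key;
            this.update = update;
        }
    }

    // A materialized filter with a store, a changelog, and an optional downstream node.
    static final class FilterNode {
        final Map<String, Stamped> store = new HashMap<>();
        final List<LogEntry> changelog = new ArrayList<>();
        private final Predicate<String> predicate;
        private final FilterNode downstream; // null when this is the last node

        FilterNode(Predicate<String> predicate, FilterNode downstream) {
            this.predicate = predicate;
            this.downstream = downstream;
        }

        void process(String key, Stamped update) {
            // Records rejected by the predicate become deletions (null).
            Stamped result = (update != null && predicate.test(update.value)) ? update : null;
            Stamped previous = store.get(key);

            // Assumed idempotence rule for this sketch: same value, timestamps ignored.
            boolean sameValue = (previous == null && result == null)
                || (previous != null && result != null && previous.value.equals(result.value));
            if (sameValue) {
                return; // dropped: store, changelog, and downstream all stay untouched
            }

            // Otherwise the identical record goes to all three places.
            if (result == null) store.remove(key); else store.put(key, result);
            changelog.add(new LogEntry(key, result));
            if (downstream != null) downstream.process(key, result);
        }

        // Rebuilds state from the changelog, as a restoring or standby task would.
        Map<String, Stamped> restore() {
            Map<String, Stamped> restored = new HashMap<>();
            for (LogEntry e : changelog) {
                if (e.update == null) restored.remove(e.key); else restored.put(e.key, e.update);
            }
            return restored;
        }
    }

    public static void main(String[] args) {
        FilterNode f2 = new FilterNode(v -> true, null);
        FilterNode f1 = new FilterNode(v -> !v.isEmpty(), f2);

        f1.process("a", new Stamped("x", 10L));
        f1.process("a", new Stamped("x", 20L)); // same value: dropped everywhere
        f1.process("b", new Stamped("", 30L));  // filtered out, nothing stored: dropped
        f1.process("a", new Stamped("y", 40L)); // real change: stored, logged, forwarded

        System.out.println(f1.store.equals(f2.store));     // prints true: f1 and f2 agree
        System.out.println(f1.restore().equals(f1.store)); // prints true: restore matches
    }
}
```

Because a drop touches none of the three places and a real update touches all
of them with the same record, querying f1, querying f2, and replaying the
changelog all give the same answer in this model.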

It seems like maybe what you have in mind is the preservation of stream
time across restart/rebalance? This bug is still open, actually:
https://issues.apache.org/jira/browse/KAFKA-9368
It seems like solving that bug would be independent of KIP-557. I.e.,
KIP-557 makes that bug neither worse nor better.

One other thought I had is maybe you were thinking that operators
would update their internally tracked stream time, but still discard
records? I think that _would_ be a bug. That is, if a record gets discarded
as idempotent, it should have no effect at all on the state of the application.
Reflecting on my prior analysis of stream time, most of the cases where we
track stream time are in Stream aggregations, and in those cases, if an
incoming record's timestamp is higher than the previous stream time, it
would already not be considered idempotent. So we would store, log, and
forward the result with the new timestamp.
The only other case is Suppress. With respect to idempotence, Suppress is
equivalent to a stateless no-op transformation. All it does is collect and delay
updates. It has no memory of what it previously emitted, so it wouldn't 
be possible for it to check for idempotence anyway.

Was that what you were thinking?
Thanks,
-John


On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> Hi Guozhang,
> 
> I also had the same thought about using the existing "dropped-records"
> metrics. However, I think in this case it would be better to use a new
> metric because dropped idempotent updates is an optimization, they do
> not represent missed records. The dropped idempotent updates in
> general do not change the result and so do not need a warn log
> message. Whereas dropped records due to expired windows, serialization
> errors, or lateness might be something concerning that need a warn log
> message.
> 
> Looking at the metrics, you would be happy to see
> "dropped-idempotent-updates" increase, because that means Streams gets
> rid of no-ops downstream, but you would be concerned if
> "dropped-records" would increase, because that means your records or
> the configuration of your app has issues. The
> "dropped-idempotent-updates" metric could also be an indication that
> you could further optimize your setup, by getting rid of idempotent
> updates further upstream.
> 
> Best,
> Bruno
> 
> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hello Richard,
> >
> > Thanks for the KIP. I once reviewed it and was concerned about its effects
> > on stream time advancing. After reading the updated KIP I think it has
> > answered a lot of them already.
> >
> > I have a couple minor comments still, otherwise I'm +1:
> >
> > 1) I want to clarify that for operations resulted in KTables (not only
> > aggregations, but consider KTable#filter that may also result in a new
> > KTable), even if we drop emissions to the downstream topics we would still
> > append to the corresponding changelog if timestamp has changed. This is
> > because the timestamps on the changelog is read by the standby tasks which
> > relies on them to infer its own stream time advancing.
> >
> > 2) About the metrics, in KIP-444 we are consolidating all types of
> > scenarios that can cause dropped records to the same metrics:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >
> > late-records-drop: INFO at processor node level, replaced by INFO
> > task-level "dropped-records".
> >
> > skipped-records: INFO at thread and processor node level, replaced by INFO
> > task-level "dropped-records".
> >
> > expired-window-record-drop: DEBUG at state store level, replaced by INFO
> > task-level "dropped-records".
> >
> > The main idea is that instead of using different metrics to indicate
> > different types of scenarios, and users just alert on that single metrics.
> > When alert triggers, they can look into the log4j for its causes (we made
> > sure that all sensor recordings of this metric would be associated with a
> > warning log4j).
> >
> > So I'd suggest that instead of introducing a new per-node
> > "dropped-idempotent-updates", we just piggy-back on the existing task-level
> > metric; unless we think that idempotent drops are more frequent than others
> > and also they do not worth a warning log, in that case we can consider
> > break this metric down with different tags for example.
> >
> > Guozhang
> >
> > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yohan.richard...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the votes so far!
> > > @Matthias or @Guozhang Wang <guozh...@confluent.io> it would be great to
> > > also get your input on this KIP.
> > >
> > > > It looks to be pretty close to completion, so the finishing touches are all
> > > > we need. :)
> > >
> > > Best,
> > > Richard
> > >
> > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > > ghassan.yamm...@bazaarvoice.com> wrote:
> > >
> > > > Hello all,
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Thanks,
> > > >
> > > > Ghassan
> > > >
> > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
> > > >
> > > >     Hi Richard,
> > > >
> > > >     +1 (non-binding)
> > > >
> > > >     Best,
> > > >     Bruno
> > > >
> > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vvcep...@apache.org>
> > > > wrote:
> > > >     >
> > > >     > Hi Richard,
> > > >     >
> > > >     > Thanks for the KIP!
> > > >     >
> > > >     > I'm +1 (binding)
> > > >     >
> > > >     > -john
> > > >     >
> > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > >     > > Hi all,
> > > >     > >
> > > > >     > > I am proposing a new optimization to Kafka Streams which would greatly
> > > > >     > > reduce the number of idempotent updates (or no-ops) in the Kafka Streams
> > > > >     > > DAG.
> > > > >     > > A number of users have been interested in this feature, so it would be nice
> > > > >     > > to pass this one in.
> > > >     > >
> > > >     > > For information, the KIP is described below:
> > > >     > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > >     > >
> > > > >     > > We aim to make Kafka Streams more efficient by adopting the "emit on
> > > > >     > > change" reporting strategy.
> > > >     > >
> > > >     > > Please cast your vote!
> > > >     > >
> > > >     > > Best,
> > > >     > > Richard
> > > >     > >
> > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
>
