Hi Bruno, John:

1) That makes sense. If we consider these to be node-specific metrics that
only apply to a subset of built-in processor nodes and are not meant for
alerting (just like suppression-emit (rate | total)), then they should be
per-node instead of per-task, and we would not associate such events with a
warning. With that in mind, I'd suggest renaming the metric without the
`dropped` keyword to distinguish it from the per-task sensor. How about
"idempotent-update-skip (rate | total)"?

Also a minor suggestion: we should clarify in the KIP / javadocs which
built-in processor nodes have this metric and which do not.
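
A minimal sketch of what a per-node skip counter could look like, assuming a materialized, KTable-filter-like node. `EmitOnChangeNode` and `StoredValue` are hypothetical names (not Kafka Streams classes), and treating an update as idempotent when its value is equal and its timestamp is not newer is an assumption of this sketch. It only illustrates that the count lives on the node and is recorded without any warning log, unlike the task-level "dropped-records".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not Kafka Streams code: a materialized, KTable-filter-like
// node that skips idempotent updates and counts them in a per-node counter.
// Assumption: an update is idempotent when its value equals the stored value
// and its timestamp is not newer than the stored timestamp.
class EmitOnChangeNode {
    // Latest value and timestamp stored for a key.
    record StoredValue(String value, long timestamp) {}

    private final Map<String, StoredValue> store = new HashMap<>();
    // Per-node counter backing a hypothetical "idempotent-update-skip-total".
    private long idempotentUpdateSkipTotal = 0L;

    /** Returns true if the update is stored and forwarded, false if it is skipped. */
    boolean process(String key, String value, long timestamp) {
        StoredValue old = store.get(key);
        if (old != null && Objects.equals(old.value(), value) && timestamp <= old.timestamp()) {
            // Valid but unnecessary update: count it, but emit no warning log.
            idempotentUpdateSkipTotal++;
            return false;
        }
        store.put(key, new StoredValue(value, timestamp));
        return true; // a real node would also write to its changelog and forward downstream
    }

    long idempotentUpdateSkipTotal() {
        return idempotentUpdateSkipTotal;
    }

    public static void main(String[] args) {
        EmitOnChangeNode filter = new EmitOnChangeNode();
        filter.process("k", "v1", 10L); // forwarded
        filter.process("k", "v1", 10L); // skipped as idempotent
        filter.process("k", "v2", 11L); // forwarded
        System.out.println("idempotent-update-skip-total = " + filter.idempotentUpdateSkipTotal());
    }
}
```

Running `main` prints `idempotent-update-skip-total = 1`: only the second, identical update is skipped.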

2) About stream time tracking, there are multiple known issues that we
should close to improve our consistency semantics:

 a. preserve the stream time of active tasks across rebalances, during which
they may be migrated. This is what KAFKA-9368
<https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
 b. preserve the stream time of standby tasks, via the changelog topics, so
that it stays aligned with the active tasks.
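
For a), a minimal sketch of why migration matters, assuming stream time is tracked as the maximum record timestamp a task has seen. `StreamTimeTracker` is a hypothetical class, and how the previous owner's stream time would be persisted and handed over is deliberately left open.

```java
// Hypothetical sketch, not Kafka Streams code: stream time as the maximum record
// timestamp a task has observed so far. How the value would be persisted and
// handed to the new owner after migration is left open here.
class StreamTimeTracker {
    static final long UNKNOWN = -1L;
    private long streamTime;

    StreamTimeTracker() {
        this(UNKNOWN); // a task that starts with no history
    }

    StreamTimeTracker(long restoredStreamTime) {
        this.streamTime = restoredStreamTime; // seeded from a persisted value
    }

    /** Stream time only moves forward; out-of-order records do not lower it. */
    long observe(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) {
        StreamTimeTracker previousOwner = new StreamTimeTracker();
        previousOwner.observe(100L);
        previousOwner.observe(50L); // out-of-order record: stream time stays at 100

        // After migration, an out-of-order record at t=60 arrives at the new owner.
        StreamTimeTracker fresh = new StreamTimeTracker();
        StreamTimeTracker seeded = new StreamTimeTracker(previousOwner.streamTime());
        System.out.println("fresh:  " + fresh.observe(60L));
        System.out.println("seeded: " + seeded.observe(60L));
    }
}
```

Running `main` prints `fresh:  60` and `seeded: 100`. With a fresh tracker, stream time regresses from 100 to 60 after migration, so, for example, an out-of-order record is evaluated against 60 rather than the 100 the previous owner had already reached.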

What concerns me more is b). For example, let's say we have a topology of
`source -> A -> repartition -> B` where both A and B have state stores with
changelogs, and both of them have standbys. If a record is piped from the
source and completely traverses the topology, we need to make sure that the
stream times inferred by:

* active task A (inferred from the source record),
* active task B (inferred from the derived record from repartition topic),
* standby task A (inferred from the changelog topic of A's store),
* standby task B (inferred from the changelog topic of B's store)

are consistent. Note that I'm not saying they should be "exactly the same",
only consistent: they may have different values, as long as the differences
do not impact time-based queries. The main motivation is IQ, where both
active and standby tasks can be queried: there we could eventually improve
our consistency guarantees to provide 1) read-your-writes, 2) consistency
across stores, etc.
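
A small simulation can make b) concrete for the `source -> A -> repartition -> B` example. It assumes that A and B behave like materialized emit-on-change nodes (the `Node` and `Rec` types are hypothetical, not Kafka Streams code) and that each task's stream time is the maximum timestamp on the topic it reads: the source, the repartition topic, or a changelog. It compares dropping idempotent updates entirely with still appending to the changelog when only the timestamp advanced, as in point 1) of my earlier email.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical simulation of `source -> A -> repartition -> B`; none of these
// types are Kafka Streams APIs. Each "stream time" is the max timestamp seen
// on the input that task reads.
class StreamTimeConsistency {
    record Rec(String key, String value, long ts) {}

    /** A materialized emit-on-change node with its own changelog. */
    static class Node {
        final boolean logTimestampOnlyUpdates; // when true, timestamp-only updates are still logged
        final Map<String, Rec> store = new HashMap<>();
        final List<Rec> changelog = new ArrayList<>();

        Node(boolean logTimestampOnlyUpdates) {
            this.logTimestampOnlyUpdates = logTimestampOnlyUpdates;
        }

        /** Returns the record to forward downstream, or null if nothing is forwarded. */
        Rec process(Rec r) {
            Rec old = store.get(r.key());
            boolean sameValue = old != null && Objects.equals(old.value(), r.value());
            if (sameValue) {
                if (logTimestampOnlyUpdates && r.ts() > old.ts()) {
                    store.put(r.key(), r); // keep store and changelog in sync
                    changelog.add(r);      // the standby sees the newer timestamp
                }
                return null;               // never forwarded downstream
            }
            store.put(r.key(), r);
            changelog.add(r);
            return r;
        }
    }

    static long maxTs(List<Rec> recs) {
        long t = -1L;
        for (Rec r : recs) t = Math.max(t, r.ts());
        return t;
    }

    /** Returns {activeA, activeB, standbyA, standbyB} stream times. */
    static long[] run(List<Rec> source, boolean logTimestampOnlyUpdates) {
        Node a = new Node(logTimestampOnlyUpdates);
        Node b = new Node(logTimestampOnlyUpdates);
        List<Rec> repartition = new ArrayList<>();
        for (Rec r : source) {
            Rec out = a.process(r);
            if (out != null) repartition.add(out);
        }
        for (Rec r : repartition) b.process(r);
        return new long[] {maxTs(source), maxTs(repartition), maxTs(a.changelog), maxTs(b.changelog)};
    }

    public static void main(String[] args) {
        List<Rec> source = List.of(new Rec("k", "v1", 10L), new Rec("k", "v1", 20L));
        System.out.println("drop entirely:      " + Arrays.toString(run(source, false)));
        System.out.println("log timestamp-only: " + Arrays.toString(run(source, true)));
    }
}
```

With this input, dropping idempotent updates entirely leaves standby A at 10 while active A is at 20 (`[20, 10, 10, 10]`); appending the timestamp-only update to A's changelog brings standby A to 20 (`[20, 10, 20, 10]`). B stays at 10 in both runs because nothing new was forwarded to it, which is the kind of difference that is acceptable only if it does not impact time-based queries.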

I agree with John's assessment in the previous email; the above is just to
clarify more concretely what I'm thinking.


Guozhang


On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vvcep...@apache.org> wrote:

> Thanks, Guozhang and Bruno!
>
> 2)
> I had a similar thought to both of you about the metrics, but I ultimately
> came to the same conclusion as Bruno. These aren't dropped invalid
> records; they're intentionally dropped valid, but unnecessary, updates.
> A "warning" for this case definitely seems wrong, and I'd also not
> recommend counting these events along with "dropped-records", because those
> are all dropped invalid records, e.g., late or null-keyed records, or
> records that couldn't be deserialized.
>
> As Bruno pointed out, an operator should be concerned to see a non-zero
> "dropped-records", and would then consult the logs for warnings.
> But that same person should be happy to see "dropped-idempotent-updates"
> increasing, since it means they're saving time and money. Maybe the name
> of the metric could be different, but I couldn't think of a better one.
> OTOH, maybe it just stands out to us because we recently discussed those
> other metrics in KIP-444?
>
> 1)
> Maybe we should discuss this point more. It seems like we should maintain
> an invariant that the following three objects always have exactly the same
> state (modulo flush boundaries):
> 1. The internal state store
> 2. The changelog
> 3. The operation's result view
>
> That is, if I have a materialized Filter, then it seems like I _must_ store
> exactly the same record in the store and the changelog, and also forward
> the exact same record, including the timestamp, to the downstream
> operations.
>
> If we store something different in the internal state store than in the
> changelog, we can end up in a situation where the state after restoration
> is actually different from the state during processing, and queries
> against standbys would return different results than queries against the
> active tasks.
>
> Regarding storing something different in the store+changelog than we
> forward downstream, consider the following topology:
> sourceTable
>   .filter(someFilter, Materialized.as("f1"))
>   .filter((key, value) -> true, Materialized.as("f2"));
>
> If we didn't forward exactly the same data we store, then querying f2
> would return different results than querying f1, which is clearly not
> correct, given the topology.
>
> It seems like maybe what you have in mind is the preservation of stream
> time across restart/rebalance? This bug is still open, actually:
> https://issues.apache.org/jira/browse/KAFKA-9368
> It seems like solving that bug would be independent of KIP-557, i.e.,
> KIP-557 makes that bug neither worse nor better.
>
> Another thought I had is that maybe you were thinking operators would
> update their internally tracked stream time but still discard the
> records? I think that _would_ be a bug. That is, if a record gets discarded
> as idempotent, it should have no effect at all on the state of the
> application.
> Reflecting on my prior analysis of stream time, most of the cases where we
> track stream time are in stream aggregations, and in those cases, if an
> incoming record's timestamp is higher than the previous stream time, the
> update would already not be considered idempotent. So we would store, log,
> and forward the result with the new timestamp.
> The only other case is Suppress. With respect to idempotence, Suppress is
> equivalent to a stateless no-op transformation. All it does is collect and
> delay updates. It has no memory of what it previously emitted, so it
> wouldn't be possible for it to check for idempotence anyway.
>
> Was that what you were thinking?
> Thanks,
> -John
>
>
> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> > Hi Guozhang,
> >
> > I also had the same thought about using the existing "dropped-records"
> > metric. However, I think in this case it would be better to use a new
> > metric, because dropping idempotent updates is an optimization; the
> > dropped updates do not represent missed records. Dropped idempotent
> > updates in general do not change the result and so do not need a warn log
> > message, whereas records dropped due to expired windows, serialization
> > errors, or lateness might be concerning and do need a warn log message.
> >
> > Looking at the metrics, you would be happy to see
> > "dropped-idempotent-updates" increase, because that means Streams gets
> > rid of no-ops before they reach downstream processors, but you would be
> > concerned if "dropped-records" increased, because that means your records
> > or the configuration of your app have issues. The
> > "dropped-idempotent-updates" metric could also indicate that you could
> > further optimize your setup by getting rid of idempotent updates further
> > upstream.
> >
> > Best,
> > Bruno
> >
> > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > Hello Richard,
> > >
> > > Thanks for the KIP. I once reviewed it and was concerned about its effects
> > > on stream time advancing. After reading the updated KIP, I think it has
> > > answered a lot of those concerns already.
> > >
> > > I still have a couple of minor comments; otherwise I'm +1:
> > >
> > > 1) I want to clarify that for operations resulting in KTables (not only
> > > aggregations; consider also KTable#filter, which may result in a new
> > > KTable), even if we drop emissions to the downstream topics, we would
> > > still append to the corresponding changelog if the timestamp has changed.
> > > This is because the timestamps in the changelog are read by the standby
> > > tasks, which rely on them to infer their own stream time advancing.
> > >
> > > 2) About the metrics: in KIP-444 we are consolidating all types of
> > > scenarios that can cause dropped records into the same metric:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >
> > > late-records-drop: INFO at processor node level, replaced by INFO
> > > task-level "dropped-records".
> > >
> > > skipped-records: INFO at thread and processor node level, replaced by
> > > INFO task-level "dropped-records".
> > >
> > > expired-window-record-drop: DEBUG at state store level, replaced by INFO
> > > task-level "dropped-records".
> > >
> > > The main idea is that instead of using different metrics to indicate
> > > different types of scenarios, we use a single metric and users just alert
> > > on it. When the alert triggers, they can look into the log4j logs for the
> > > causes (we made sure that every recording on this metric's sensor is
> > > accompanied by a log4j warning).
> > >
> > > So I'd suggest that instead of introducing a new per-node
> > > "dropped-idempotent-updates" metric, we just piggy-back on the existing
> > > task-level metric, unless we think that idempotent drops are more frequent
> > > than the others and are not worth a warning log; in that case we could
> > > consider, for example, breaking this metric down with different tags.
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for the votes so far!
> > > > @Matthias or @Guozhang Wang <guozh...@confluent.io>, it would be great
> > > > to also get your input on this KIP.
> > > >
> > > > It looks to be pretty close to completion, so the finishing touches are
> > > > all we need. :)
> > > >
> > > > Best,
> > > > Richard
> > > >
> > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <ghassan.yamm...@bazaarvoice.com> wrote:
> > > >
> > > > > Hello all,
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Ghassan
> > > > >
> > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
> > > > >
> > > > >     Hi Richard,
> > > > >
> > > > >     +1 (non-binding)
> > > > >
> > > > >     Best,
> > > > >     Bruno
> > > > >
> > > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vvcep...@apache.org> wrote:
> > > > >     >
> > > > >     > Hi Richard,
> > > > >     >
> > > > >     > Thanks for the KIP!
> > > > >     >
> > > > >     > I'm +1 (binding)
> > > > >     >
> > > > >     > -john
> > > > >     >
> > > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > > >     > > Hi all,
> > > > >     > >
> > > > >     > > I am proposing a new optimization to Kafka Streams which would
> > > > >     > > greatly reduce the number of idempotent updates (or no-ops) in
> > > > >     > > the Kafka Streams DAG.
> > > > >     > > A number of users have been interested in this feature, so it
> > > > >     > > would be nice to get this one in.
> > > > >     > >
> > > > >     > > For reference, the KIP is described here:
> > > > >     > >
> > > > >     > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > >     > >
> > > > >     > > We aim to make Kafka Streams more efficient by adopting the
> > > > >     > > "emit on change" reporting strategy.
> > > > >     > > Please cast your vote!
> > > > >     > >
> > > > >     > > Best,
> > > > >     > > Richard
> > > > >     > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
>


-- 
-- Guozhang
