Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-14 Thread Guozhang Wang
Hello Hao,

Thanks for the proposal. I have some preferences among the options here, so I
will copy them here:

I'm now thinking whether it's better not to add this new config on each of the
Window interfaces, but instead to add it at the KGroupedStream#windowedBy
function. Also, instead of adding just a boolean flag, maybe we can add a
config class like Grouped, Suppressed, etc. Let's call it, say, Emitted, which
for now would just have a single constructor, Emitted.atWindowClose, whose
semantics are the same as emitFinal == true. I think the benefits are:

1) You do not need to modify multiple Window classes, but just overload one
windowedBy function with a second param. This is a smaller scope for now, and
also more extensible for any future changes.

2) With a config interface, we maintain its extensibility and are able to
reuse this Emitted interface for other operators if we want to expand to them
later.



So in general I'm leaning towards option 2). For that, some more detailed
comments:

1) If we want to reuse that config object for other non-window stateful
operations, I think naming it `EmitConfig` is probably better than
`WindowConfig`.
2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that you are
also proposing to add new stores to the public Stores factory, but this is
not included in the KIP. Is that intentional? Personally I think that
although we may eventually want to add a new store type to the public APIs,
for this KIP we may not have to add them and can delay that until after
we've learned the best way to lay things out. LMK what you think.
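
To make the shape of option 2) concrete, here is a rough sketch of what the
DSL usage could look like; `EmitConfig` and `atWindowClose()` are only the
hypothetical names floated above, not an existing API:

{code}
// Hypothetical DSL shape for option 2): emit behavior passed to windowedBy()
// via a config object rather than a boolean flag on each Windows class.
final StreamsBuilder builder = new StreamsBuilder();

final KTable<Windowed<String>, Long> counts = builder
    .<String, String>stream("clicks")
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)),
                EmitConfig.atWindowClose())   // hypothetical: emit only once the window closes
    .count();
{code}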



Guozhang



On Fri, Mar 11, 2022 at 2:13 PM Hao Li  wrote:

> Hi Dev team,
>
> I'd like to start a discussion thread on Kafka Streams KIP-825:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
>
> This KIP aims to add new APIs to support outputting final aggregated
> results for windowed aggregations. I listed several options there and I'm
> looking forward to your feedback.
>
> Thanks,
> Hao
>


-- 
-- Guozhang


Re: Need permissions to edit the KIP-714

2022-03-14 Thread Guozhang Wang
Hello Sarat,

What's your apache ID? I cannot find it via your full name in apache wiki
space.

On Mon, Mar 14, 2022 at 3:48 PM Sarat Kakarla 
wrote:

> Hi Admin,
>
> Would you please grant me the permissions to edit the following kip:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability
>
> Thanks
> Sarat
>
>

-- 
-- Guozhang


[jira] [Created] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection

2022-03-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13737:
-

 Summary: Flaky 
kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
 Key: KAFKA-13737
 URL: https://issues.apache.org/jira/browse/KAFKA-13737
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Examples:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests

{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: describeTopics
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812)
at scala.util.Try$.apply(Try.scala:210)
at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811)
at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819)
at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789)
at 
kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172)
{code}





[jira] [Reopened] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-13736:
---

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>    Reporter: Guozhang Wang
>Priority: Major
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}





[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13736.
---
Resolution: Duplicate

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>    Reporter: Guozhang Wang
>Priority: Major
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}





[jira] [Created] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13736:
-

 Summary: Flaky 
kafka.network.SocketServerTest.closingChannelWithBufferedReceives
 Key: KAFKA-13736
 URL: https://issues.apache.org/jira/browse/KAFKA-13736
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Examples:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests

{code}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
at 
kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
{code}





[jira] [Created] (KAFKA-13735) Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13735:
-

 Summary: Flaky 
kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives
 Key: KAFKA-13735
 URL: https://issues.apache.org/jira/browse/KAFKA-13735
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Examples:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11705/13/tests

{code}
Stacktrace
java.lang.IllegalStateException: Channel closed too early
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1511)
at scala.Option.getOrElse(Option.scala:201)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1511)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1482)
at 
kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives(SocketServerTest.scala:1393)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
{code}





[jira] [Reopened] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-13421:
---

Re-opening this ticket since the test is still failing.

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Major
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup()
>  failed, log available in 
> /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>   
>   
> ConsumerBounceTest > 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() 
> FAILED
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode 
> = NodeExists
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126) 
>
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>  
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
> at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:2
> 12)
> at 
> scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
> at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
> at 
> kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsB
> igGroup$1(ConsumerBounceTest.scala:327)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
> at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(C
> onsumerBounceTest.scala:319) 
> {code}





[jira] [Created] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead

2022-03-09 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13722:
-

 Summary: Update internal interfaces that use ProcessorContext to 
use StateStoreContext instead
 Key: KAFKA-13722
 URL: https://issues.apache.org/jira/browse/KAFKA-13722
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


This is a reminder that when we remove the deprecated public APIs that use
the ProcessorContext, like `StateStore.init`, we should also consider updating
the internal interfaces that use the ProcessorContext as well. That includes:

1. Segments and related util classes which use ProcessorContext.
2. For state stores that leverage ProcessorContext.getXXXTime, their logic
should be moved out of the state store impl and up to the processor node level
that calls these state stores.
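
For reference, these are the two overloads on the public StateStore interface
today; the internal interfaces should eventually line up with the
StateStoreContext variant:

{code}
// Existing overload, deprecated in favor of the StateStoreContext variant (KIP-478):
@Deprecated
void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

// Replacement: state stores are initialized with a StateStoreContext instead of the
// processor-level ProcessorContext:
void init(StateStoreContext context, StateStore root);
{code}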





Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-07 Thread Guozhang Wang
ave a simplier solution, but I'm also pretty sure
> > this complexity is necessary.
> >
> > Taking a step back, I do think this approach results in a
> > better API, even though the change is a little complicated.
> >
> > Thanks,
> > -John
> >
> > On Sun, 2022-03-06 at 10:51 +, Jorge Esteban Quilcate
> > Otoya wrote:
> > > Matthias, thanks for your feedback.
> > >
> > > I can see the following alternatives to deal with `processValues()`:
> > >
> > > 1. Runtime key validation (current proposal)
> > > 2. Using Void type. Guozhang already points out some important
> > > considerations about allocating `Record` twice.
> > > 3. Adding a new ValueRecord, proposed by Matthias. This one would carry
> > > some of the problems of the second alternative as ValueRecord will have
> > to
> > > be created from a Record. Also, either by having a public constructor
> or
> > > creation from a Record, the key _can_ be changed without being captured
> > by
> > > the Topology.
> > > 4. Reducing the KIP scope to `process` only, and removing/postponing
> > > `processValues` for a later DSL redesign.
> > >
> > > A couple of additional comments:
> > >
> > > About the Record API:
> > >
> > > IIUC, the issue with allocating new objects is coming from the current
> > > design of the Record API.
> > > If a user does record.withKey(...).withValue(...) is already leading
> to a
> > > couple of instatiations.
> > > My impression is that if the cost/value of immutability has been
> weighed
> > > already, then maybe the considerations for alternative 2 can be
> > disregarded?
> > > Either way, if the cost of recreation of objects is something we want
> to
> > > minimize, then maybe adding a Builder to the record should help to
> reduce
> > > the allocations.
> > >
> > > About the key validation:
> > >
> > > So far, the only way I can see to _really_ validate a key doesn't
> change
> > at
> > > compile-time is by not exposing it at all — as we are doing it today
> with
> > > Transform.
> > > Otherwise, deal with it at runtime — as we have been dealing with
> > Transform
> > > without the ability to forward.
> > > Processor API already —by definition— means lower-level abstraction,
> > > therefore users should be aware of the potential runtime exceptions if
> > the
> > > key changes.
> > > This is why I'm leaning towards alternative 1.
> > >
> > > Looking forward to your feedback.
> > > As a reminder, the vote thread is still open. Feel free to add your
> vote
> > or
> > > amend if needed.
> > >
> > > Cheers,
> > >
> > >
> > > On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax  wrote:
> > >
> > > > John, thanks for verifying source compatibility. My impression was
> that
> > > > it should be source compatible, I was just not 100% sure.
> > > >
> > > > The question about `processValues()` is really a hard one. Guozhang's
> > > > point is very good one. Maybe we need to be pragmatic and accept the
> > > > runtime check (even if I deeply hate this solution compare to a
> compile
> > > > time check).
> > > >
> > > > Other possibilities to address this issue might just become too ugly?
> > It
> > > > seems it would require to add a new `ValueProcessorContext` that
> offers
> > > > a `#forward(ValueRecord)` method (with `ValueRecord` being a `Record`
> > > > with immutable key? Not sure if we would be willing to go down this
> > > > route? Personally, I would be ok with it, as a strongly prefer
> compile
> > > > time checks and I am happy to extend the API surface area to achieve
> it
> > > > -- however, I won't be surprised if others don't like this idea...
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > Thanks, Guozhang.
> > > > >
> > > > > > Compared with reference checks and runtime exceptions for those
> who
> > > > > > mistakenly change the key, I think that enforcing everyone to
> > `setValue`
> > > > > > may incur more costs..
> > > > >
> > > > > This is a fair point. I agree that this may incur in more 

Re: [DISCUSS] Should we automatically close stale PRs?

2022-02-26 Thread Guozhang Wang
Hey David,

Just reviving this thread: do you have a final decision on this now,
with all the feedback received so far?

On Sun, Feb 13, 2022 at 8:41 PM Ismael Juma  wrote:

> Hi David,
>
> I think it's a good idea to use the bot for auto closing stale PRs. The
> ideal flow would be:
>
> 1. Write a comment and add stale label
> 2. If user responds saying that the PR is still valid, the stale label is
> removed
> 3. Otherwise, the PR is closed
>
> Thanks,
> Ismael
>
> On Sat, Feb 5, 2022, 2:22 AM David Jacot  wrote:
>
> > Hi team,
> >
> > I find our ever-growing backlog of PRs a little frustrating, don't
> > you? I just made a pass over the whole list and a huge chunk
> > of the PRs are abandoned, outdated or irrelevant to the
> > current code base. For instance, we still have PRs opened
> > back in 2015.
> >
> > There is now a Github Action [1] for automatically marking
> > PRs as stale and for automatically closing them as well. How
> > would the community feel about enabling this? I think that
> > we could mark a PR as stale after one year and close it
> > a month after if there are no new activities. Reopening a
> > closed PR is really easy, so there is no real harm in closing
> > it.
> >
> > [1] https://github.com/actions/stale
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-02-26 Thread Guozhang Wang
> > > arguments.
> > >
> > > Because the current methods are void returns, no existing
> > > code could be assigning the result to any variable, so
> > > moving from a void return to a typed return is always
> > > compatible.
> > >
> > > Jorge, do you mind clarifying these points in the
> > > Compatibility section of your KIP?
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > > On Wed, 2022-02-23 at 15:07 -0800, Matthias J. Sax wrote:
> > > > For this KIP, I also see the value. I was just trying to make a step
> > > > back and ask if it's a good short term solution. If we believe it is,
> > I
> > > > am fine with it.
> > > >
> > > > (I am more worried about the header's KIP...)
> > > >
> > > > Btw: I am still wondering if we can change existing `process()` as
> > > > proposed in the KIP? Is the proposed change source compatible? (It's for
> > > > sure not binary compatible, but this seems fine -- I don't think we
> > > > guarantee binary compatibility).
> > > >
> > > > Btw: would be good to clarify what changes for process() -- should the
> > > > return type change from `void` to `KStream` as well as
> > > > change of `ProcessorSupplier` generic types (output types change from
> > > > `Void` to `KOut` and `VOut`?
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 2/23/22 11:32 AM, Guozhang Wang wrote:
> > > > > Hi folks,
> > > > >
> > > > > I agree with John that this KIP by itself could be a good
> > improvement, and
> > > > > I feel it aligns well with the eventual DSL 2.0 proposal so we do
> > not need
> > > > > to hold it until later.
> > > > >
> > > > > Regarding the last point (i.e. whether we should do enforcement
> with
> > a new
> > > > > interface), here's my 2c: in the past we introduced public
> > > > > `ValueTransfomer/etc` for two purposes, 1) to enforce the key is
> not
> > > > > modifiable, 2) to indicate inside the library's topology builder
> > itself
> > > > > that since the key is not modified, the direct downstream does not
> > need to
> > > > > inject a repartition stage. I think we are more or less on the same
> > page
> > > > > that for purpose 1), doing runtime check could be sufficient; as
> for
> > the
> > > > > purpose of 2), as for this KIP itself I think it is similar to what
> > we have
> > > > > (i.e. just base on the function name "processValue" itself) and
> > hence are
> > > > > not sacrificed either. I do not know if
> > > > > `KStream#processValue(ProcessorSupplier
> > > > > processorSupplier)` will work, or work better, maybe Jorge could do
> > some
> > > > > digging and get back to us.
> > > > >
> > > > >
> > > > > On Fri, Feb 18, 2022 at 8:24 AM John Roesler 
> > wrote:
> > > > >
> > > > > > Hello all,
> > > > > >
> > > > > > While I sympathize with Matthias’s desire to wipe the slate clean
> > and
> > > > > > redesign the dsl with full knowledge of everything we’ve learned
> > in the
> > > > > > past few years, that would also be a pretty intense project on
> its
> > own. It
> > > > > > seems better to leave that project for someone who is motivated
> to
> > take it
> > > > > > on.
> > > > > >
> > > > > > Reading between the lines, it seems like Jorge’s motivation is
> > more along
> > > > > > the lines of removing a few specific pain points. I appreciate
> > Matthias
> > > > > > extending the offer, but if Jorge doesn’t want to redesign the
> dsl
> > right
> > > > > > now, we’re better off just accepting the work he’s willing to do.
> > > > > >
> > > > > > Specifically, this KIP is quite a nice improvement. Looking at
> the
> > KStream
> > > > > > interface, roughly half of it is devoted to various flavors of
> > “transform”,
> > > > > > which makes it really hard on users to figure out which they are
> > supposed
> > > > > > to use for what purpos

Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2022-02-25 Thread Guozhang Wang
Thanks Rohan,

I've reviewed the new PR and had a question regarding whether we should
have the new metric in ms or ns; maybe we can first discuss that
before we finalize the KIP?


Guozhang

On Fri, Feb 25, 2022 at 5:48 AM Bruno Cadonna  wrote:

> Hi Rohan,
>
> Thank you for the heads up!
>
> Yes, please update the KIP and send this message also to the VOTE thread
> of the KIP.
>
> Best,
> Bruno
>
> On 25.02.22 04:01, Rohan Desai wrote:
> > Hello,
> >
> > I discovered a bug in the design of this metric. The bug is documented
> > here: https://github.com/apache/kafka/pull/11805. We need to include
> time
> > the producer spends waiting on topic metadata into the total blocked
> time.
> > But to do this we would need to add a new producer metric that tracks the
> > total time spent blocked on metadata. I've implemented this in a patch
> > here: https://github.com/apache/kafka/pull/11805
> >
> > I'm hoping I can just update this KIP to include the new producer metric.
> >
> > On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai 
> > wrote:
> >
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
> >>
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-02-23 Thread Guozhang Wang
 or somewhere else?
> >>
> >> @Guozhang: I am not worried about the runtime overhead. I am worried
> >> about user experience. It's not clear from the method signature that
> >> you are not allowed to change the key, which seems to be bad API design.
> >> Even if I understand the desire to keep the API surface area small -- I
> >> would rather have a compile time enforcement than a runtime check.
> >>
> >> For example, we have `map()` and `mapValues()` and `mapValues()` returns
> >> a `Value V` (enforcing that the key is not changed) instead of a
> >> `KeyValue`, and we use a runtime check to check that the key is
> >> not changed.
> >>
> >> Naively, could we enforce something similar by setting the output key
> >> type as `Void`.
> >>
> >>KStream#processValue(ProcessorSupplier
> >> processorSupplier)
> >>
> >> Not sure if this would work or not?
> >>
> >> Or it might be worth to add a new interface, `ValueProcessorSupplier`
> >> that ensures that the key is not modified?
> >>
> >>
> > This is an important discussion, even more so with a DSL v2.0.
> >
> > At the moment, the DSL just flags whether partitioning is required based
> on
> > the DSL operation. As mentioned, `mapValues()` enforces only the value
> has
> > changed through the DSL, though the only _guarantee_ we have is that
> Kafka
> > Streams "owns" the implementation, and we can flag this properly.
> >
> > With a hypothetical v2.0 based on Record API, this will be harder to
> > enforce with the current APIs. e.g. with `mapValues(Record
> record)`,
> > nothing would stop users from using
> `record.withKey("needs_partitioning")`.
> >
> > The approach defined on this KIP is similar to what we have at the moment
> > on `ValueTransformer*` where it validates at runtime that the users are
> not
> > calling `forward` with `ForwardingDisabledProcessorContext`.
> > `ValueProcessorSupplier` is not meant to be a public API. Only to be used
> > internally on `processValues` implementation.
> >
> > At first, `KStream#processValue(ProcessorSupplier
> > processorSupplier)` won't work as it will require the `Processor`
> > implementation to actually change the key. Will take a deeper look to
> > validate if this could solve this issue.
> >
> >
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/17/22 10:56 AM, Guozhang Wang wrote:
> >> > Regarding the last question Matthias had, I wonder if it's similar to
> my
> >> > first email's point 2) above? I think the rationale is that, since
> >> > reference checks are relatively very cheap, it is worthwhile to pay
> this
> >> > extra runtime checks and in return to have a single consolidated
> >> > ProcessorSupplier programming interface (i.e. we would eventually
> >> > deprecate ValueTransformerWithKeySupplier).
> >> >
> >> > On Wed, Feb 16, 2022 at 10:57 AM Jorge Esteban Quilcate Otoya <
> >> > quilcate.jo...@gmail.com> wrote:
> >> >
> >> >> Thank you Matthias, this is great feedback.
> >> >>
> >> >> Adding my comments below.
> >> >>
> >> >> On Wed, 16 Feb 2022 at 00:42, Matthias J. Sax 
> wrote:
> >> >>
> >> >>> Thanks for the KIP.
> >> >>>
> >> >>> In alignment to my reply to KIP-634, I am wondering if we are
> heading
> >> >>> into the right direction, or if we should consider to re-design the
> DSL
> >> >>> from scratch?
> >> >>>
> >> >>>
> >> >> I'm very excited about the idea of a DLS v2.0. It probably deserves
> its
> >> own
> >> >> thread to start discussing ideas.
> >> >>
> >> >> For this KIP specifically, I think about it as a continuation from
> >> KIP-478.
> >> >> Therefore, it could make sense to have it as part of the current
> >> version of
> >> >> the DSL.
> >> >>
> >> >>
> >> >>>
> >> >>> Even if we don't do a DSL 2.0 right now, I have some concerns about
> >> this
> >> >>> KIP:
> >> >>>
> >> >>> (1) I am not sure if the propose changed is backward compatible? We
> >> >>> currently have:
> >> >>>
> >> >>> void KStream#pr

Re: [VOTE] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-17 Thread Guozhang Wang
Thanks Seung-chan,

I'm +1 on the proposal.


Guozhang

On Tue, Feb 15, 2022 at 7:06 AM Seung-chan Ahn 
wrote:

> Hi team,
>
> I feel like we have a finalized proposal for this improvement.
> I want to know what you think.
>
> Thanks in advance for your help,
>
> Seung-chan
>


-- 
-- Guozhang


Re: [VOTE] KIP-820: Extend KStream process with new Processor API

2022-02-17 Thread Guozhang Wang
Thanks Jorge, overall looks good to me.

Maybe we can clarify a bit in the wiki that the reason we cannot
include the additional `final String... stateStoreNames` params in the new
`process` API is that we need to have overloaded functions which take
`ProcessorSupplier<...>` where the output types are not `Void`, but due to
type erasure we cannot distinguish the new overloaded function signatures
from the old ones if they also include `final String... stateStoreNames`.
And the javadocs should explain that if users want to connect state stores to this
processor, they could use the `connectState` API instead.
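
As a side note for readers of the archive, one existing way (my own sketch,
not something prescribed by the KIP) to attach a store to such a processor
without the vararg is the ProcessorSupplier#stores() hook from
ConnectedStoreProvider, which sidesteps the erasure clash entirely:

{code}
// Sketch only: the store is declared via ProcessorSupplier#stores(), so the typed
// process() overload does not need a String... stateStoreNames parameter.
final ProcessorSupplier<String, String, String, Long> supplier =
    new ProcessorSupplier<String, String, String, Long>() {
        @Override
        public Set<StoreBuilder<?>> stores() {
            return Collections.singleton(
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("counts"),
                    Serdes.String(), Serdes.Long()));
        }

        @Override
        public Processor<String, String, String, Long> get() {
            return new Processor<String, String, String, Long>() {
                private ProcessorContext<String, Long> context;
                private KeyValueStore<String, Long> counts;

                @Override
                public void init(final ProcessorContext<String, Long> context) {
                    this.context = context;
                    this.counts = context.getStateStore("counts");
                }

                @Override
                public void process(final Record<String, String> record) {
                    final Long previous = counts.get(record.key());
                    final long updated = (previous == null ? 0L : previous) + 1L;
                    counts.put(record.key(), updated);
                    context.forward(record.withValue(updated));
                }
            };
        }
    };
{code}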

Otherwise, I'm +1.

Guozhang

On Tue, Feb 15, 2022 at 11:54 AM John Roesler  wrote:

> Thanks, Jorge!
>
> I'm +1 (binding)
>
> -John
>
> On Tue, 2022-02-15 at 19:16 +, Jorge Esteban Quilcate
> Otoya wrote:
> > Hi all,
> >
> > I'd like to start a vote on KIP-820 which proposes extending KStream to
> use
> > the new Processor API
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API
> >
> > Thanks,
> > Jorge
>
>

-- 
-- Guozhang


Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-02-17 Thread Guozhang Wang
eaders is "not great but ok", then maybe
> > >> we leave it alone.
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >> On Mon, 2022-02-14 at 13:58 -0600, Paul Whalen wrote:
> > >>> No specific comments, but I just wanted to mention I like the
> > direction of
> > >>> the KIP.  My team is a big user of "transform" methods because of the
> > >>> ability to chain them, and I have always found the terminology
> > challenging
> > >>> to explain alongside "process".  It felt like one concept with two
> > names.
> > >>> So moving towards a single API that is powerful enough to handle both
> > use
> > >>> cases seems absolutely correct to me.
> > >>>
> > >>> Paul
> > >>>
> > >>> On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya <
> > >>> quilcate.jo...@gmail.com> wrote:
> > >>>
> > >>>> Got it. Thanks John, this make sense.
> > >>>>
> > >>>> I've updated the KIP to include the deprecation of:
> > >>>>
> > >>>> - KStream#transform
> > >>>> - KStream#transformValues
> > >>>> - KStream#flatTransform
> > >>>> - KStream#flatTransformValues
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Fri, 11 Feb 2022 at 15:16, John Roesler 
> > wrote:
> > >>>>
> > >>>>> Thanks, Jorge!
> > >>>>>
> > >>>>> I think it’ll be better to keep this KIP focused on KStream methods
> > only.
> > >>>>> I suspect that the KTable methods may be more complicated than just
> > that
> > >>>>> proposed replacement, but it’ll also be easier to consider that
> > question
> > >>>> in
> > >>>>> isolation.
> > >>>>>
> > >>>>> The nice thing about just deprecating the KStream methods and not
> the
> > >>>>> Transform* interfaces is that you can keep your proposal just
> scoped
> > to
> > >>>>> KStream and not have any consequences for the rest of the DSL.
> > >>>>>
> > >>>>> Thanks again,
> > >>>>> John
> > >>>>>
> > >>>>> On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya wrote:
> > >>>>>> Thanks, John.
> > >>>>>>
> > >>>>>>> 4) I agree that we shouldn't deprecate the Transformer*
> > >>>>>> classes, but do you think we should deprecate the
> > >>>>>> KStream#transform* methods? I'm curious if there's any
> > >>>>>> remaining reason to have those methods, or if your KIP
> > >>>>>> completely obviates them.
> > >>>>>>
> > >>>>>> Good catch.
> > >>>>>> I considered that deprecating `Transformer*` and `transform*`
> would
> > go
> > >>>>> hand
> > >>>>>> in hand — maybe it happened similarly with old `Processor` and
> > >>>> `process`?
> > >>>>>> Though deprecating only `transform*` operations could be a better
> > >>>> signal
> > >>>>>> for users than non deprecating anything at all and pave the way to
> > it's
> > >>>>>> deprecation.
> > >>>>>>
> > >>>>>> Should this deprecation also consider including
> > >>>> `KTable#transformValues`?
> > >>>>>> The approach proposed on the KIP:
> > >>>>>> `ktable.toStream().processValues().toTable()` seems fair to me,
> > though
> > >>>> I
> > >>>>>> will have to test it further.
> > >>>>>>
> > >>>>>> I'm happy to update the KIP if there's some consensus around this.
> > >>>>>> Will add the deprecation notes these days and wait for any
> > additional
> > >>>>>> feedback on this topic before wrapping up the KIP.
> > >>>>>>
> > >>>>>>
> > >>>>>> On Fri, 11 Feb 2022 at 04:03, John Roesler 
> > >>>> wrote:
> > >>>>>>
> > >>>

Re: [DISCUSS] KIP-813 Shared State Stores

2022-02-17 Thread Guozhang Wang
Hi Daan,

I think for the read-only state stores you'd need to slightly augment the
checkpointing logic so that it would still write the checkpointed offsets
while restoring from the changelogs.
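
For readers skimming the thread, here is a rough sketch (my own illustration,
not code from the KIP) of the manual pattern the KIP wants to formalize:
materializing another application's changelog topic into a local store with
logging disabled, which is exactly why the checkpointing question above
matters. Topic and store names below are hypothetical:

{code}
// Processor that simply writes the upstream changelog records into the local store.
final ProcessorSupplier<String, String, Void, Void> storeUpdater =
    () -> new Processor<String, String, Void, Void>() {
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("shared-store");
        }

        @Override
        public void process(final Record<String, String> record) {
            store.put(record.key(), record.value());
        }
    };

final Topology topology = new Topology();
topology.addSource("changelog-source",
    Serdes.String().deserializer(), Serdes.String().deserializer(),
    "app-a-store-changelog");          // hypothetical name of the upstream changelog topic
topology.addProcessor("store-updater", storeUpdater, "changelog-source");
topology.addStateStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("shared-store"),
            Serdes.String(), Serdes.String())
        .withLoggingDisabled(),        // the changelog already exists upstream
    "store-updater");
{code}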


Guozhang

On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis 
wrote:

> > Could you add more details about the signature of
> > `addReadOnlyStateStore()` -- What parameters does it take? Are there any
> > overloads taking different parameters? The KIP only contains some verbal
> > description on the "Implementation Plan" section, that is hard to find
> > and hard to read.
> >
> > The KIP mentions a `ProcessorProvider` -- do you mean
> `ProcessorSupplier`?
> >
> > About timestamp synchronization: why do you propose to disable timestamp
> > synchronization (similar to global state stores)? It seems to be an
> > unnecessary limitation? -- Given that we could re-use the new method for
> > source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
> > timestamp synchronization enabled seems to be important?
>
> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
> allow for timestamp synchronization.
>
> Another thing we were confronted with was the restoring of state when the
> actual local storage is gone. For example, we host on K8s with ephemeral
> pods, so there is no persisted storage between pod restarts. However, the
> consumer group will already be at the latest offset, preventing previous
> data from being restored into the new pod’s state store.
>
> If I remember correctly, there was some checkpoint logic available when
> restoring, but we are bypassing that since logging is disabled on the
> statestore, no?
>
> As always, thanks for your insights.
>
> Cheers,
> D.
>
>
> From: Matthias J. Sax 
> Date: Wednesday, 16 February 2022 at 02:09
> To: dev@kafka.apache.org 
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> Thanks for updating the KIP.
>
> Could you add more details about the signature of
> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
> overloads taking different parameters? The KIP only contains some verbal
> description on the "Implementation Plan" section, that is hard to find
> and hard to read.
>
> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>
> About timestamp synchronization: why do you propose to disable timestamp
> synchronization (similar to global state stores)? It seems to be an
> unnecessary limitation? -- Given that we could re-use the new method for
> source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
> timestamp synchronization enabled seems to be important?
>
>
> -Matthias
>
>
> On 2/8/22 11:01 PM, Guozhang Wang wrote:
> > Daan,
> >
> > Thanks for the replies, those make sense to me.
> >
> > On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis 
> wrote:
> >
> >> I just updated the KIP to reflect the things discussed in this thread.
> >>
> >> As for your questions Guozhang:
> >>
> >>> 1) How do we handle if the num.partitions of app A's store changelog is
> >>> different from the num.tasks of app B's sub-topology with that
> read-only
> >>> store? Or are we going to let each task of B keep a whole copy of the
> >> store
> >>> of A by reading all of its changelog partitions, like global stores?
> >>
> >> Good question. Both need to be co-partitioned to have the data
> available.
> >> Another option would be to use IQ to make the request, but that seems
> far
> >> from ideal.
> >>
> >>> 2) Are we trying to synchronize the store updates from the changelog to
> >> app
> >>> B's processing timelines, or just like what we do for global stores
> that
> >> we
> >>> just update the read-only stores async?
> >>
> >> Pretty much the same as we do for global stores.
> >>
> >>> 3) If the answer to both of the above questions are the latter, then
> >> what's
> >>> the main difference of adding a read-only store v.s. adding a global
> >> store?
> >>
> >> I think because of the first answer the behavior differs from global
> >> stores.
> >>
> >> Makes sense?
> >>
> >> Cheers,
> >>
> >> D.
> >>
> >> From: Matthias J. Sax 
> >> Date: Thursday, 20 January 2022 at 21:12
> >> To: dev@kafka.apache.org 
> >> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> >>> Any processor that would use that materialized, read-only state

Re: [DISCUSS] KIP-822: Optimize the semantics of KafkaConsumer#pause to be consistent between the two RebalanceProtocols

2022-02-14 Thread Guozhang Wang
Hi Luke,

Yeah my understanding of the KIP comes from
https://issues.apache.org/jira/browse/KAFKA-13463 and
https://issues.apache.org/jira/browse/KAFKA-13425, and I think we and
Riven are on the same page.

If the changes are just within the internal classes, I think we can just
treat it as a normal bug fix and not need a KIP for it.
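
For anyone following along, a minimal illustration (my own, not from the KIP)
of the behavior being fixed: today pause state is dropped when partitions are
revoked and re-assigned during a rebalance (KAFKA-2350), so callers have to
re-pause from the rebalance listener. `props` and the topic name are assumed:

{code}
// Today: pause() does not survive a rebalance for (re)assigned partitions, so
// pause()-based flow control must be restored by the caller; the proposal is to
// have the coordinator keep the paused flag across the rebalance instead.
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        consumer.pause(partitions);   // re-apply the paused state after the rebalance
    }
});
{code}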


Guozhang

On Mon, Feb 14, 2022 at 2:22 AM Luke Chen  wrote:

> Hi Guozhang,
>
> In short, what Riven wants to do, is that we keep the `pause` flag after
> rebalanced, while before this change, Rebalance does not preserve
> pause/resume state as described in KAFKA-2350
> <https://issues.apache.org/jira/browse/KAFKA-2350>.
> In this case, do you think we can skip the KIP and move on with the
> implementation?
>
> I'm good if we need a KIP or not, just want to make it clear in case it
> breaks any existing expectation/behavior.
>
> Thank you.
> Luke
>
> On Mon, Feb 14, 2022 at 10:09 AM Guozhang Wang  wrote:
>
> > Hello Riven,
> >
> >
> > Thanks for bringing this proposal. As we discussed on the JIRA I'm
> > personally in favor of this fix. But if all the proposed changes are in
> > `ConsumerCoordinator`, then we do not need a KIP since that class is
> > internal only.
> >
> >
> > Guozhang
> >
> > On Sat, Feb 12, 2022 at 1:35 AM Riven Sun 
> > wrote:
> >
> > > Sorry, I sent this email via GMail. Refer to the contents of other
> > people's
> > > DISSCUSS emails. Mistakenly introduced someone else's KIP.
> > >
> > > The KIP related to this DISCUSS is
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199534763
> > >
> > > Thank you for your kindness
> > > RivenSun
> > >
> > > On Sat, Feb 12, 2022 at 5:32 PM Riven Sun  wrote:
> > >
> > > >
> > > >> Sorry, I sent this email via GMail. Refer to the contents of other
> > > >> people's DISSCUSS emails. Mistakenly introduced someone else's KIP.
> > > >>
> > > >> The KIP related to this DISCUSS is
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199534763
> > > >>
> > > >> Thank you for your kindness
> > > >> RivenSun
> > > >>
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-816: Topology changes without local state reset

2022-02-14 Thread Guozhang Wang
Thanks for the clarification John!

Nick, sorry that I was not super clear in my latest email. I meant exactly
what John said.

Just to clarify, I do think that this KIP is relatively orthogonal to the
named topology work; as long as we still keep the topo name encoded it
should be fine since two named topologies can indeed have the same store
name, but that would not need to be considered as part of this KIP.


Guozhang

On Mon, Feb 14, 2022 at 9:02 AM John Roesler  wrote:

> Hi Nick,
>
> When Guozhang and I were chatting, we realized that it's not completely
> sufficient just to move the state store directories, because their names
> are not unique. In particular, more than one partition of the store may be
> assigned to the same instance. Right now, this is handled because the task
> id encodes the partition number.
>
> For example, if we have a store "mystore" in subtopology 1 and we have two
> out of four partitions (0 and 3) assigned to the local node, the disk will
> have these paths:
>
> {app_id}/1_0/rocksdb/mystore
> {app_id}/1_3/rocksdb/mystore
>
> Clearly, we can't just elevate both "mystore" directories to reside under
> {appid}, because
> they have the same name. When I think of option (A), here's what I picture:
>
> {app_id}/rocksdb/mystore-0
> {app_id}/rocksdb/mystore-3
>
> In the future, one thing we're considering to do is actually store all the
> positions in the same rocksDB database, which is a pretty convenient step
> away from option (A) (another reason to prefer it to option (B) ).
>
> I just took a look at how named topologies are handled, and they're
> actually
> a separate path segment, not part of the task id, like this:
>
> {app_id}/__{topo_name}__/1_0/rocksdb/mystore
> {app_id}/__{topo_name}__/1_3/rocksdb/mystore
>
> Which is pretty convenient because it means there are no
> implications for your proposal. If you implement the above
> code, then we'll just wind up with:
>
> {app_id}/__{topo_name}__/rocksdb/mystore-0
> {app_id}/__{topo_name}__/rocksdb/mystore-3
>
> Does that make sense?
>
> Thanks,
> -John
>
>
> On Mon, Feb 14, 2022, at 03:57, Nick Telford wrote:
> > Hi Guozhang,
> >
> > Sorry I haven't had the time to respond to your earlier email, but I just
> > wanted to clarify something with respect to your most recent email.
> >
> > My original plan in option A is to remove the entire Task ID from the
> State
> > Store path, which would insulate it from any changes to the Task ID
> format
> > introduced by Named Topologies or anything else. This would in fact
> > consolidate the store for the instance, rather than by-Task (which I
> think
> > is what you meant by "one physical store per state"?).
> >
> > I did highlight in option C the possibility of changing the format of the
> > Task ID to change the sub-topology ID from an ordinal to a stable
> > identifier. Although I'm not convinced that this option is viable, or
> even
> > desirable.
> >
> > Regards,
> >
> > Nick
> >
> > On Sat, 12 Feb 2022 at 00:36, Guozhang Wang  wrote:
> >
> > > Just to follow-up on this thread, I had another chat with John
> regarding
> > > option a) and I think the key thought is that, today the task-id is in
> the
> > > form of [sub-topologyID]-[partitionID] --- and in the future with
> > > named-topology it could be extended to three digits as
> > > [named-topologyID]-[sub-topologyID]-[partitionID] --- and for the
> purpose
> > > of this KIP's option A), we actually just want to remove the
> > > [sub-topologyID] from the taskID as part of the file path hierarchy,
> right?
> > >
> > > If yes, given that in the future we want:
> > >
> > > * allow topology evolution with compatibility validations.
> > > * consolidating persistent state stores so that we do not have one
> physical
> > > store per state, but potentially one store for the whole instance.
> > >
> > > No matter if we want to provide certain tooling for mapping the
> persistent
> > > state path / names as in option B), pursuing some solutions in the
> > > direction of option A) to be independent of the sub-topologyID since
> state
> > > store names within a topology should be sufficiently unique would make
> a
> > > lot of sense.
> > >
> > >
> > > On Mon, Feb 7, 2022 at 3:52 PM Guozhang Wang 
> wrote:
> > >
> > > > Hello Nick,
> > > >
> > > > I think I'm on the same page of the scope of your KIP, and what I was
> > > > trying to get is that, there a

Re: [VOTE] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-14 Thread Guozhang Wang
Ah got it.

I'd suggest we keep it a task-level metric then.
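
For reference, a small sketch (my own) of how such a task-level metric could
be read once it lands; the group and metric names below ("stream-task-metrics",
"input-buffer-bytes-total") follow the names mentioned in this thread and may
differ in the final implementation:

{code}
// Sketch: look up the task-level buffer metric by group and name from KafkaStreams#metrics().
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

streams.metrics().forEach((name, metric) -> {
    if (name.group().equals("stream-task-metrics")
            && name.name().equals("input-buffer-bytes-total")) {
        System.out.printf("task %s: %s = %s%n",
            name.tags().get("task-id"), name.name(), metric.metricValue());
    }
});
{code}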

On Mon, Feb 14, 2022 at 10:07 AM Sagar  wrote:

> Actually @Guozhang, I just realised I didn't frame my question properly.
> What I wanted to know what, should this be a task-level metric or a
> cache-level metric. Place of definition is one thing, but what kind of
> metric should it be is what I wanted to know.
>
> Thanks!
> Sagar.
>
> On Mon, Feb 14, 2022 at 8:28 AM Sagar  wrote:
>
> > Thanks Guozhang.
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Feb 14, 2022 at 7:40 AM Guozhang Wang 
> wrote:
> >
> >> Hi Sagar,
> >>
> >> Looks good to define it in the NamedCacheMetrics. Though since this is
> an
> >> internal implementation detail and neither of the classes are public, we
> >> do
> >> not actually need to define it in the KIP :)
> >>
> >>
> >> Guozhang
> >>
> >> On Sat, Feb 12, 2022 at 4:22 AM Sagar 
> wrote:
> >>
> >> > @Guozhang,
> >> >
> >> > I have sent an update to this KIP. I have a question though.. Should
> >> this
> >> > new metric be defined in TaskMetrics level or NamedCacheMetrics? I
> think
> >> > the latter makes sense as that holds the cache size at a task level
> and
> >> > exposes some other cache related metrics as well like hit-ratio.
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> >
> >> > On Sat, Feb 12, 2022 at 1:14 PM Sagar 
> >> wrote:
> >> >
> >> > > Hi All,
> >> > >
> >> > > There's another amendment proposed for this KIP. We are adding a new
> >> > > metric type called *cache-size-bytes-total  *to capture the cache
> >> size in
> >> > > bytes accumulated by a task.
> >> > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >> > >
> >> > > Thanks!
> >> > > Sagar.
> >> > >
> >> > > On Mon, Jan 24, 2022 at 7:55 AM Guozhang Wang 
> >> > wrote:
> >> > >
> >> > >> Thanks Sagar, I'm +1 on the renamed metric.
> >> > >>
> >> > >> On Sat, Jan 22, 2022 at 6:56 PM Sagar 
> >> > wrote:
> >> > >>
> >> > >> > Hi All,
> >> > >> >
> >> > >> > There is a small update to the KIP whereby the newly introduced
> >> metric
> >> > >> > *total-bytes
> >> > >> > *has been renamed to *input-buffer-bytes-total.*
> >> > >> >
> >> > >> > Thanks!
> >> > >> > Sagar.
> >> > >> >
> >> > >> > On Wed, Sep 29, 2021 at 9:57 AM Sagar  >
> >> > >> wrote:
> >> > >> >
> >> > >> > > We have 3 binding votes: Sophie/Guozhang/Mathias
> >> > >> > > and 2 non-binding votes: Josep/Luke and no -1 votes.
> >> > >> > >
> >> > >> > > Marking this KIP as accepted.
> >> > >> > >
> >> > >> > > Thanks everyone!
> >> > >> > >
> >> > >> > > Thanks!
> >> > >> > > Sagar.
> >> > >> > >
> >> > >> > >
> >> > >> > >
> >> > >> > > On Wed, Sep 29, 2021 at 7:18 AM Matthias J. Sax <
> >> mj...@apache.org>
> >> > >> > wrote:
> >> > >> > >
> >> > >> > >> +1 (binding)
> >> > >> > >>
> >> > >> > >> On 9/28/21 10:40 AM, Sagar wrote:
> >> > >> > >> > Hi All,
> >> > >> > >> >
> >> > >> > >> > Bumping this vote thread again!
> >> > >> > >> >
> >> > >> > >> > Thanks!
> >> > >> > >> > Sagar.
> >> > >> > >> >
> >> > >> > >> > On Wed, Sep 8, 2021 at 1:19 PM Luke Chen  >
> >> > >> wrote:
> >> > >> > >> >
> >> > >> > >> >> Thanks for the KIP.
> >> > >> > >>

Re: [DISCUSS] KIP-792: Add "generation" field into consumer protocol

2022-02-14 Thread Guozhang Wang
Thanks Luke, no more comments from me, nice work!

On Mon, Feb 14, 2022 at 5:22 AM Luke Chen  wrote:

> Hi Guozhang,
>
> Thanks for your comments. I've updated the KIP.
> Here's what I've updated:
>
> * In the motivation section, I've added this paragraph after
> cooperativeStickyAssignor like this:
>
> *On the other hand,  `StickyAssignor` is also adding "generation" field
> plus the "ownedPartitions" into subscription userData bytes. the difference
> is that the `StickyAssignor`'s user bytes also encode the prev-owned
> partitions while the `CooperativeStickyAssignor` relies on the prev-owned
> partitions on the subscription protocol directly.*
>
> * In the proposed change section, I've updated the paragraph as:
>
>
> *For built-in CooperativeStickyAssignor, if there are consumers in old
> bytecode and some in the new bytecode, it's totally fine, because the
> subscription data from old consumers will contain \[empty ownedPartitions +
> default generation(-1)] in V0, or \[current ownedPartitions + default
> generation(-1)] in V1. For V0 case, it's quite simple, because we'll just
> ignore the info since they are empty. For V1 case, we'll get the
> "ownedPartitions" data, and then decode the "generation" info in the
> subscription userData bytes. So that we can continue to do assignment with
> these information.*
> * Also, after the "cooperativeStickyAssignor paragraph, I've also mentioned
> stickyAssignor:
>
>
> *For built-in StickyAssignor, if there are consumers in old bytecode and
> some in the new bytecode, it's also fine, because the subscription data
> from old consumers will contain \[empty ownedPartitions + default
> generation(-1)] in V0, or \[current ownedPartitions + default
> generation(-1)] in V1. For both V0 and V1 case, we'll directly use the
> ownedPartition and generation info in the subscription userData bytes. *
>
> Please let me know if you have other comments.
>
> Thank you.
> Luke
>
> On Wed, Feb 9, 2022 at 2:57 PM Guozhang Wang  wrote:
>
> > Hello Luke,
> >
> > Thanks for the updated KIP, I've taken a look at it and still LGTM. Just
> a
> > couple minor comments in the wiki:
> >
> > * Both `StickyAssignor` and `CooperativeStickyAssignor` that there's
> > already generation is encoded in user-data bytes, the difference is that
> > the `StickyAssignor`'s user bytes also encode the prev-owned partitions
> > while the `CooperativeStickyAssignor` relies on the prev-owned partitions
> > on the subscription protocol directly. So we can add the `StickyAssignor`
> > in your paragraph talking about `CooperativeStickyAssignor` as well.
> >
> > * This sentence: "otherwise, we'll take the ownedPartitions as default
> > generation(-1)." does not read right to me, maybe need to rephrase a bit?
> >
> >
> > Guozhang
> >
> > On Mon, Feb 7, 2022 at 7:36 PM Luke Chen  wrote:
> >
> > > Hi David,
> > >
> > > Thanks for your comments.
> > > I've updated the KIP to add changes in Subscription class.
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Fri, Feb 4, 2022 at 11:43 PM David Jacot
>  > >
> > > wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > Thanks for updating the KIP. I just have a minor request.
> > > > Could you fully describe the changes to the Subscription
> > > > public class in the KIP? I think that it would be better than
> > > > just saying that the generation will be added to id.
> > > >
> > > > Otherwise, the KIP LGTM.
> > > >
> > > > Thanks,
> > > > David
> > > >
> > > > On Mon, Nov 29, 2021 at 4:29 AM Luke Chen  wrote:
> > > > >
> > > > > Hi devs,
> > > > > Welcome to provide feedback.
> > > > >
> > > > > If there are no other comments, I'll start a vote tomorrow.
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > >
> > > > > On Mon, Nov 22, 2021 at 4:16 PM Luke Chen 
> wrote:
> > > > >
> > > > > > Hello David,
> > > > > >
> > > > > > For (3):
> > > > > >
> > > > > >
> > > > > >
> > > > > > * I suppose that we could add a `generation` field to the
> > > > JoinGroupRequest
> > > > > > instead to do the fencing that you describe while handling the
> > > > sentinel in
&

Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-13 Thread Guozhang Wang
Thanks Seung-chan!

Re: "As I read the code, we can set a consumer to leave the group while
shutting
down the thread, using `StreamThread#requestLeaveGroupDuringShutdown`
method. Is it enough to call that method on every thread created in
`KafkaStream` sometime before we call `StreamThread#shutdown`?"

I think you are right. We can bypass API changes in consumers for this KIP.

Re: "I updated the `Public Interfaces` section in the KIP to specify full
signature. As you can see, I suggested putting the `CloseOptions` class in
the `KafkaStream` class. I feel like it'd be too general name to put it in
a separate file."

Re-read the updated KIP, looks good to me!
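
For readers of the archive, a rough sketch of how the proposed overload would
be used; the exact names (CloseOptions, timeout, leaveGroup) come from the KIP
draft and may still change:

{code}
// Sketch of the control-object overload proposed in KIP-812.
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
// ... later, on shutdown:
streams.close(new KafkaStreams.CloseOptions()
    .timeout(Duration.ofSeconds(30))   // how long close() may block
    .leaveGroup(true));                // send a leave-group so the member is removed right away
{code}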


Guozhang

On Fri, Feb 11, 2022 at 11:21 PM Seung-chan Ahn 
wrote:

> (re-sending with the better syntax of quotation)
>
> Hello Guozhang,
>
> Thanks for your rich comment! I'd carefully read through all you mentioned.
> I updated the `Public Interfaces` section of KIP and this is what I think:
>
> @Guozhang: Today we use the timeout to try to tackle all three cases, but
> > ideally we want the client to submit extra information to help
> distinguish
> > them. I.e. we just use timeout for case 1) only, while we use separate
> > mechanisms to differentiate 2) and 3) from it. Personally I think we
> could
> > consider having an augmented leave-group (or maybe in the long run, we
> can
> > merge that RPC as part of heartbeat) with a flag indicating 2) or 3),
> while
> > just relying on the timeout for case 1).
>
>
> I know you proposed this idea in a wider scope than this KIP, but it'd be
> worth keeping the discussion. I've thought about the idea of `augmented
> leave-group with a flag indicating 2) or 3)`. In the case that a bouncing
> consumer requested, with a flag, to leave the group, and unfortunately, it
> failed to restart, I guess the group’s coordinator still needs to drop the
> consumer after some while. And by its nature, the coordinator would wait
> for the consumer till the timeout reached. Eventually, it seems like not
> really different from the case the consumer restarts and prays the timeout
> is enough. In my very naive thought, `augmented leave-group with a flag
> indicating 2)` is not supposed to be a request to leave the group but the
> one for being exempted from the timeout. So I’d rather consider having a
> request to extend the timeout for one time instead.
>
> @Guozhang: 1. Regarding the API change, I feel just doing that on the
> > streams side is not enough since by the end of the day we still need the
> > consumer to incorporate it (today it's via a static config and hence we
> > cannot just dynamically change the config).
>
>
> As I read the code, we can set a consumer to leave the group while shutting
> down the thread, using `StreamThread#requestLeaveGroupDuringShutdown`
> method. Is it enough to call that method on every thread created in
> `KafkaStream` sometime before we call `StreamThread#shutdown`?
>
> @Guozhang: 2. Regarding the API itself, I want to see a more concrete
> > proposal that contains the full signature, e.g.
> does".closeAndLeaveGroup()"
> > include the timeout param as well, etc? My very subjective preference is
> to
> > not differentiate by the function name in case we may want to augment the
> > close function in the future, which would explode the function names :P
> > Instead maybe we can just overload the `close()` function again but with
> a
> > control object, that includes 1) timeout, 2) leave-group flag, and hence
> > can also extend to include more variables just in case. Would like to
> hear
> > others' thoughts as well.
>
>
> I personally prefer the control object pattern too. It will save us from
> the "telescoping constructors" pattern. Also, I found that we already
> introduced this way on `AdminClient`. It sounds consistent to have the same
> pattern in this case.
>
> I updated the `Public Interfaces` section in the KIP to specify full
> signature. As you can see, I suggested putting the `CloseOptions` class in
> the `KafkaStream` class. I feel like it'd be too general name to put it in
> a separate file.
>
> I’m fully open :) Feel free to oppose any.
>
> On Mon, Feb 7, 2022 at 12:50 PM Guozhang Wang  wrote:
>
> > Hello Seung-chan,
> >
> > Thanks for the KIP writeup and summary! I made a pass on it and want to
> > share some of my thoughts:
> >
> > On the very high level, we want to be able to effectively differentiate
> > several cases as follows:
> >
> > 1) There's a network partition / soft failure hence clients cannot reach
> > the broker, temporarily: here we want to give some time to see if the
> > clients 

Re: [VOTE] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-13 Thread Guozhang Wang
Hi Sagar,

Defining it in NamedCacheMetrics looks good to me. Though since this is an
internal implementation detail and neither of the classes is public, we do
not actually need to define it in the KIP :)
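
For completeness, a hedged sketch of how an operator could read the new
metric once it lands; the metric name below is the one proposed in this
thread, and the exact name and level may still change:

    // assumes `streams` is a running org.apache.kafka.streams.KafkaStreams instance
    streams.metrics().forEach((metricName, metric) -> {
        if (metricName.name().equals("cache-size-bytes-total")) {
            System.out.println(metricName.tags() + " -> " + metric.metricValue());
        }
    });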


Guozhang

On Sat, Feb 12, 2022 at 4:22 AM Sagar  wrote:

> @Guozhang,
>
> I have sent an update to this KIP. I have a question though.. Should this
> new metric be defined in TaskMetrics level or NamedCacheMetrics? I think
> the latter makes sense as that holds the cache size at a task level and
> exposes some other cache related metrics as well like hit-ratio.
>
> Thanks!
> Sagar.
>
>
> On Sat, Feb 12, 2022 at 1:14 PM Sagar  wrote:
>
> > Hi All,
> >
> > There's another amendment proposed for this KIP. We are adding a new
> > metric type called *cache-size-bytes-total  *to capture the cache size in
> > bytes accumulated by a task.
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Jan 24, 2022 at 7:55 AM Guozhang Wang 
> wrote:
> >
> >> Thanks Sagar, I'm +1 on the renamed metric.
> >>
> >> On Sat, Jan 22, 2022 at 6:56 PM Sagar 
> wrote:
> >>
> >> > Hi All,
> >> >
> >> > There is a small update to the KIP whereby the newly introduced metric
> >> > *total-bytes
> >> > *has been renamed to *input-buffer-bytes-total.*
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> > On Wed, Sep 29, 2021 at 9:57 AM Sagar 
> >> wrote:
> >> >
> >> > > We have 3 binding votes: Sophie/Guozhang/Mathias
> >> > > and 2 non-binding votes: Josep/Luke and no -1 votes.
> >> > >
> >> > > Marking this KIP as accepted.
> >> > >
> >> > > Thanks everyone!
> >> > >
> >> > > Thanks!
> >> > > Sagar.
> >> > >
> >> > >
> >> > >
> >> > > On Wed, Sep 29, 2021 at 7:18 AM Matthias J. Sax 
> >> > wrote:
> >> > >
> >> > >> +1 (binding)
> >> > >>
> >> > >> On 9/28/21 10:40 AM, Sagar wrote:
> >> > >> > Hi All,
> >> > >> >
> >> > >> > Bumping this vote thread again!
> >> > >> >
> >> > >> > Thanks!
> >> > >> > Sagar.
> >> > >> >
> >> > >> > On Wed, Sep 8, 2021 at 1:19 PM Luke Chen 
> >> wrote:
> >> > >> >
> >> > >> >> Thanks for the KIP.
> >> > >> >>
> >> > >> >> + 1 (non-binding)
> >> > >> >>
> >> > >> >> Thanks.
> >> > >> >> Luke
> >> > >> >>
> >> > >> >> On Wed, Sep 8, 2021 at 2:48 PM Josep Prat
> >> >  >> > >> >
> >> > >> >> wrote:
> >> > >> >>
> >> > >> >>> +1 (non binding).
> >> > >> >>>
> >> > >> >>> Thanks for the KIP Sagar!
> >> > >> >>> ———
> >> > >> >>> Josep Prat
> >> > >> >>>
> >> > >> >>> Aiven Deutschland GmbH
> >> > >> >>>
> >> > >> >>> Immanuelkirchstraße 26, 10405 Berlin
> >> > >> >>>
> >> > >> >>> Amtsgericht Charlottenburg, HRB 209739 B
> >> > >> >>>
> >> > >> >>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >> > >> >>>
> >> > >> >>> m: +491715557497
> >> > >> >>>
> >> > >> >>> w: aiven.io
> >> > >> >>>
> >> > >> >>> e: josep.p...@aiven.io
> >> > >> >>>
> >> > >> >>> On Wed, Sep 8, 2021, 03:29 Sophie Blee-Goldman
> >> > >> >>  >> > >> >>>>
> >> > >> >>> wrote:
> >> > >> >>>
> >> > >> >>>> +1 (binding)
> >> > >> >>>>
> >> > >> >>>> Thanks for the KIP!
> >> > >> >>>>
> >> > >> >>>> -Sophie
> >> > >> >>>>
> >> > >> >>>> On Tue, Sep 7, 2021 at 1:59 PM Guozhang Wang <
> >> wangg...@gmail.com>
> >> > >> >> wrote:
> >> > >> >>>>
> >> > >> >>>>> Thanks Sagar, +1 from me.
> >> > >> >>>>>
> >> > >> >>>>>
> >> > >> >>>>> Guozhang
> >> > >> >>>>>
> >> > >> >>>>> On Sat, Sep 4, 2021 at 10:29 AM Sagar <
> >> sagarmeansoc...@gmail.com>
> >> > >> >>> wrote:
> >> > >> >>>>>
> >> > >> >>>>>> Hi All,
> >> > >> >>>>>>
> >> > >> >>>>>> I would like to start a vote on the following KIP:
> >> > >> >>>>>>
> >> > >> >>>>>>
> >> > >> >>>>>
> >> > >> >>>>
> >> > >> >>>
> >> > >> >>
> >> > >>
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >> > >> >>>>>>
> >> > >> >>>>>> Thanks!
> >> > >> >>>>>> Sagar.
> >> > >> >>>>>>
> >> > >> >>>>>
> >> > >> >>>>>
> >> > >> >>>>> --
> >> > >> >>>>> -- Guozhang
> >> > >> >>>>>
> >> > >> >>>>
> >> > >> >>>
> >> > >> >>
> >> > >> >
> >> > >>
> >> > >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-822: Optimize the semantics of KafkaConsumer#pause to be consistent between the two RebalanceProtocols

2022-02-13 Thread Guozhang Wang
Hello Riven,


Thanks for bringing this proposal. As we discussed on the JIRA I'm
personally in favor of this fix. But if all the proposed changes are in
`ConsumerCoordinator`, then we do not need a KIP since that class is
internal only.


Guozhang

On Sat, Feb 12, 2022 at 1:35 AM Riven Sun  wrote:

> Sorry, I sent this email via GMail. Refer to the contents of other people's
> DISSCUSS emails. Mistakenly introduced someone else's KIP.
>
> The KIP related to this DISCUSS is
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199534763
>
> Thank you for your kindness
> RivenSun
>
> On Sat, Feb 12, 2022 at 5:32 PM Riven Sun  wrote:
>
> >
> >> Sorry, I sent this email via GMail. Refer to the contents of other
> >> people's DISSCUSS emails. Mistakenly introduced someone else's KIP.
> >>
> >> The KIP related to this DISCUSS is
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199534763
> >>
> >> Thank you for your kindness
> >> RivenSun
> >>
> >
>


-- 
-- Guozhang


Re: [DISCUSS] Should we automatically close stale PRs?

2022-02-13 Thread Guozhang Wang
Thanks for the investigation, Luke!

I think just pinging the committers' IDs in a comment is sufficient.

On Sun, Feb 13, 2022 at 5:00 AM Luke Chen  wrote:

> Hi Guozhang,
>
> Had a quick search on the github action, didn't find any notification
> related actions.
> However, there's a github action to auto leave a comment before closing
> PR.[1]
> So, I think at least, we can leave a comment that notify the PR
> participants.
> If the auto comment action can support mention users (i.e. @ user), we can
> notify anyone we want.
>
> [1] https://github.com/actions/stale#close-pr-message
>
> Thanks.
> Luke
>
>
> On Fri, Feb 11, 2022 at 6:36 AM Guozhang Wang  wrote:
>
> > Just going back to the PRs, @David Jacot, do you know if the
> actions/stale
> > <https://github.com/actions/stale> tool is able to send notifications to
> > pre-configured recipients when closing stale PRs?
> >
> > On Wed, Feb 9, 2022 at 9:21 PM Matthias J. Sax  >
> > wrote:
> >
> > > Nikolay,
> > >
> > > thanks for helping out!
> > >
> > > > First, I thought it’s an author job to keep KIP status up to date.
> > > > But, it can be tricky to determine actual KIP status because of lack
> of
> > > feedback from committers
> > >
> > > Yes, it is the author's task, but it's also the author's task to keep
> > > the discussion alive (what -- to be fair -- can be hard). We had some
> > > great contributions thought that took very long, but the KIP author
> kept
> > > following up and thus signaling that they still have interest. Just
> > > going silent and effectively dropping a KIP is not the best way (even
> if
> > > I can understand that it sometime frustrating and some people just walk
> > > away).
> > >
> > >
> > > > Second - the other issue is determine - what KIP just wait for a hero
> > to
> > > implement it, and what just wrong idea or something similar.
> > >
> > > As pointed out on the KIP wiki page, if somebody is not willing to
> > > implement the KIP, they should not even start it. Getting a KIP voted
> > > but not finishing it, is not really helping the project.
> > >
> > > About "just the wrong idea": this also happens, but usually it's clear
> > > quite quickly if people raise concerns about an idea.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 2/9/22 12:13, Nikolay Izhikov wrote:
> > > >> Thanks for that list, Nikolay,
> > > >
> > > > Thank, John.
> > > >
> > > > I made a second round of digging through abandoned PR’s.
> > > > Next pack, that should be closed:
> > > >
> > > > https://github.com/apache/kafka/pull/1291
> > > > https://github.com/apache/kafka/pull/1323
> > > > https://github.com/apache/kafka/pull/1412
> > > > https://github.com/apache/kafka/pull/1757
> > > > https://github.com/apache/kafka/pull/1741
> > > > https://github.com/apache/kafka/pull/1715
> > > > https://github.com/apache/kafka/pull/1668
> > > > https://github.com/apache/kafka/pull/1666
> > > > https://github.com/apache/kafka/pull/1661
> > > > https://github.com/apache/kafka/pull/1626
> > > > https://github.com/apache/kafka/pull/1624
> > > > https://github.com/apache/kafka/pull/1608
> > > > https://github.com/apache/kafka/pull/1606
> > > > https://github.com/apache/kafka/pull/1582
> > > > https://github.com/apache/kafka/pull/1522
> > > > https://github.com/apache/kafka/pull/1516
> > > > https://github.com/apache/kafka/pull/1493
> > > > https://github.com/apache/kafka/pull/1473
> > > > https://github.com/apache/kafka/pull/1870
> > > > https://github.com/apache/kafka/pull/1883
> > > > https://github.com/apache/kafka/pull/1893
> > > > https://github.com/apache/kafka/pull/1894
> > > > https://github.com/apache/kafka/pull/1912
> > > > https://github.com/apache/kafka/pull/1933
> > > > https://github.com/apache/kafka/pull/1983
> > > > https://github.com/apache/kafka/pull/1984
> > > > https://github.com/apache/kafka/pull/2017
> > > > https://github.com/apache/kafka/pull/2018
> > > >
> > > >> 9 февр. 2022 г., в 22:37, John Roesler 
> > > написал(а):
> > > >>
> > > >> Thanks for that list, Nikolay,
> > > >>
> > > >> I've just

Re: [DISCUSS] KIP-816: Topology changes without local state reset

2022-02-11 Thread Guozhang Wang
Just to follow up on this thread, I had another chat with John regarding
option A), and I think the key thought is that today the task-id is in the
form [sub-topologyID]-[partitionID] --- in the future, with named
topologies, it could be extended to three segments as
[named-topologyID]-[sub-topologyID]-[partitionID] --- and for the purpose
of this KIP's option A), we actually just want to remove the
[sub-topologyID] from the task-id as part of the file path hierarchy, right?

If yes, given that in the future we want:

* allow topology evolution with compatibility validations.
* consolidating persistent state stores so that we do not have one physical
store per state, but potentially one store for the whole instance.

Regardless of whether we also provide tooling for mapping the persistent
state paths / names as in option B), pursuing a solution in the direction
of option A) that is independent of the sub-topology ID would make a lot of
sense, since state store names within a topology should already be
sufficiently unique.
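
To make option A) concrete for readers following along, here is an
illustrative before/after of the on-disk layout (the "today" line reflects
the current state directory structure; the "option A)" line is only an
assumption of one possible target, not a settled design):

    today:      <state.dir>/<application.id>/<subTopologyId>_<partition>/rocksdb/<storeName>/
    option A):  <state.dir>/<application.id>/<storeName>/<partition>/

i.e. with option A), renumbering sub-topologies after a topology change
would no longer orphan the on-disk store directories.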


On Mon, Feb 7, 2022 at 3:52 PM Guozhang Wang  wrote:

> Hello Nick,
>
> I think I'm on the same page of the scope of your KIP, and what I was
> trying to get is that, there are some other efforts going on in parallel
> that tries to identify if two topologies, or some part of them, are
> isomorphic in structure, and hence their corresponding persistent states
> may be reusable. That's why I was saying that "assume, we know which
> persistent states can be reusable". i.e. let's say we know that the new
> topology's sub-topology 1's state store A-0005 is the same as the old
> topology's sub-topology 0' state store A-0004, then what we can do to let
> the new topology state store to be loaded as the old state store. With that
> in my mind originally, I said maybe option B) is sufficient to rename the
> dir path / state store names before we start the new app's topology. But
> that's just one aspect of it and we do not necessarily need to follow :) If
> you are up to do a prototype following option A) with a good upgrade path,
> it would be a great solution too.
>
>
> On Mon, Feb 7, 2022 at 8:59 AM John Roesler  wrote:
>
>> Thanks, Nick,
>>
>> It sounds like we're on the same page. I didn't think (A)
>> would be fundamentally "hard", just that it might be a pain
>> in practice. Reading your response, if you're up for it, it
>> sounds like a prototype of (A) would be the tie-breaker
>> between the two approaches.
>>
>> To be honest, I've been burned enough times that I tend to
>> prototype my KIPs more often than not anyway.
>>
>> Thanks,
>> -John
>>
>> On Mon, 2022-02-07 at 11:42 +, Nick Telford wrote:
>> > Hi everyone,
>> >
>> > Guozhang, the scope of my KIP is specifically about deploying structural
>> > changes to existing applications, i.e. "upgrades". Sharing state between
>> > different applications was not in the scope of my original proposal.
>> >
>> > John's email has it exactly right, and I think this points to my KIP not
>> > explaining the problem correctly. Any suggestions on how I could better
>> > clarify the intent of my proposal in the KIP?
>> >
>> > John, regarding your comments:
>> >
>> > A) being difficult to clean up state after migrations. Unless I've
>> missed
>> > something, this shouldn't be a problem. Tasks are already internally
>> aware
>> > of which stores they own from the Topology structure, irrespective of
>> where
>> > on-disk the StateStore data is, they should be able to find it. I think
>> the
>> > only real issue with this approach is that it will require changing,
>> most
>> > likely, quite a bit of code. We'll need to separate the concept of
>> "state
>> > directory" from "task directory", (which will still be needed to store
>> Task
>> > meta-data, like .lock files). At the very least, I think significant
>> > changes may need to be made to StateDirectory and StateManager, but I
>> > haven't investigated in detail. Perhaps it would make sense to first
>> > explore this approach with a prototype to see how invasive it would
>> become?
>> >
>> > B) My intent was always that this process would occur between
>> > KafkaStreams.start() and threads actually starting, so that the
>> migration
>> > would occur safely. I'm not sure what kind of unexpected structural
>> changes
>> > could be detected by such a process; it might just be useful for general
>> > validation. The main reasons I prefer (A) is that: 1) (B) requires
>>

[jira] [Resolved] (KAFKA-12256) auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION

2022-02-11 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12256.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
> -
>
> Key: KAFKA-12256
> URL: https://issues.apache.org/jira/browse/KAFKA-12256
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.0.0
>Reporter: Ryan Leslie
>Priority: Minor
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.2.0
>
>
> In KAFKA-6829 a change was made to the consumer to internally retry commits 
> upon receiving UNKNOWN_TOPIC_OR_PARTITION.
> Though this helped mitigate issues around stale broker metadata, there were 
> some valid concerns around the negative effects for routine topic deletion:
> https://github.com/apache/kafka/pull/4948
> In particular, if a commit is issued for a deleted topic, retries can block 
> the consumer for up to max.poll.interval.ms. This is tunable of course, but 
> any amount of stalling in a consumer can lead to unnecessary lag.
> One of the assumptions while permitting the change was that in practice it 
> should be rare for commits to occur for deleted topics, since that would 
> imply messages were being read or published at the time of deletion. It's 
> fair to expect users to not delete topics that are actively published to. But 
> this assumption is false in cases where auto commit is enabled.
> With the current implementation of auto commit, the consumer will regularly 
> issue commits for all topics being fetched from, regardless of whether or not 
> messages were actually received. The fetch positions are simply flushed, even 
> when they are 0. This is simple and generally efficient, though it does mean 
> commits are often redundant. Besides the auto commit interval, commits are 
> also issued at the time of rebalance, which is often precisely at the time 
> topics are deleted.
> This means that in practice commits for deleted topics are not really rare. 
> This is particularly an issue when the consumer is subscribed to a multitude 
> of topics using a wildcard. For example, a consumer might subscribe to a 
> particular "flavor" of topic with the aim of auditing all such data, and 
> these topics might dynamically come and go. The consumer's metadata and 
> rebalance mechanisms are meant to handle this gracefully, but the end result 
> is that such groups are often blocked in a commit for several seconds or 
> minutes (the default is 5 minutes) whenever a delete occurs. This can 
> sometimes result in significant lag.
> Besides having users abandon auto commit in the face of topic deletes, there 
> are probably multiple ways to deal with this, including reconsidering if 
> commits still truly need to be retried here, or if this behavior should be 
> more configurable; e.g. having a separate commit timeout or policy. In some 
> cases the loss of a commit and subsequent message duplication is still 
> preferred to processing delays. And having an artificially low 
> max.poll.interval.ms or rebalance.timeout.ms comes with its own set of 
> concerns.
> In the very least the current behavior and pitfalls around delete with active 
> consumers should be documented.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-818: Introduce cache-size-bytes-total Task Level Metric

2022-02-10 Thread Guozhang Wang
Thanks for the updates Sagar!

On Thu, Feb 10, 2022 at 5:59 PM Sagar  wrote:

> Hi All,
>
> As discussed above, this KIP would be discarded and the new metric proposed
> here would be added to KIP-770 as the need to add a new metric was
> discovered when working on it.
>
> Thanks!
> Sagar.
>
> On Thu, Feb 10, 2022 at 9:54 AM Sagar  wrote:
>
> > Hi Guozhang,
> >
> > Sure. I will add it to the KIP.
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Feb 7, 2022 at 6:22 AM Guozhang Wang  wrote:
> >
> >> Since the PR is reopened and we are going to re-merged the fixed PRs,
> what
> >> about just adding that as part of the KIP as the addendum?
> >>
> >> On Fri, Feb 4, 2022 at 2:13 AM Sagar  wrote:
> >>
> >> > Thanks Sophie/Guozhang.
> >> >
> >> > Yeah I could have amended the KIP but it slipped my mind when Guozhang
> >> > proposed this in the PR. Later on, the PR was merged and KIP was
> marked
> >> as
> >> > adopted so I thought I will write a new one. I know the PR had been
> >> > reopened now :p . I dont have much preference on a new KIP v/s the
> >> original
> >> > one so anything is ok with me as well.
> >> >
> >> > I agree with the INFO part. I will make that change.
> >> >
> >> > Regarding task level, from my understanding, since every task's
> >> > buffer/cache size might be different so if a certain task might be
> >> > overshooting the limits then the task level metric might help people
> to
> >> > infer this. Also, thanks for the explanation Guozhang on why this
> >> should be
> >> > a task level metric. What are your thoughts on this @Sophie?
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> >
> >> > On Fri, Feb 4, 2022 at 4:47 AM Guozhang Wang 
> >> wrote:
> >> >
> >> > > Thanks Sagar for proposing the KIP, and Sophie for sharing your
> >> thoughts.
> >> > > Here're my 2c:
> >> > >
> >> > > I think I agree with Sophie for making the two metrics (both the
> added
> >> > and
> >> > > the newly proposed) on INFO level since we are always calculating
> them
> >> > > anyways. Regarding the level of the cache-size though, I'm thinking
> a
> >> bit
> >> > > different with you two: today we do not actually keep that caches on
> >> the
> >> > > per-store level, but rather on the per-thread level, i.e. when the
> >> cache
> >> > is
> >> > > full we would flush not only on the triggering state store but also
> >> > > potentially on other state stores as well of the task that thread
> >> owns.
> >> > > This mechanism, in hindsight, is a bit weird and we have some
> >> discussions
> >> > > about refactoring that in the future already. Personally I'd like to
> >> make
> >> > > this new metric to be aligned with whatever our future design will
> be.
> >> > >
> >> > > In the long run if we would not have a static assignment from tasks
> to
> >> > > threads, it may not make sense to keep a dedicated cache pool per
> >> thread.
> >> > > Instead all tasks will be dynamically sharing the globally
> configured
> >> max
> >> > > cache size (dynamically here means, we would not just divide the
> total
> >> > size
> >> > > by the num.tasks and then assign that to each task), and when a
> cache
> >> put
> >> > > triggers the flushing because the sum now exceeds the global
> >> configured
> >> > > value, we would potentially flush all the cached records for that
> >> task.
> >> > If
> >> > > this is the end stage, then I think keeping this metric at the task
> >> level
> >> > > is good.
> >> > >
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Thu, Feb 3, 2022 at 10:15 AM Sophie Blee-Goldman
> >> > >  wrote:
> >> > >
> >> > > > Hey Sagar,  thanks for the KIP!
> >> > > >
> >> > > > And yes, all metrics are considered part of the public API and
> thus
> >> > > require
> >> > > > a KIP to add

Re: [DISCUSS] Should we automatically close stale PRs?

2022-02-10 Thread Guozhang Wang
 label "Stale" is added to the PR.
> >>>>>>>
> >>>>>>> +1 for Matthias' proposal of non-committers doing a pre-review.
> That
> >>>>>> would definitely save some time for committer reviews.
> >>>>>>>
> >>>>>>> Nikolay, great that you are willing to do reviews. We do not have a
> >>>>>> separate list of PRs that need pre-reviews. You can consult the
> list of PRs
> >>>>>> of Apache Kafka (https://github.com/apache/kafka/pulls) and choose
> from
> >>>>>> there. I think that is the simplest way to start reviewing. Maybe
> Luke has
> >>>>>> some tips here since he does an excellent job in reviewing as a
> >>>>>> non-committer.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On 07.02.22 08:24, Nikolay Izhikov wrote:
> >>>>>>>> Hello, Matthias, Luke.
> >>>>>>>>> I agree with Matthias that contributors could and should help do
> more
> >>>>>> "pre-review" PRs.
> >>>>>>>> I, personally, ready to do the initial review of PRs. Do we have
> some
> >>>>>> recipe to filter PRs that has potential to land in trunk?
> >>>>>>>> Can, you, please, send me list of PRs that need to be
> pre-reviewed?
> >>>>>>>>> I might be useful thought to just do a better job to update KIP
> status
> >>>>>> more frequently
> >>>>>>>> First, I thought it’s an author job to keep KIP status up to date.
> >>>>>>>> But, it can be tricky to determine actual KIP status because of
> lack of
> >>>>>> feedback from committers :)
> >>>>>>>> Second - the other issue is determine - what KIP just wait for a
> hero
> >>>>>> to implement it, and what just wrong idea or something similar.
> >>>>>>>> All of this kind of KIPs has status «Under discussion».
> >>>>>>>> Actually, if someone has a list of potentially useful KIPS -
> please,
> >>>>>> send it.
> >>>>>>>> I’m ready to work on one of those.
> >>>>>>>>> 7 февр. 2022 г., в 05:28, Luke Chen 
> написал(а):
> >>>>>>>>>
> >>>>>>>>> I agree with Matthias that contributors could and should help do
> more
> >>>>>>>>> "pre-review" PRs.
> >>>>>>>>> Otherwise, we're not fixing the root cause of the issue, and
> still
> >>>>>> keeping
> >>>>>>>>> piling up the PRs (and auto closing them after stale)
> >>>>>>>>>
> >>>>>>>>> And I also agree with Guozhang that we should try to notify at
> least
> >>>>>> the
> >>>>>>>>> committers about the closed PRs (maybe PR participants +
> committers if
> >>>>>>>>> possible).
> >>>>>>>>> Although the PRs are stale, there might be some good PRs just got
> >>>>>> ignored.
> >>>>>>>>>
> >>>>>>>>> Thank you.
> >>>>>>>>> Luke
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Feb 7, 2022 at 6:50 AM Guozhang Wang  >
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for bringing this up David. I'm in favor of some
> automatic
> >>>>>> ways to
> >>>>>>>>>> clean up stale PRs. More specifically:
> >>>>>>>>>>
> >>>>>>>>>> * I think there are indeed many root causes why we have so many
> stale
> >>>>>> PRs
> >>>>>>>>>> that we should consider, and admittedly the reviewing manpower
> cannot
> >>>>>> keep
> >>>>>>>>>> up with the contributing pace is a big one of them. But in this
> >>>>>> discussion
> >>>>>>>>>> I'd personally like to keep this out of the scope and maybe
> keep it
> >>>>>> as a

[ANNOUNCE] New committer: Luke Chen

2022-02-09 Thread Guozhang Wang
The PMC for Apache Kafka has invited Luke Chen (showuon) as a committer and
we are pleased to announce that he has accepted!

Luke has been actively contributing to Kafka since early 2020. He has
made more than 120 commits on various components of Kafka, with notable
contributions to the rebalance protocol in Consumer and Streams (KIP-766,
KIP-726, KIP-591, KAFKA-12675 and KAFKA-12464, to name just a few), as well
as making an impact on improving test stability of the project. Aside from
all his code contributions, Luke has been a great participant in
discussions across the board and a very active and helpful reviewer of
other contributors' work, all of which is super valuable and highly
appreciated by the community.


Thanks for all of your contributions Luke. Congratulations!

-- Guozhang, on behalf of the Apache Kafka PMC


Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-02-09 Thread Guozhang Wang
I'm +1 on John's point 3) for punctuations.

And if people are on the same page that a reference-equality check per
record is not a huge overhead, I think doing that enforcement is better
than relying on documentation and hand-wavy undefined behavior.


Guozhang

On Wed, Feb 9, 2022 at 11:27 AM John Roesler  wrote:

> Thanks for the KIP Jorge,
>
> I'm in support of your proposal.
>
> 1)
> I do agree with Guozhang's point (1). I think the cleanest
> approach. I think it's cleaner and better to keep the
> enforcement internal to the framework than to introduce a
> public API or context wrapper for processors to use
> explicitly.
>
> 2) I tend to agree with you on this one; I think the
> equality check ought to be fast enough in practice.
>
> 3) I think this is implicit, but should be explicit in the
> KIP: For the `processValues` API, because the framework sets
> the key on the context before calling `process` and then
> unsets it afterwards, there will always be no key set during
> task puctuation. Therefore, while processors may still
> register punctuators, they will not be able to forward
> anything from them.
>
> This is functionally equivalent to the existing
> transformers, by the way, that are also forbidden to forward
> anything during punctuation.
>
> For what it's worth, I think this is the best tradeoff.
>
> The only alternative I see is not to place any restriction
> on forwarded keys at all and just document that if users
> don't maintain proper partitioning, they'll get undefined
> behavior. That might be more powerful, but it's also a
> usability problem.
>
> Thanks,
> -John
>
> On Wed, 2022-02-09 at 11:34 +, Jorge Esteban Quilcate
> Otoya wrote:
> > Thanks Guozhang.
> >
> > > Does `ValueProcessorContext` have to be a public API? It seems to me
> > that this can be completely abstracted away from user interfaces as an
> > internal class
> >
> > Totally agree. No intention to add these as public APIs. Will update the
> > KIP to reflect this.
> >
> > > in the past the rationale for enforcing it at the
> > interface layer rather than do runtime checks is that it is more
> efficient.
> > > I'm not sure how much overhead it may incur to check if the key did not
> > change: if it is just a reference equality check maybe it's okay. What's
> > your take on this?
> >
> > Agree, reference equality should cover this validation and the overhead
> > impact should not be meaningful.
> > Will update the KIP to reflect this as well.
> >
> >
> > On Tue, 8 Feb 2022 at 19:05, Guozhang Wang  wrote:
> >
> > > Hello Jorge,
> > >
> > > Thanks for bringing this KIP! I think this is a nice idea to consider
> using
> > > a single overloaded function name for #process, just a couple quick
> > > questions after reading the proposal:
> > >
> > > 1) Does `ValueProcessorContext` have to be a public API? It seems to me
> > > that this can be completely abstracted away from user interfaces as an
> > > internal class, and we call the `setKey` before calling
> user-instantiated
> > > `process` function, and then in its overridden `forward` it can just
> check
> > > if the key changes or not.
> > > 2) Related to 1) above, in the past the rationale for enforcing it at
> the
> > > interface layer rather than do runtime checks is that it is more
> efficient.
> > > I'm not sure how much overhead it may incur to check if the key did not
> > > change: if it is just a reference equality check maybe it's okay.
> What's
> > > your take on this?
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya <
> > > quilcate.jo...@gmail.com> wrote:
> > >
> > > > Hi Dev team,
> > > >
> > > > I'd like to start a new discussion thread on Kafka Streams KIP-820:
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API
> > > >
> > > > This KIP is aimed to extend the current `KStream#process` API to
> return
> > > > output values that could be chained across the topology, as well as
> > > > introducing a new `KStream#processValues` to use processor while
> > > validating
> > > > keys haven't change and repartition is not required.
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > Regards,
> > > > Jorge.
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
>
>

-- 
-- Guozhang


[jira] [Resolved] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffic

2022-02-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13310.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply
> ---
>
> Key: KAFKA-13310
> URL: https://issues.apache.org/jira/browse/KAFKA-13310
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.8.1
> Environment: prod
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, 
> ThirdDebugLog1.png, ThirdDebugLog2.png, brokerCpu.png, brokerNetBytes.png, 
> kafkaConsumerLog.png
>
>
> h2. Foreword
>       Because our consumers' consumption logic is sometimes heavier, we refer 
> to the configuration of Kafka stream 
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
>  Set max.poll.interval.ms to Integer.MAX_VALUE
>  Our consumers have adopted method : 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  
> h2. Recurrence of the problem scene
> operate steps are
>  (1) Test environment Kafka cluster: three brokers
>  (2) Topics conforming to regular expressions include rivenTest1, rivenTest2, 
> and rivenTest88
>  (3) Only one consumer is needed, group.id is "rivenReassign", 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  (4) At the beginning, the group status is stable, and everything is normal 
> for consumers, then I delete topic: rivenTest88
>  
> h2. Phenomenon
>       Problem phenomenon
>   (1) The consumer is blocked in the poll method, no longer consume any 
> messages, and the consumer log is always printing
>  [main] WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer 
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit 
> failed on partition rivenTest88-1 at offset 0: This server does not host this 
> topic-partition.
>  (2) The describe consumerGroup interface of Adminclient  has always timed 
> out, and the group status is no longer stable
>  (3) The cpu and traffic of the broker are *significantly increased*
>  
>  
> h2. Problem tracking
>    By analyzing the kafkaConsumer code, the version is 2.8.1.
>  I found that you introduced the waitForJoinGroup variable in the 
> updateAssignmentMetadataIfNeeded method. For the reason, I attached the 
> comment on the method: "try to update assignment metadata BUT do not need to 
> block on the timer for join group". See as below:
>  
> {code:java}
> if (includeMetadataInTimeout) {
>     // try to update assignment metadata BUT do not need to block on the timer for join group
>     updateAssignmentMetadataIfNeeded(timer, false);
> } else {
>     while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
>         log.warn("Still waiting for metadata");
>     }
> }{code}
>  
>  
> By tracing the code back layer by layer, it is found that the function of 
> this variable is to construct a time.timer(0L) and pass it back to the method 
> joinGroupIfNeeded (final Timer timer) in AbstractCoordinator. See as below:
> {code:java}
> // if not wait for join group, we would just use a timer of 0
> if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
>     // since we may use a different timer in the callee, we'd still need
>     // to update the original timer's current time after the call
>     timer.update(time.milliseconds());
>     return false;
> }
> {code}
>  But you will find that there is a submethod onJoinPrepare in the method 
> stack of joinGroupIfNeeded, and then there is a line of code in the 
> onJoinPrepare method
>  maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), 
> the value of rebalanceConfig.rebalanceTimeoutMs is actually 
> max.poll.interval.ms.
>  Finally, I tracked down ConsumerCoordinator's method 
> commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer)
>  The input parameter offsets is subscriptions.allConsumed(), when I delete 
> the topic: rivenTest88, commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Ti

Re: [DISCUSS] KIP-813 Shared State Stores

2022-02-08 Thread Guozhang Wang
Daan,

Thanks for the replies, those make sense to me.
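
(For readers following the thread: a minimal sketch of the DSL pattern
Matthias describes below, where the input topic itself is reused for
restoration so store logging can stay disabled. The topic and store names
are made up, and default String serdes are assumed to be configured.)

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, String> sharedTable = builder.table(
        "app-a-store-changelog",   // assumption: the topic owned by the other application
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("shared-read-only-store")
            .withLoggingDisabled());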

On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis  wrote:

> I just updated the KIP to reflect the things discussed in this thread.
>
> As for your questions Guozhang:
>
> > 1) How do we handle if the num.partitions of app A's store changelog is
> > different from the num.tasks of app B's sub-topology with that read-only
> > store? Or are we going to let each task of B keep a whole copy of the
> store
> > of A by reading all of its changelog partitions, like global stores?
>
> Good question. Both need to be co-partitioned to have the data available.
> Another option would be to use IQ to make the request, but that seems far
> from ideal.
>
> > 2) Are we trying to synchronize the store updates from the changelog to
> app
> > B's processing timelines, or just like what we do for global stores that
> we
> > just update the read-only stores async?
>
> Pretty much the same as we do for global stores.
>
> > 3) If the answer to both of the above questions are the latter, then
> what's
> > the main difference of adding a read-only store v.s. adding a global
> store?
>
> I think because of the first answer the behavior differs from global
> stores.
>
> Makes sense?
>
> Cheers,
>
> D.
>
> From: Matthias J. Sax 
> Date: Thursday, 20 January 2022 at 21:12
> To: dev@kafka.apache.org 
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> > Any processor that would use that materialized, read-only statestore
> would need to wait for the store to be restored. I can't find a way to make
> that possible since processors can't wait for the statestore to be restored.
>
> This is built into the runtime already. Nothing to worry about. It's
> part of the regular restore logic -- as long as any store is restoring,
> all processing is blocked.
>
> > Also, since the statestore would have logging disabled, it means there
> is no initial restoration going on.
>
> No. When we hookup the input topic as changelog (as the DSL does) we
> restore from the input topic during regular restore phase. The restore
> logic does not even know it's reading from the input topic, but not from
> a "*-changelog" topic).
>
> Disabling changelogging does only affect the write path (ie,
> `store.put()`) but not the restore path due to the internal "hookup" of
> the input topic inside the restore logic.
>
> It's not easy to find/understand by reverse engineering I guess, but
> it's there.
>
> One pointer where the actual hookup happens (might help to dig into it
> more if you want):
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>
>
> -Matthias
>
>
> On 1/20/22 10:04 AM, Guozhang Wang wrote:
> > Hello Daan,
> >
> > Thanks for writing the KIP. I just read through it and just my 2c here:
> to
> > me it seems that one of the goal would be to "externalize" the internal
> > changelog topic of an application (say A) so that other consumers can
> > directly read them --- though technically without any auth, anyone
> knowing
> > the topic name would be able to write to it too, conceptually we would
> just
> > assume that app A is the only writer of that topic --- The question I had
> > is how much we want to externalize the topic. For example we can,
> > orthogonally to this KIP, just allow users to pass in a customized topic
> > name when constructing a state store, indicating the application A to use
> > that as the changelog, and since that topic is created outside of A and
> is
> > publicly visible to anyone else on that cluster, anyone --- including any
> > consumers, or streams apps. This is probably most flexible as for
> sharing,
> > but we are even less assured that if application A is the only writer to
> > that external topic unless we have explicit auth for A on that topic.
> >
> > Aside of that, here are a few more detailed comments about the
> > implementation design itself following your current proposal:
> >
> > 1) How do we handle if the num.partitions of app A's store changelog is
> > different from the num.tasks of app B's sub-topology with that read-only
> > store? Or are we going to let each task of B keep a whole copy of the
> store
> > of A by reading all of its changelog partitions, like global stores?
> > 2) Are we trying to synchronize the store updates from the changelog to
> app
> > B's processing timelines, or just like what we do for global stores that
> we
> > just update the read-only stores async?
> > 3) If the 

Re: [DISCUSS] KIP-792: Add "generation" field into consumer protocol

2022-02-08 Thread Guozhang Wang
Hello Luke,

Thanks for the updated KIP, I've taken a look at it and still LGTM. Just a
couple minor comments in the wiki:

* For both `StickyAssignor` and `CooperativeStickyAssignor`, the generation
is already encoded in the user-data bytes; the difference is that the
`StickyAssignor`'s user bytes also encode the prev-owned partitions, while
the `CooperativeStickyAssignor` relies on the prev-owned partitions in the
subscription protocol directly. So we can add the `StickyAssignor` to your
paragraph talking about the `CooperativeStickyAssignor` as well.

* This sentence: "otherwise, we'll take the ownedPartitions as default
generation(-1)." does not read right to me; maybe it needs to be rephrased a bit?
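
For context, a rough sketch of the shape such a change could take; the
accessor name, the int type, the sentinel, and the helper variable below
are assumptions for illustration, not the final KIP-792 API:

    // sketch: a possible accessor on ConsumerPartitionAssignor.Subscription
    public int generationId();   // -1 as the sentinel when the member has no valid generation

    // sketch: how an assignor could use it to ignore stale ownedPartitions
    // (`maxKnownGeneration` is an assumed local variable in the assignor)
    if (subscription.generationId() < maxKnownGeneration) {
        // treat this member's ownedPartitions as unclaimed when computing the sticky assignment
    }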


Guozhang

On Mon, Feb 7, 2022 at 7:36 PM Luke Chen  wrote:

> Hi David,
>
> Thanks for your comments.
> I've updated the KIP to add changes in Subscription class.
>
> Thank you.
> Luke
>
> On Fri, Feb 4, 2022 at 11:43 PM David Jacot 
> wrote:
>
> > Hi Luke,
> >
> > Thanks for updating the KIP. I just have a minor request.
> > Could you fully describe the changes to the Subscription
> > public class in the KIP? I think that it would be better than
> > just saying that the generation will be added to id.
> >
> > Otherwise, the KIP LGTM.
> >
> > Thanks,
> > David
> >
> > On Mon, Nov 29, 2021 at 4:29 AM Luke Chen  wrote:
> > >
> > > Hi devs,
> > > Welcome to provide feedback.
> > >
> > > If there are no other comments, I'll start a vote tomorrow.
> > >
> > > Thank you.
> > > Luke
> > >
> > >
> > > On Mon, Nov 22, 2021 at 4:16 PM Luke Chen  wrote:
> > >
> > > > Hello David,
> > > >
> > > > For (3):
> > > >
> > > >
> > > >
> > > > * I suppose that we could add a `generation` field to the
> > JoinGroupRequest
> > > > instead to do the fencing that you describe while handling the
> > sentinel in
> > > > the assignor directly. If we would add the `generation` to the
> request
> > > > itself, would we need the `generation` in the subscription protocol
> as
> > > > well?*
> > > >
> > > > On second thought, I think this is not better than adding
> `generation`
> > > > field in the subscription protocol, because I think we don't have to
> > do any
> > > > generation validation on joinGroup request. The purpose of
> > > > `joinGroupRequest` is to accept any members to join this group, even
> > if the
> > > > member is new or ever joined or what. As long as we have the
> > generationId
> > > > in the subscription metadata, the consumer lead can leverage the info
> > to
> > > > ignore the old ownedPartitions (or do other handling), and the
> > rebalance
> > > > can still complete successfully and correctly. On the other hand, if
> > we did
> > > > the generation check on JoinGroupRequest, and return
> > `ILLEGAL_GENERATION`
> > > > back to consumer, the consumer needs to clear its generation info and
> > > > rejoin the group to continue the rebalance. It needs more
> > request/response
> > > > network and slow down the rebalance.
> > > >
> > > > So I think we should add the `generationId` field into Subscription
> > > > protocol to achieve what we want.
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Thu, Nov 18, 2021 at 8:51 PM Luke Chen  wrote:
> > > >
> > > >> Hi David,
> > > >> Thanks for your feedback.
> > > >>
> > > >> I've updated the KIP for your comments (1)(2).
> > > >> For (3), it's a good point! Yes, we didn't deserialize the
> > subscription
> > > >> metadata on broker side, and it's not necessary to add overhead on
> > broker
> > > >> side. And, yes, I think we can fix the original issue by adding a
> > > >> "generation" field into `JoinGroupRequest` instead, and also add a
> > field
> > > >> into `JoinGroupResponse` in `JoinGroupResponseMember` field. That
> > way, the
> > > >> broker can identify the old member from `JoinGroupRequest`. And the
> > > >> assignor can also get the "generation" info via the `Subscription`
> > instance.
> > > >>
> > > >> I'll update the KIP to add "generation" field into
> `JoinGroupRequest`
> > and
> > > >> `JoinGroupResponse`, if there is no other options.
> > > >>
> > > >> Thank you.
> > > >> Luke
> > > >>
> > > >>
> > > >> On Tue, Nov 16, 2021 at 12:31 AM David Jacot
> > 
> > > >> wrote:
> > > >>
> > > >>> Hi Luke,
> > > >>>
> > > >>> Thanks for the KIP. Overall, I think that the motivation makes
> > sense. I
> > > >>> have a couple of comments/questions:
> > > >>>
> > > >>> 1. In the Public Interfaces section, it would be great if you could
> > put
> > > >>> the
> > > >>> end state not the current one.
> > > >>>
> > > >>> 2. Do we need to update the Subscription class to expose the
> > > >>> generation? If so, it would be great to mention it in the Public
> > > >>> Interfaces section as well.
> > > >>>
> > > >>> 3. You mention that the broker will set the generation if the
> > > >>> subscription
> > > >>> contains a sentinel value (-1). As of today, the broker does not
> > parse
> > > >>> the subscription so I am not sure how/why we would do this. I
> suppose
> > > >>> 

Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-02-08 Thread Guozhang Wang
Hello Jorge,

Thanks for bringing this KIP! I think this is a nice idea to consider using
a single overloaded function name for #process, just a couple quick
questions after reading the proposal:

1) Does `ValueProcessorContext` have to be a public API? It seems to me
that this can be completely abstracted away from user interfaces as an
internal class: we call `setKey` before calling the user-instantiated
`process` function, and then its overridden `forward` can simply check
whether the key has changed.
2) Related to 1) above, in the past the rationale for enforcing this at the
interface layer rather than doing runtime checks was that it is more
efficient. I'm not sure how much overhead it may incur to check that the
key did not change: if it is just a reference-equality check, maybe it's
okay. What's your take on this?
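
To make this concrete, a minimal sketch of the kind of internal wrapper
being discussed; the class and method names are made up for illustration
and none of this is meant as a public API:

    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    // sketch only: an internal helper, never exposed to users
    class KeyCheckingContext<KForward, VForward> {
        private final ProcessorContext<KForward, VForward> delegate;
        private KForward currentKey;   // set by the framework right before each process() call

        KeyCheckingContext(final ProcessorContext<KForward, VForward> delegate) {
            this.delegate = delegate;
        }

        void setCurrentKey(final KForward key) {
            this.currentKey = key;
        }

        <V extends VForward> void forward(final Record<KForward, V> record) {
            // cheap reference-equality check: the forwarded key must be the exact object passed in
            if (record.key() != currentKey) {
                throw new StreamsException("processValues() must not change the record key");
            }
            delegate.forward(record);
        }
    }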


Guozhang

On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Dev team,
>
> I'd like to start a new discussion thread on Kafka Streams KIP-820:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API
>
> This KIP is aimed to extend the current `KStream#process` API to return
> output values that could be chained across the topology, as well as
> introducing a new `KStream#processValues` to use processor while validating
> keys haven't change and repartition is not required.
>
> Looking forward to your feedback.
>
> Regards,
> Jorge.
>


-- 
-- Guozhang


Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?

2022-02-08 Thread Guozhang Wang
We have a 4-month major release cadence in Apache Kafka, and given that
3.1.0 was just released, a tentative estimate for 3.2.0 would be around
early June.

Guozhang

On Tue, Feb 8, 2022 at 8:19 AM Jonathan Albrecht 
wrote:

>
>
>
> No problem, glad that was more clear and thanks for considering it. I
> totally
> understand if this is not the type of change to put into a point release.
>
> I'd like to understand the release schedule a bit more. Is 3.2 probably
> coming
> out in May?
>
> Thanks,
>
> Jon
>
> "Bruno Cadonna"  wrote on 2022-02-08 04:21:50 AM:
>
> > From: "Bruno Cadonna" 
> > To: dev@kafka.apache.org
> > Date: 2022-02-08 04:22 AM
> > Subject: [EXTERNAL] Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?
> >
> > Thank you Jonathan! Now I understand the situation better.
> >
> > I agree with Guozhang about the waiting times for 3.2.0, 3.0.1, and
> 3.1.1.
> >
> > Obviously that does not satisfy the organizational requirements you
> > mentioned.
> >
> > Best,
> > Bruno
> >
> > On 07.02.22 23:24, Guozhang Wang wrote:
> > > Thanks for the clarification Jonathan, I think timing wise, waiting to
> get
> > > 3.2.0 v.s. get 3.1.1 / 3.0.1 would not be much longer.
> > >
> > > On Mon, Feb 7, 2022 at 12:16 PM Jonathan Albrecht
> 
> > > wrote:
> > >
> > >>
> > >>
> > >>
> > >> Hi Bruno and Guozhang,
> > >>
> > >> Thanks for your thoughtful replies and I hope I can clarify. I work on
> > >> a team that helps port open source software to s390x. The example I
> used
> > >> about users needing to be on a specific minor release is a general one
> > >> that I see and not specific to kafka and usually its not due to
> technical
> > >> reasons. For example, sometimes organizations want to support the same
> > >> version of a component across platforms and back porting support for
> > >> s390x mean they can get to a supported version sooner. Maybe in
> kafka's
> > >> case, there are not technical reasons for that to happen but sometimes
> > >> there are organizational reasons.
> > >>
> > >> Hope that clarifies where I'm coming from. I definitely don't have a
> > >> specific technical issue that would be solved by back porting. I
> > >> understand there are risks and it might not be appropriate for a
> bugfix
> > >> release which is why I wanted to ask first before going any further.
> > >>
> > >> Thanks,
> > >>
> > >> Jon
> > >>
> > >> "Bruno Cadonna"  wrote on 2022-02-07 05:30:05 AM:
> > >>
> > >>> From: "Bruno Cadonna" 
> > >>> To: dev@kafka.apache.org
> > >>> Date: 2022-02-07 05:30 AM
> > >>> Subject: [EXTERNAL] Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?
> > >>>
> > >>> Hi Jonathan and Guozhang,
> > >>>
> > >>> I am pretty sure that upgrading RocksDB in 3.0.x and 3.1.x should not
> be
> > >>> an issue, since we recently upgraded it to 6.27.3 on trunk and
> judging
> > >>> from the compatibility report in the ticket [1] and the code the API
> > >>> does not break backward-compatibility for the AK 3.x series.
> > >>>
> > >>> I am wondering, why users would not be willing to use the upcoming
> 3.2.0
> > >>> if they are migrating or creating a Kafka Streams application on the
> > >>> s390x platform. Even if we ported the upgrade to 3.1.1 and 3.0.1,
> they
> > >>> would need to wait until those versions are released. Additionally,
> > >>> their applications need to comply with 3.x APIs and APIs should be
> > >>> backward compatible in the 3.x releases. So, everything that works
> with
> > >>> 3.1.1 and 3.0.1 should also work with 3.2.0. Of course there could be
> an
> > >>> issue in 3.2.0 that will force them to use a 3.1.1 or 3.0.1, but that
> > >>> would be rather an exception that we can then fix when the exception
> > >>> happens.
> > >>>
> > >>> Jonathan, could you clarify what the motivation of using hypothetical
> > >>> 3.1.1 or 3.0.1 with the RocksDB upgrade instead of 3.2.0 on a
> platform
> > >>> that was not supported before (i.e., 3.1.0 and 3.0.0) might be?
> > >>>
> > >>> In the end, it is always a risk to upgrade a library in a bugfix
> release
> > >>> without a critical issue that the upgrade fixes.
> > >>>
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>>
> > >>> [1]
> > >>
> https://issues.apache.org/jira/browse/KAFKA-13599
>
> > >>
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-816: Topology changes without local state reset

2022-02-07 Thread Guozhang Wang
ms
> > > won't be able to find the local store files anymore. IIUC,
> > > the changelog topic will still be fine, so Streams would
> > > just allocate a new state directory in the new task name and
> > > restore the changelog into it.
> > >
> > > So, I think all this KIP is after is a way to preserve the
> > > local state files of a named store in the face of task
> > > renumbering. That's not to say that there's not some overlap
> > > with the NamedTopologies work, or that there's no value in
> > > being able to automatically reuse unnamed stores. But it
> > > probably makes sense to let Nick fix this one specific
> > > problem instead of coupling it to other large-scale
> > > engineering projects.
> > >
> > > Regarding the KIP itself:
> > >
> > > (A) is quite clean, but it does make it more challenging to
> > > clean up state when tasks migrate to other nodes. If that's
> > > the only problem, then I agree this is probably the best
> > > solution.
> > >
> > > (B) also makes a lot of sense to me, and I actually don't
> > > think it's a hack. It might also be useful for detecting
> > > when a topology has changed unexpectedly, for example. On
> > > the other hand, to safely move a state directory from one
> > > task directory to the other, we have to be sure no other
> > > thread is using either directory. To do that, we could
> > > either perform the operation in `KafkaStreams.start()`
> > > before any threads are started (we already know the topology
> > > at this point), or we can try to grab the directory locks on
> > > both tasks (but that sounds like a recipe for deadlock).
> > >
> > > In a nutshell, I'm supportive of this KIP, and I'd sugest we
> > > do a little more discovery on the implications of dropping
> > > the task level of the directory hierarchy before committing
> > > to A. And/or be a little more specific about how we can
> > > safely move state directories around before committing to B.
> > >
> > > Thanks again!
> > > -John
> > >
> > > On Fri, 2022-02-04 at 11:09 -0800, Guozhang Wang wrote:
> > > > Hi folks,
> > > >
> > > > I think the NamedTopology work would help with the convenience of the
> > > > solution for this KIP, but I feel it is not by itself the solution
> here.
> > > If
> > > > I'm not mistaken, the scope of this KIP is trying to tackle that,
> > > *assuming
> > > > the developer already knows* a new topology or part of the topology
> e.g.
> > > > like a state store of the topology does not change, then how to
> > > effectively
> > > > be able to reuse that part of the topology. Today it is very hard to
> > > reuse
> > > > part (say a state store, an internal topic) of a previous topology's
> > > > persistent state because:
> > > >
> > > > 1) the names of those persistent states are prefixed by the
> application
> > > id.
> > > > 2) the names of those persistent states are suffixed by the index,
> which
> > > > reflects the structure of the topology.
> > > > 3) the dir path of the persistent states are "prefixed" by the task
> id,
> > > > which is hence dependent on the sub-topology id.
> > > >
> > > > My quick thoughts are that 1) is easy to go around as long as users
> reuse
> > > > the same appId, 3) can be tackled with the help of the named
> topology but
> > > > each named topology can still be composed of multiple sub-topologies
> so
> > > > extra work is still needed to align the sub-topology ids, but we
> still
> > > need
> > > > something to tackle 2) here, which I was pondering between those
> options
> > > > and at the moment leaning towards option 2).
> > > >
> > > > Does that make sense to you?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Feb 4, 2022 at 4:38 AM Nick Telford 
> > > wrote:
> > > >
> > > > > Hi Guozhang, Sophie,
> > > > >
> > > > > Thanks for both taking the time to review my proposal.
> > > > >
> > > > > I did actually see the NamedTopology classes, and noted that they
> were
> > > > > internal. I didn't realise they are 

Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?

2022-02-07 Thread Guozhang Wang
Thanks for the clarification, Jonathan. Timing-wise, I think waiting for
3.2.0 vs. a 3.1.1 / 3.0.1 would not take much longer.

On Mon, Feb 7, 2022 at 12:16 PM Jonathan Albrecht 
wrote:

>
>
>
> Hi Bruno and Guozhang,
>
> Thanks for your thoughtful replies and I hope I can clarify. I work on
> a team that helps port open source software to s390x. The example I used
> about users needing to be on a specific minor release is a general one
> that I see and not specific to kafka and usually its not due to technical
> reasons. For example, sometimes organizations want to support the same
> version of a component across platforms and back porting support for
> s390x mean they can get to a supported version sooner. Maybe in kafka's
> case, there are not technical reasons for that to happen but sometimes
> there are organizational reasons.
>
> Hope that clarifies where I'm coming from. I definitely don't have a
> specific technical issue that would be solved by back porting. I
> understand there are risks and it might not be appropriate for a bugfix
> release which is why I wanted to ask first before going any further.
>
> Thanks,
>
> Jon
>
> "Bruno Cadonna"  wrote on 2022-02-07 05:30:05 AM:
>
> > From: "Bruno Cadonna" 
> > To: dev@kafka.apache.org
> > Date: 2022-02-07 05:30 AM
> > Subject: [EXTERNAL] Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?
> >
> > Hi Jonathan and Guozhang,
> >
> > I am pretty sure that upgrading RocksDB in 3.0.x and 3.1.x should not be
> > an issue, since we recently upgraded it to 6.27.3 on trunk and judging
> > from the compatibility report in the ticket [1] and the code the API
> > does not break backward-compatibility for the AK 3.x series.
> >
> > I am wondering, why users would not be willing to use the upcoming 3.2.0
> > if they are migrating or creating a Kafka Streams application on the
> > s390x platform. Even if we ported the upgrade to 3.1.1 and 3.0.1, they
> > would need to wait until those versions are released. Additionally,
> > their applications need to comply with 3.x APIs and APIs should be
> > backward compatible in the 3.x releases. So, everything that works with
> > 3.1.1 and 3.0.1 should also work with 3.2.0. Of course there could be an
> > issue in 3.2.0 that will force them to use a 3.1.1 or 3.0.1, but that
> > would be rather an exception that we can then fix when the exception
> > happens.
> >
> > Jonathan, could you clarify what the motivation of using hypothetical
> > 3.1.1 or 3.0.1 with the RocksDB upgrade instead of 3.2.0 on a platform
> > that was not supported before (i.e., 3.1.0 and 3.0.0) might be?
> >
> > In the end, it is always a risk to upgrade a library in a bugfix release
> > without a critical issue that the upgrade fixes.
> >
> >
> > Best,
> > Bruno
> >
> >
> > [1]
> https://issues.apache.org/jira/browse/KAFKA-13599
>
> >
> > On 07.02.22 04:19, Guozhang Wang wrote:
> > > Hi Jonathan,
> > >
> > > I'm not against the idea of upgrading in 3.0.x and 3.1.x, assuming that
> the
> > > v6.27.3 version does not make any API or any semantic behavioral
> changes.
> > > But I can only speak for myself, not the whole community. For older
> > > versions as Bruno mentioned since there's compatibility issues we
> cannot
> > > upgrade RocksDB any more.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Feb 3, 2022 at 1:56 PM Jonathan Albrecht
> 
> > > wrote:
> > >
> > >>
> > >>
> > >> Thanks Guozhang, yes the motivation is to support the s390x platform.
> It's
> > >> not a critical bug for other platforms.
> > >>
> > >> Any chance that gaining platform support is also a valid reason? I was
> > >> hoping it would be but I won't submit a PR if it isn't.
> > >>
> > >> Thanks,
> > >>
> > >> Jon
> > >>
> > >> "Guozhang Wang"  wrote on 2022-02-03 02:14:34 PM:
> > >>
> > >>> From: "Guozhang Wang" 
> > >>> To: "dev" 
> > >>> Date: 2022-02-03 02:15 PM
> > >>> Subject: [EXTERNAL] Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?
> > >>>
> > >>> Hello Jonathan,
> > >>>
> > >>> I think Bruno's point is that we can only upgrade in the bugfix
> releases if
> > >>> the old version of rocksDB has a critical bug that the new version
> would
> > >>> fix. For 6.27.3 

Re: [DISCUSS] Apache Kafka 3.0.1

2022-02-07 Thread Guozhang Wang
+1, thanks Mickael!

On Mon, Feb 7, 2022 at 9:27 AM Bruno Cadonna  wrote:

> +1 Thank you!
>
> Best,
> Bruno
>
> On 07.02.22 18:24, Ismael Juma wrote:
> > Thanks Mickael, +1.
> >
> > Ismael
> >
> > On Mon, Feb 7, 2022 at 9:17 AM Mickael Maison 
> wrote:
> >
> >> Hi,
> >>
> >> I'd like to volunteer to be the release manager for the next bugfix
> >> release, 3.0.1.
> >>
> >> Thanks,
> >> Mickael
> >>
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-06 Thread Guozhang Wang
Hello Seung-chan,

Thanks for the KIP writeup and summary! I made a pass on it and want to
share some of my thoughts:

On the very high level, we want to be able to effectively differentiate
several cases as follows:

1) There's a temporary network partition / soft failure, so clients cannot
reach the broker: here we want to allow some time to see if the clients can
reconnect, and hence the timeout makes sense.
2) The client is being bounced, i.e. a shutdown followed by a restart: here
we do not want to trigger any rebalance, but today we can only hope that
the timeout is long enough to cover that bounce window.
3) The client is shutdown and won't be back: here we want to trigger the
rebalance immediately, but today we'd have to wait for the timeout value.

Today we use the timeout to try to tackle all three cases, but ideally we
want the client to submit extra information to help distinguish them. I.e.
we just use timeout for case 1) only, while we use separate mechanisms to
differentiate 2) and 3) from it. Personally I think we could consider
having an augmented leave-group (or maybe in the long run, we can merge
that RPC as part of heartbeat) with a flag indicating 2) or 3), while just
relying on the timeout for case 1).

But to keep a narrower scope for this KIP that does not touch on protocol
changes, I think just differentiating 2) and 3) by not sending a leave-group
for 2) vs. sending a leave-group for 3) is sufficient.

As for the KIP itself, I have a few minor comments:

1. Regarding the API change, I feel just doing that on the Streams side is
not enough since at the end of the day we still need the consumer to
incorporate it (today it's via a static config and hence we cannot just
dynamically change the config).

2. Regarding the API itself, I want to see a more concrete proposal that
contains the full signature, e.g. does ".closeAndLeaveGroup()" include the
timeout param as well, etc.? My very subjective preference is to not
differentiate by the function name in case we may want to augment the close
function in the future, which would explode the function names :P Instead
maybe we can just overload the `close()` function again but with a control
object, that includes 1) timeout, 2) leave-group flag, and hence can also
extend to include more variables just in case. Would like to hear others'
thoughts as well.
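
To make the idea concrete, here is a rough sketch of what such a control
object could look like (all names below are illustrative only, not a final
API):

import java.time.Duration;

// Illustrative only: a small options holder that close() could accept instead
// of introducing new method names; "leaveGroup" corresponds to case 3) above.
public class CloseOptions {
    private Duration timeout = Duration.ofMillis(Long.MAX_VALUE); // default: wait indefinitely
    private boolean leaveGroup = false;                           // default: keep (static) membership

    public CloseOptions timeout(final Duration timeout) {
        this.timeout = timeout;
        return this;
    }

    public CloseOptions leaveGroup(final boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }

    public Duration timeout() { return timeout; }
    public boolean leaveGroup() { return leaveGroup; }
}

// KafkaStreams would then only need one additional overload, e.g.:
//   public boolean close(final CloseOptions options) { ... }
// and a permanent shutdown would look like:
//   streams.close(new CloseOptions().timeout(Duration.ofSeconds(30)).leaveGroup(true));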


Guozhang


On Wed, Jan 12, 2022 at 5:51 AM Seung-chan Ahn 
wrote:

> Hi team,
>
> Here's the new KIP
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group
> >
>  for this issue .
>
> The title says pretty much what this KIP is for. Even though it's my first
> draft, as A. Sophie Blee-Goldman has written rich descriptions and already
> the solutions in the issue thread, I've enjoyed following up on the idea.
>
> Please feel free to review on any point!
>


-- 
-- Guozhang


Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?

2022-02-06 Thread Guozhang Wang
Hi Jonathan,

I'm not against the idea of upgrading in 3.0.x and 3.1.x, assuming that the
v6.27.3 version does not make any API or any semantic behavioral changes.
But I can only speak for myself, not the whole community. For older
versions, as Bruno mentioned, there are compatibility issues so we cannot
upgrade RocksDB any more.


Guozhang

On Thu, Feb 3, 2022 at 1:56 PM Jonathan Albrecht 
wrote:

>
>
> Thanks Guozhang, yes the motivation is to support the s390x platform. It's
> not a critical bug for other platforms.
>
> Any chance that gaining platform support is also a valid reason? I was
> hoping it would be but I won't submit a PR if it isn't.
>
> Thanks,
>
> Jon
>
> "Guozhang Wang"  wrote on 2022-02-03 02:14:34 PM:
>
> > From: "Guozhang Wang" 
> > To: "dev" 
> > Date: 2022-02-03 02:15 PM
> > Subject: [EXTERNAL] Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?
> >
> > Hello Jonathan,
> >
> > I think Bruno's point is that we can only upgrade in the bugfix releases
> if
> > the old version of rocksDB has a critical bug that the new version would
> > fix. For 6.27.3 it seems not fixing a critical bug but for a new feature,
> > and hence unless we change the policy we cannot upgrade in 3.0.1 / 3.1.1
> > releases.
> >
> >
> > Guozhang
> >
> > On Thu, Feb 3, 2022 at 9:22 AM Jonathan Albrecht
> 
> > wrote:
> >
> > >
> > >
> > >
> > > Thanks for the info Bruno. In that case, if no other concerns, I'll try
> > > updating RocksDB to v6.27.3 on the 3.0 and 3.1 branches and file issues
> and
> > > PRs if everything looks good.
> > >
> > > Jon
> > >
> > >
> > > "Bruno Cadonna"  wrote on 2022-02-03 10:40:00 AM:
> > >
> > > > From: "Bruno Cadonna" 
> > > > To: dev@kafka.apache.org
> > > > Date: 2022-02-03 10:40 AM
> > > > Subject: [EXTERNAL] Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?
> > > >
> > > > Hi Jonathan,
> > > >
> > > > We had to wait until AK 3.0 to upgrade RocksDB to 6.19.3 due to
> source
> > > > compatibility issue. More specifically, we expose RocksDB APIs in
> Kafka
> > > > Streams for configuring RocksDB and those RocksDB APIs changed. So
> > > > upgrading RocksDB was actually a compatibility breaking change. We
> had
> > > > to wait for the major release 3.0.0 to make the upgrade. That means,
> if
> > > > the policy allows to upgrade dependencies in bugfix releases we can
> only
> > > > upgrade RocksDB in bugfix releases for 3.1 and 3.0. Upgrading RocksDB
> in
> > > > earlier releases would break compatibility.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On 03.02.22 15:15, Jonathan Albrecht wrote:
> > > > >
> > > > >
> > > > > The rocksdbjni dependency has been upgraded to v6.27.3 on trunk and
> I
> > > > > wanted to ask if it would be ok to also upgrade it to v6.27.3 on
> the
> > > > > 3.1
> > > > > branch (and possibly earlier branches). I thought I should ask in
> case
> > > > > there are some policies around changing dependency versions in
> point
> > > > > releases.
> > > > >
> > > > > The motivation is that this is the first version of rocksdbjni that
> > > > > supports s390x and it allows kafka to be built out of the box on
> this
> > > > > platform. Having this support in earlier releases helps users on
> s390x
> > > > > that
> > > > > may need a specific minor release.
> > > > >
> > > > > If upgrading earlier releases is ok, how far back would be
> reasonable?
> > > > > I'm
> > > > > happy to create the issues and PRs and do the local testing, of
> course.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jonathan Albrecht
> > > > > Advisory Software Developer
> > > > > Linux on IBM Z Open Source Ecosystem
> > > > > 1 905 413 3577 Office
> > > > > jonathan.albre...@ibm.com
> > > > >
> > > > > IBM
> > > > >
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-818: Introduce cache-size-bytes-total Task Level Metric

2022-02-06 Thread Guozhang Wang
Since the PR is reopened and we are going to re-merge the fixed PRs, what
about just adding that as part of the KIP as an addendum?

On Fri, Feb 4, 2022 at 2:13 AM Sagar  wrote:

> Thanks Sophie/Guozhang.
>
> Yeah I could have amended the KIP but it slipped my mind when Guozhang
> proposed this in the PR. Later on, the PR was merged and KIP was marked as
> adopted so I thought I will write a new one. I know the PR had been
> reopened now :p . I dont have much preference on a new KIP v/s the original
> one so anything is ok with me as well.
>
> I agree with the INFO part. I will make that change.
>
> Regarding task level, from my understanding, since every task's
> buffer/cache size might be different so if a certain task might be
> overshooting the limits then the task level metric might help people to
> infer this. Also, thanks for the explanation Guozhang on why this should be
> a task level metric. What are your thoughts on this @Sophie?
>
> Thanks!
> Sagar.
>
>
> On Fri, Feb 4, 2022 at 4:47 AM Guozhang Wang  wrote:
>
> > Thanks Sagar for proposing the KIP, and Sophie for sharing your thoughts.
> > Here're my 2c:
> >
> > I think I agree with Sophie for making the two metrics (both the added
> and
> > the newly proposed) on INFO level since we are always calculating them
> > anyways. Regarding the level of the cache-size though, I'm thinking a bit
> > different with you two: today we do not actually keep that caches on the
> > per-store level, but rather on the per-thread level, i.e. when the cache
> is
> > full we would flush not only on the triggering state store but also
> > potentially on other state stores as well of the task that thread owns.
> > This mechanism, in hindsight, is a bit weird and we have some discussions
> > about refactoring that in the future already. Personally I'd like to make
> > this new metric to be aligned with whatever our future design will be.
> >
> > In the long run if we would not have a static assignment from tasks to
> > threads, it may not make sense to keep a dedicated cache pool per thread.
> > Instead all tasks will be dynamically sharing the globally configured max
> > cache size (dynamically here means, we would not just divide the total
> size
> > by the num.tasks and then assign that to each task), and when a cache put
> > triggers the flushing because the sum now exceeds the global configured
> > value, we would potentially flush all the cached records for that task.
> If
> > this is the end stage, then I think keeping this metric at the task level
> > is good.
> >
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Feb 3, 2022 at 10:15 AM Sophie Blee-Goldman
> >  wrote:
> >
> > > Hey Sagar,  thanks for the KIP!
> > >
> > > And yes, all metrics are considered part of the public API and thus
> > require
> > > a KIP to add (or modify, etc...) Although in this particular case, you
> > > could probably make a good case for just considering it as an update to
> > the
> > > original KIP which added the analogous metric
> `input-buffer-bytes-total`.
> > > For  things like this that weren't considered during the KIP proposal
> but
> > > came up during the implementation or review, and are small changes that
> > > would have made sense to include in that KIP had they been thought of,
> > you
> > > can just send an update to the existing KIP's discussion and.or voting
> > > thread that explains what you want to add or modify and maybe a brief
> > > description why.
> > >
> > > It's always ok to make a new KIP when in doubt, but there are some
> cases
> > > where an update email is sufficient. If there are any concerns or
> > > suggestions that significantly expand the scope of the update, you can
> > > always go create a new KIP and move the discussion there.
> > >
> > > I'd say you can feel free to proceed in whichever way you'd prefer for
> > this
> > > new proposal -- it just needs to appear in some KIP somewhere, and have
> > > given the community thew opportunity to discuss and provide feedback
> on.
> > >
> > > On that note, I do have two suggestions:
> > >
> > > 1)  since we need to measure the size of the cache (and the input
> > buffer(s)
> > > anyways, we may as well make `cache-size-bytes-total` -- and also the
> new
> > > input-buffer-bytes-total -- an INFO level metric. In general the more
> > > metrics the merrier, the only real reason for disabling some are if
> they
> &g

[jira] [Resolved] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)

2022-02-06 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13563.
---
Fix Version/s: 3.2.0
   3.1.1
   Resolution: Fixed

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---
>
> Key: KAFKA-13563
> URL: https://issues.apache.org/jira/browse/KAFKA-13563
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.2.0, 3.1.1
>
> Attachments: kafka.zip
>
>
> In KAFKA-10793, we fix the race condition when lookup coordinator by clearing 
> the _findCoordinatorFuture_ when handling the result, rather than in the 
> listener callbacks. It works well under consumer group mode (i.e. 
> Consumer#subscribe), but we found when user is using non consumer group mode 
> (i.e. Consumer#assign) with group id provided (for offset commitment, so that 
> there will be consumerCoordinator created), the _findCoordinatorFuture_ will 
> never be cleared in some situations, and cause the offset committing keeps 
> getting NOT_COORDINATOR error.
>  
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
> But in non consumer group mode with group id provided, there will be no 
> (1)heartbeat thread , and it only call 
> (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to 
> fetch committed offset position. That is, after 2nd lookupCoordinator call, 
> we have no chance to clear the _findCoordinatorFuture_ .
>  
> To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear 
> the _findCoordinatorFuture_ in the future listener. So, I think we can fix 
> this issue by calling AbstractCoordinator#ensureCoordinatorReady when 
> coordinator unknown in non consumer group case, under each Consumer#poll.
>  
> Reproduce steps:
>  
> 1. Start a 3 Broker cluster with a Topic having Replicas=3.
> 2. Start a Client with Producer and Consumer (with Consumer#assign(), not 
> subscribe, and provide a group id) communicating over the Topic.
> 3. Stop the Broker that is acting as the Group Coordinator.
> 4. Observe successful Rediscovery of new Group Coordinator.
> 5. Restart the stopped Broker.
> 6. Stop the Broker that became the new Group Coordinator at step 4.
> 7. Observe "Rediscovery will be attempted" message but no "Discovered group 
> coordinator" message.
>  
>  
>  
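
For illustration, a minimal sketch of the "assign() plus group.id" setup
described above (topic name, group id and configs are made-up values):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-commit-group");   // group id only used for offset commits
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));  // no subscribe()

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    // ... process records ...
    consumer.commitSync();  // commits go through the group coordinator; no heartbeat thread exists
}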



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Should we automatically close stale PRs?

2022-02-06 Thread Guozhang Wang
Thanks for bringing this up David. I'm in favor of some automatic ways to
clean up stale PRs. More specifically:

* I think there are indeed many root causes of why we have so many stale PRs
that we should consider, and admittedly the fact that reviewing manpower
cannot keep up with the contribution pace is a big one of them. But in this
discussion I'd personally like to keep that out of scope and treat it as a
separate discussion (I think we are having some discussions on some of
these root causes in parallel at the moment).

* As for how to handle the existing stale PRs, I think an automatic way
would probably be the most effective, as I doubt how maintainable it would
be to do that manually. The question though would be: do we just close those
PRs silently, or should we also send notifications along with it? It seems
https://github.com/actions/stale can definitely do the first, but I'm not
sure if it can do the second. Plus, if we want notifications and it's doable
via the Action, could we configure just the committers list (as sending
notifications to all community subscribers may be too spammy)? Personally I
feel that closing after 6 months and notifying committers on a per-week
basis seems sufficient.


Guozhang


On Sun, Feb 6, 2022 at 9:58 AM Matthias J. Sax  wrote:

> I am +1 to close stale PRs -- not sure to what extend we want to
> automate it, or just leave it up to the committers to do the cleanup
> manually. I am happy both ways.
>
> However, I also want to point out, that one reason why we have so many
> stale PRs is the committer overload to actually review PRs. It's a pity
> that committer cannot keep up with the load (guilty as charged myself).
> Not sure if it would help if more contributors could help doing reviews,
> such that PRs are "pre-reviewed" and already in good shape before a
> committer reviews it?
>
>
>
>
> For KIPs, there is actually two more categories:
>
> - "Dormant/Inactive"
> - "Discarded:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-DiscardedKIPs
>
> For Kafka Streams in particular, we also try to make the status of KIP
> clear in the corresponding sub-page:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams
>
> I might be useful thought to just do a better job to update KIP status
> more frequently -- we could also re-organize the main KIP wiki page -- I
> think it contains too much information and is hard to read.
>
> For the Kafka Streams sub-page, we use it for all "active" KIPs, while
> we maintain a second page for adopted KIPs grouped by release:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+KIP+Overview
>
> I find this much more digestible compared to the main KIP page.
>
> Might also be good to have a sub-page for Connect KIPs?
>
>
> -Matthias
>
>
> On 2/5/22 05:57, Luke Chen wrote:
> > Hi Nikolay,
> >
> > That's a good question!
> > But I think for stale KIP, we should have another discussion thread.
> >
> > In my opinion, I agree we should also have similar mechanism for KIP.
> > Currently, the state of KIP has "under discussion", "voting", and
> > "accepted".
> > The KIP might stay in "discussion" or "voting" state forever.
> > We might be able to have a new state called "close" for KIP.
> > And we can review those inactive KIPs for a long time like PR did, to see
> > if these KIPs need to close or re-start the discussion again.
> >
> > Thank you.
> > Luke
> >
> > On Sat, Feb 5, 2022 at 9:23 PM Nikolay Izhikov 
> wrote:
> >
> >> Hello, David, Luke.
> >>
> >> What about KIPs?
> >> Should we have some special state on KIPs that was rejected or can’t be
> >> implemented due to lack of design or when Kafka goes in another
> direction?
> >> Right now those kind of KIPs just have no feedback.
> >> For me as a contributor it’s not clear - what is wrong with the KIP.
> >>
> >> Is it wrong? Is there are no contributor to do the implementation?
> >>
> >>> 5 февр. 2022 г., в 15:49, Luke Chen  написал(а):
> >>>
> >>> Hi David,
> >>>
> >>> I agree with it! This is also a good way to let both parties (code
> author
> >>> and reviewers) know there's a PR is not active anymore. Should we
> >> continue
> >>> it or close it directly?
> >>>
> >>> In my opinion, 1 year is too long, half a year should be long enough.
> >>>
> >>> Thank you.
> >>> Luke
> >>>
> >>> On Sat, Feb 5, 2022 at 8:17 PM Sagar 
> wrote:
> >>>
>  Hey David,
> 
>  That's a great idea.. Just to stress your point, this keeps both
> parties
>  informed if a PR has become stale. So, the reviewer would know that
> >> there
>  was some PR which was being reviewed but due to inactivity it got
> >> closed so
>  maybe time to relook and similarly the submitter.
> 
>  And yeah, any stale/unused PRs can be closed straight away thereby
> >> reducing
>  the load on reviewers. I have done some work on kubernetes open source
> >> and
>  they follow a 

[jira] [Resolved] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception

2022-02-04 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13346.
---
Resolution: Not A Problem

> Kafka Streams fails due to RocksDB Locks Not Available Exception
> 
>
> Key: KAFKA-13346
> URL: https://issues.apache.org/jira/browse/KAFKA-13346
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Amit Gupta
>Priority: Major
>
> Hello,
> We are using Kafka Streams and we observe that some times on some of the 
> hosts running streams application, Kafka streams instance fails with 
> unexpected exception. We are running with 40 stream threads per host and 20 
> hosts in total.
> Can some one please help on what can be the root cause here?
>  
> |org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location .
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:199)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:76)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:95)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:426)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:660)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  ~[kafka-streams-2.6.0.jar:?]
>  Caused by: org.rocksdb.RocksDBException: lock : 
> ./0_468/rocksdb/state-store/LOCK: No locks available
>  at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.18.3.jar:?]
>  at org.rocksdb.RocksDB.open(RocksDB.java:286) ~[rocksdbjni-5.18.3.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:211)
>  ~[kafka-streams-2.6.0.jar:?]
>  ... 15 more
>   
>  Some times I also see this exception
>   |
> |org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location ./0_433/rocksdb/state-store
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kaf

Re: [DISCUSS] Apache Kafka 3.2.0 release

2022-02-04 Thread Guozhang Wang
Thanks Bruno! +1

On Fri, Feb 4, 2022 at 4:14 PM Ismael Juma  wrote:

> Thanks for volunteering, Bruno. +1!
>
> Ismael
>
> On Fri, Feb 4, 2022 at 7:03 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > I'd like to volunteer to be the release manager for our next
> > feature release, 3.2.0. If there are no objections, I'll send
> > out the release plan soon.
> >
> > Best,
> > Bruno
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-816: Topology changes without local state reset

2022-02-04 Thread Guozhang Wang
> > implementation
> > completed -- but so far it's only exposed via internal APIs and hasn't
> been
> > brought to a KIP
> > yet, as it's a fairly large and complex project and I wanted to get all
> the
> > details hashed out
> > before settling on a public API.
> >
> > For some sense of how complicated it's been, you can check out the JIRA
> > ticket we've been
> > filing PRs under -- there are already 25 PRs to the feature. See
> > KAFKA-12648
> > <https://issues.apache.org/jira/browse/KAFKA-12648>. You can check
> > out the new KafkaStreamsNamedTopologyWrapper class to see what the
> current
> > API looks like
> > -- I recommend taking a look to see if this might cover some or all of
> the
> > things you wanted
> > this KIP to do.
> >
> > For a high-level sketch, my work introduces the concept of a
> > "NamedTopology" (which will be
> > renamed to "ModularTopology" in the future, but is still referred to as
> > "named" in the codebase
> > so I'll keep using it for now) . Each KafkaStreams app can execute
> multiple
> > named topologies,
> > which are just regular topologies that are given a unique name. The
> > essential feature of a
> > named topology is that it can be dynamically added or removed without
> even
> > stopping the
> > application, much less resetting it. Technically a NamedTopology can be
> > composed or one
> > or more subtopologies, but if you want to be able to update the
> application
> > at a subtopology
> > level you can just name each  subtopology.
> >
> > So I believe the feature you want is actually already implemented, for
> the
> > most part -- it's currently
> > missing a few things that I just didn't bother to implement yet since
> I've
> > been focused
> > on getting a working, minimal POC that I could use for testing. (For
> > example it doesn't yet
> > support global state stores) But beyond that, the only remaining work to
> > make this feature
> > available is to settle on the APIs, get a KIP passed, and implement said
> > APIs.
> >
> > Would you be interested in helping out with the NamedTopology work so we
> > can turn it into a
> > a full-fledged public feature? I'm happy to let you take the lead on the
> > KIP, maybe by adapting
> > this one if you think it makes sense to do so. The NamedTopology feature
> is
> > somewhat larger
> > in scope than strictly necessary for your purposes, however, so you could
> > take on just a part
> > of it and leave anything beyond that for me to do as followup.
> >
> > By the way: one advantage of the NamedTopology feature is that we don't
> > have to worry about
> > any compatibility issues or upgrade/migration path -- it's opt-in by
> > definition. (Of course we would
> > recommend using it to all users, like we do with named operators)
> >
> > Let me know what you think and how you want to proceed from here -- I
> > wouldn't want you to
> > spend time re-implementing more or less the same thing, but I most likely
> > wasn't going to find time
> > to put out a KIP for the NamedTopology feature in the near future. If you
> > would be able to help
> > drive this to completion, we'd each have significantly less work to do to
> > achieve our goals :)
> >
> > Cheers,
> > Sophie
> >
> >
> > On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang  wrote:
> >
> > > Hello Nick,
> > >
> > > Thanks for bringing this up and for the proposed options. I read though
> > > your writeup and here are some of my thoughts:
> > >
> > > 1) When changing the topology of Kafka Streams, the developer need to
> > first
> > > decide if the whole topology's persisted state (including both the
> state
> > > store as well as its changelogs, and the repartition topics, and the
> > > source/sink external topics) or part of the persisted state can be
> > reused.
> > > This involves two types of changes:
> > >
> > > a) structural change of the topology, such like a new processor node is
> > > added/removed, a new intermediate topic is added/removed etc.
> > > b) semantic change of a processor, such as a numerical filter node
> > changing
> > > its filter threshold etc.
> > >
> > > Today both of them are more or less determined by developers manually.
> > > However, though automatically determining on changes of type b) is hard
> > if
> > > not possible, automatic determini

Re: [VOTE] KIP-591: Add Kafka Streams config to set default state store

2022-02-03 Thread Guozhang Wang
to be
> >> >> in memory? Well, you're out of luck.
> >> >>
> >> >> Therefore, I think there's significant value in modifying
> >> >> the DSL to allow users to orthogonally specify the storage
> >> >> engine and the name of the store, as in your KIP as written.
> >> >>
> >> >> On the other hand, I don't see how the latter of these is
> >> >> more compelling than the former:
> >> >> .count(Materialized.as(Stores.inMemoryKeyValueStore("count-
> >> >> store")));
> >> >> .count(Materialized.as(Stores.keyValueStoreSupplier(StoreTyp
> >> >> e.IN_MEMORY, "count-store")));
> >> >>
> >> >>
> >> >> Regardless, I don't want to let perfect be the enemy of
> >> >> good. Like I said, I think that the key benefit you're
> >> >> really going for is the config, so maybe you want to just
> >> >> drop the Materialize/Stores aspect and simplify the
> >> >> proposal. Or if you want to keep the latter, I'm fine with
> >> >> whatever approach you feel is best (which is why I still
> >> >> voted).
> >> >>
> >> >> This feels like the kind of thing that won't really be
> >> >> crystal clear until the PR is under review (and I'd
> >> >> encourage you and the reviewer to pay particular attention
> >> >> to how the new APIs actually look when used in the tests).
> >> >>
> >> >> Thanks again! People have been asking for this for a long
> >> >> time.
> >> >> -John
> >> >>
> >> >>
> >> >> On Fri, 2022-01-28 at 13:46 -0800, Guozhang Wang wrote:
> >> >>> Hi Luke,
> >> >>>
> >> >>> I'm in favor of using the newly proposed
> `#sessionStore(StoreType..)`
> >> and
> >> >>> deprecating the existing `#persistenSessionStore` etc.
> >> >>>
> >> >>> Thanks,
> >> >>> Guozhang
> >> >>>
> >> >>> On Tue, Jan 25, 2022 at 12:17 AM Luke Chen 
> wrote:
> >> >>>
> >> >>>> Thanks Matthias!
> >> >>>>
> >> >>>> I agree we could deprecate the existing ones, and add the one with
> >> >>>> storeType parameter.
> >> >>>>
> >> >>>> That is:
> >> >>>> @deprecated
> >> >>>> Stores#persistentSessionStore(...)
> >> >>>> @deprecated
> >> >>>> Stores#inMemorySessionStore(...)
> >> >>>> @new added with an additional storeType parameter (IN_MEMORY or
> >> >> ROCKS_DB)
> >> >>>> Stores#sessionStoreSupplier(StoreType storeType, ...)
> >> >>>>
> >> >>>> Let's see what others think about it.
> >> >>>>
> >> >>>> Thank you.
> >> >>>> Luke
> >> >>>>
> >> >>>> On Tue, Jan 25, 2022 at 4:01 PM Matthias J. Sax 
> >> >> wrote:
> >> >>>>
> >> >>>>> Thanks,
> >> >>>>>
> >> >>>>> There is already `Stores.persistentSessionStore` and
> >> >>>>> `Stores.inMemorySessionStore`. From a DSL code POV, I don't see
> >> large
> >> >>>>> benefits to add a new one, but it also does not hurt.
> >> >>>>>
> >> >>>>> Do you propose to add the third one only, or to also deprecate the
> >> >>>>> existing ones? In general, we should avoid to extend the API
> surface
> >> >>>>> area, so it could be a good simplification is we plan to remove
> the
> >> >>>>> existing ones?
> >> >>>>>
> >> >>>>> Btw: we could name the new method just `sessionStoreSupplier` for
> >> >>>>> simplicity (especially, if we deprecate the existing ones)?
> >> >>>>>
> >> >>>>> Not sure what others think. I am fine adding it, if we deprecate
> the
> >> >>>>> existing ones.
> >> >>>>>
> >> >>>>> -Matthias
> >> >>>>>
> >> >>>>>
> >> >>>>> On 1/24/22 5:03 PM, Luke Chen

Re: [DISCUSS] KIP-816: Topology changes without local state reset

2022-02-03 Thread Guozhang Wang
Hello Nick,

Thanks for bringing this up and for the proposed options. I read through
your writeup and here are some of my thoughts:

1) When changing the topology of Kafka Streams, the developer first needs to
decide whether the whole topology's persisted state (including the state
stores as well as their changelogs, the repartition topics, and the
source/sink external topics) or only part of the persisted state can be
reused. This involves two types of changes:

a) structural change of the topology, such as a new processor node being
added/removed, a new intermediate topic being added/removed, etc.
b) semantic change of a processor, such as a numerical filter node changing
its filter threshold, etc.

Today both of them are more or less determined by developers manually.
However, though automatically detecting changes of type b) is hard if not
impossible, automatically detecting changes of type a) is doable since it
depends on just the following information:
* number of sub-topologies, and their orders (i.e. sequence of ids)
* used state stores and changelog topics within the sub-topology
* used repartition topics
* etc

So let's assume that in the long run we can indeed automatically determine
whether a topology or part of it (a sub-topology) is structurally the same;
what we could then do is "translate" the old persisted state names to the
new, isomorphic topology's names. Following this thought I'm leaning
towards the direction of option B in your proposal. But since automatically
determining structural changes is out of scope for this KIP, I feel we can
consider adding some sort of "migration tool" from an old topology to a
new topology by renaming all the persisted state (store dirs and names,
topic names).
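
As a rough illustration of the kind of structural check meant here (not part
of the KIP, just a sketch on top of the existing Topology#describe()):

// Crude structural comparison: two topology versions that produce the same
// description have the same sub-topologies, stores and internal topics.
// This does NOT catch semantic changes such as a filter threshold change.
// oldBuilder / newBuilder are assumed to be the two StreamsBuilder instances.
Topology oldTopology = oldBuilder.build();
Topology newTopology = newBuilder.build();

boolean structurallySame =
    oldTopology.describe().toString().equals(newTopology.describe().toString());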


Guozhang


On Tue, Jan 25, 2022 at 9:10 AM Nick Telford  wrote:

> Hi everyone,
>
> I'd like to start a discussion on Kafka Streams KIP-816 (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
> )
>
> This KIP outlines 3 possible solutions to the problem, and I plan to
> whittle this down to a definitive solution based on this discussion.
>
> Of the 3 proposed solutions:
> * 'A' is probably the "correct" solution, but is also quite a significant
> change.
> * 'B' is the least invasive, but most "hacky" solution.
> * 'C' requires a change to the wire protocol and will likely have
> unintended consequences. C is also the least complete solution, and will
> need significant additional work to make it work.
>
> Please let me know if the Motivation and Background sections need more
> clarity.
>
> Regards,
>
> Nick Telford
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-818: Introduce cache-size-bytes-total Task Level Metric

2022-02-03 Thread Guozhang Wang
Thanks Sagar for proposing the KIP, and Sophie for sharing your thoughts.
Here're my 2c:

I think I agree with Sophie on making the two metrics (both the added and
the newly proposed one) INFO level since we are always calculating them
anyway. Regarding the level of the cache-size metric though, I'm thinking a
bit differently from you two: today we do not actually keep the caches at
the per-store level, but rather at the per-thread level, i.e. when the cache
is full we would flush not only the triggering state store but potentially
also other state stores of the task that thread owns. This mechanism, in
hindsight, is a bit weird and we already have some discussions about
refactoring it in the future. Personally I'd like to make this new metric
aligned with whatever our future design will be.

In the long run if we would not have a static assignment from tasks to
threads, it may not make sense to keep a dedicated cache pool per thread.
Instead all tasks will be dynamically sharing the globally configured max
cache size (dynamically here means, we would not just divide the total size
by the num.tasks and then assign that to each task), and when a cache put
triggers the flushing because the sum now exceeds the global configured
value, we would potentially flush all the cached records for that task. If
this is the end stage, then I think keeping this metric at the task level
is good.



Guozhang




On Thu, Feb 3, 2022 at 10:15 AM Sophie Blee-Goldman
 wrote:

> Hey Sagar,  thanks for the KIP!
>
> And yes, all metrics are considered part of the public API and thus require
> a KIP to add (or modify, etc...) Although in this particular case, you
> could probably make a good case for just considering it as an update to the
> original KIP which added the analogous metric `input-buffer-bytes-total`.
> For  things like this that weren't considered during the KIP proposal but
> came up during the implementation or review, and are small changes that
> would have made sense to include in that KIP had they been thought of, you
> can just send an update to the existing KIP's discussion and.or voting
> thread that explains what you want to add or modify and maybe a brief
> description why.
>
> It's always ok to make a new KIP when in doubt, but there are some cases
> where an update email is sufficient. If there are any concerns or
> suggestions that significantly expand the scope of the update, you can
> always go create a new KIP and move the discussion there.
>
> I'd say you can feel free to proceed in whichever way you'd prefer for this
> new proposal -- it just needs to appear in some KIP somewhere, and have
> given the community thew opportunity to discuss and provide feedback on.
>
> On that note, I do have two suggestions:
>
> 1)  since we need to measure the size of the cache (and the input buffer(s)
> anyways, we may as well make `cache-size-bytes-total` -- and also the new
> input-buffer-bytes-total -- an INFO level metric. In general the more
> metrics the merrier, the only real reason for disabling some are if they
> have a performance impact or other cost that not everyone will want to pay.
> In this case we're already computing the value of these metrics, so why not
> expose it to the user as an INFO metric
> 2) I think it would be both more natural and easier to implement if this
> was a store-level metric. A single task could in theory contain multiple
> physical state store caches and we would have to roll them up to report the
> size for the task as a whole. It's additional work just to lose some
> information that the user may want to have
>
> Let me know if anything here doesn't make sense or needs clarification. And
> thanks for the quick followup to get this 2nd metric!
>
> -Sophie
>
> On Sat, Jan 29, 2022 at 4:27 AM Sagar  wrote:
>
> > Hi All,
> >
> > I would like to open a discussion thread on the following KIP:
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >
> > PS: This is about introducing a new metric and I am assuming that it
> > requires a KIP. If that isn't the case, I can close it.
> >
> > Thanks!
> > Sagar.
> >
>


-- 
-- Guozhang


Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?

2022-02-03 Thread Guozhang Wang
Hello Jonathan,

I think Bruno's point is that we can only upgrade in the bugfix releases if
the old version of rocksDB has a critical bug that the new version would
fix. For 6.27.3 it seems it is not fixing a critical bug but rather adding a
new feature, and hence unless we change the policy we cannot upgrade it in
the 3.0.1 / 3.1.1 releases.


Guozhang

On Thu, Feb 3, 2022 at 9:22 AM Jonathan Albrecht 
wrote:

>
>
>
> Thanks for the info Bruno. In that case, if no other concerns, I'll try
> updating RocksDB to v6.27.3 on the 3.0 and 3.1 branches and file issues and
> PRs if everything looks good.
>
> Jon
>
>
> "Bruno Cadonna"  wrote on 2022-02-03 10:40:00 AM:
>
> > From: "Bruno Cadonna" 
> > To: dev@kafka.apache.org
> > Date: 2022-02-03 10:40 AM
> > Subject: [EXTERNAL] Re: Kafka <= 3.1 upgrade RocksDB to v6.27.3?
> >
> > Hi Jonathan,
> >
> > We had to wait until AK 3.0 to upgrade RocksDB to 6.19.3 due to source
> > compatibility issue. More specifically, we expose RocksDB APIs in Kafka
> > Streams for configuring RocksDB and those RocksDB APIs changed. So
> > upgrading RocksDB was actually a compatibility breaking change. We had
> > to wait for the major release 3.0.0 to make the upgrade. That means, if
> > the policy allows to upgrade dependencies in bugfix releases we can only
> > upgrade RocksDB in bugfix releases for 3.1 and 3.0. Upgrading RocksDB in
> > earlier releases would break compatibility.
> >
> > Best,
> > Bruno
> >
> > On 03.02.22 15:15, Jonathan Albrecht wrote:
> > >
> > >
> > > The rocksdbjni dependency has been upgraded to v6.27.3 on trunk and I
> > > wanted to ask if it would be ok to also upgrade it to v6.27.3 on the
> 3.1
> > > branch (and possibly earlier branches). I thought I should ask in case
> > > there are some policies around changing dependency versions in point
> > > releases.
> > >
> > > The motivation is that this is the first version of rocksdbjni that
> > > supports s390x and it allows kafka to be built out of the box on this
> > > platform. Having this support in earlier releases helps users on s390x
> that
> > > may need a specific minor release.
> > >
> > > If upgrading earlier releases is ok, how far back would be reasonable?
> I'm
> > > happy to create the issues and PRs and do the local testing, of course.
> > >
> > > Thanks,
> > >
> > > Jonathan Albrecht
> > > Advisory Software Developer
> > > Linux on IBM Z Open Source Ecosystem
> > > 1 905 413 3577 Office
> > > jonathan.albre...@ibm.com
> > >
> > > IBM
> > >
>


-- 
-- Guozhang


Re: [VOTE] KIP-814: Static membership protocol should let the leader skip assignment

2022-02-02 Thread Guozhang Wang
+1. Thanks David!

On Tue, Feb 1, 2022 at 9:39 AM Ryan Leslie (BLOOMBERG/ 919 3RD A) <
rles...@bloomberg.net> wrote:

> Thanks, David.
>
> +1 (non-binding)
>
> From: show...@gmail.com At: 01/31/22 22:13:57 UTC-5:00To:
> dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-814: Static membership protocol should let the
> leader skip assignment
>
> Hi David,
>
> Thanks for the KIP.
>
> I'm +1(non-binding)
>
> Thanks.
> Luke
>
> Jason Gustafson  於 2022年2月1日 週二 上午7:11
> 寫道:
>
> > +1 Thanks!
> >
> > On Mon, Jan 31, 2022 at 12:17 AM David Jacot  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a vote about KIP-814: Static membership protocol
> > > should let the leader
> > > skip assignment.
> > >
> > > The KIP is here: https://cwiki.apache.org/confluence/x/C5-kCw.
> > >
> > > Best,
> > > David
> > >
> >
>
>
>

-- 
-- Guozhang


Re: [VOTE] KIP-591: Add Kafka Streams config to set default state store

2022-01-28 Thread Guozhang Wang
Hi Luke,

I'm in favor of using the newly proposed `#sessionStore(StoreType..)` and
deprecating the existing `#persistentSessionStore` etc.
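
For illustration, the rough shape under discussion (the exact helper name
and parameters were still open at this point, so this sketch is
hypothetical):

// Hypothetical helper; mirrors today's Stores.persistentSessionStore(name, retention)
// but takes the store type as a parameter. Not a final API.
SessionBytesStoreSupplier supplier =
    Stores.sessionStoreSupplier(StoreType.IN_MEMORY, "session-store", Duration.ofHours(1));

// which could then be plugged in as usual:
Materialized<String, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.as(supplier);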

Thanks,
Guozhang

On Tue, Jan 25, 2022 at 12:17 AM Luke Chen  wrote:

> Thanks Matthias!
>
> I agree we could deprecate the existing ones, and add the one with
> storeType parameter.
>
> That is:
> @deprecated
> Stores#persistentSessionStore(...)
> @deprecated
> Stores#inMemorySessionStore(...)
> @new added with an additional storeType parameter (IN_MEMORY or ROCKS_DB)
> Stores#sessionStoreSupplier(StoreType storeType, ...)
>
> Let's see what others think about it.
>
> Thank you.
> Luke
>
> On Tue, Jan 25, 2022 at 4:01 PM Matthias J. Sax  wrote:
>
> > Thanks,
> >
> > There is already `Stores.persistentSessionStore` and
> > `Stores.inMemorySessionStore`. From a DSL code POV, I don't see large
> > benefits to add a new one, but it also does not hurt.
> >
> > Do you propose to add the third one only, or to also deprecate the
> > existing ones? In general, we should avoid to extend the API surface
> > area, so it could be a good simplification is we plan to remove the
> > existing ones?
> >
> > Btw: we could name the new method just `sessionStoreSupplier` for
> > simplicity (especially, if we deprecate the existing ones)?
> >
> > Not sure what others think. I am fine adding it, if we deprecate the
> > existing ones.
> >
> > -Matthias
> >
> >
> > On 1/24/22 5:03 PM, Luke Chen wrote:
> > > Hi Matthias,
> > >
> > > I didn't "save" the change. >.<
> > > Anyway, you can refer to this WIP PR to have better understanding
> > why/what
> > > the new API is:
> > >
> >
> https://github.com/apache/kafka/pull/11705/files#diff-c552e58e01169886c5d8b8b149f5c8cd48ea1fc1c3d7b932d055d3df9a00e1b5R464-R477
> > >
> > > It's not necessary, actually, but it can make the implementation
> cleaner.
> > > If you think this change is unnecessary and will make the `Stores` API
> > more
> > > complicated, it's fine to me.
> > >
> > > I'll update the KIP after we have a conclusion for it.
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Tue, Jan 25, 2022 at 2:37 AM Matthias J. Sax 
> > wrote:
> > >
> > >> I don't see the KIP update? Did you hit "save"?
> > >>
> > >> Also, the formatting in your email for the new methods is hard to
> read.
> > >> Not sure atm why we need the API change? Can you elaborate? what does
> > >>
> > >>> I found it'd be better
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 1/24/22 2:29 AM, Luke Chen wrote:
> > >>> Thanks for all your votes.
> > >>>
> > >>> During the implementation, I found it'd be better to have helper
> > methods
> > >> in
> > >>> `Stores`, to be able to get the store supplier by the store type:
> > >>>
> > >>>
> > >>>
> > >>> *public static SessionBytesStoreSupplier
> > >>> sessionStoreSupplierByStoreType()public static
> WindowBytesStoreSupplier
> > >>> windowStoreSupplierByStoreType()public static
> > KeyValueBytesStoreSupplier
> > >>> keyValueStoreSupplierByStoreType()*
> > >>>
> > >>> I've also updated in the KIP.
> > >>> Please let me know if you other thoughts.
> > >>>
> > >>> Also, welcome to vote for this KIP.
> > >>>
> > >>> Thank you.
> > >>> Luke
> > >>>
> > >>>
> > >>> On Fri, Jan 21, 2022 at 4:39 AM Walker Carlson
> > >>>  wrote:
> > >>>
> > >>>> +1 non binding
> > >>>>
> > >>>> On Thu, Jan 20, 2022 at 2:00 PM Matthias J. Sax 
> > >> wrote:
> > >>>>
> > >>>>> +1 (binding)
> > >>>>>
> > >>>>> On 1/20/22 10:52 AM, Guozhang Wang wrote:
> > >>>>>> Thanks Luke! I'm +1 on the KIP.
> > >>>>>>
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>> On Wed, Jan 19, 2022 at 5:58 PM Luke Chen 
> > wrote:
> > >>>>>>
> > >>>>>>> Hi devs,
> > >>>>>>>
> > >>>>>>> I'd like to start a vote for the KIP-591: Add Kafka Streams
> config
> > to
> > >>>>> set
> > >>>>>>> default state store. The goal is to allow users to set a default
> > >> store
> > >>>>> in
> > >>>>>>> the config, so it can apply to all the streams.
> > >>>>>>>
> > >>>>>>> Detailed description can be found here:
> > >>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Thank you.
> > >>>>>>> Luke
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>


-- 
-- Guozhang


Re: Request to Contribute

2022-01-27 Thread Guozhang Wang
Hello,

Thanks for your interest in contributing, I've added you to JIRA.

Guozhang

On Thu, Jan 27, 2022 at 4:54 AM Tomonari Yamashita 
wrote:

> Hi Apache Kafka team,
>
> Request to Contribute
>
> Could you add me to the contributor list?
>
> I’ve found a issue: "[#KAFKA-13619] zookeeper.sync.time.ms is no longer
> used”
> And I’ve also created a pull request for that.
>
> JIRA username: tyamashi-oss   (Tomonari Yamashita)
> Wiki username:  [same as the above]
>
> Best regards,
> Tomonari
>


-- 
-- Guozhang


Re: [VOTE] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-01-23 Thread Guozhang Wang
Thanks Sagar, I'm +1 on the renamed metric.

On Sat, Jan 22, 2022 at 6:56 PM Sagar  wrote:

> Hi All,
>
> There is a small update to the KIP whereby the newly introduced metric
> *total-bytes
> *has been renamed to *input-buffer-bytes-total.*
>
> Thanks!
> Sagar.
>
> On Wed, Sep 29, 2021 at 9:57 AM Sagar  wrote:
>
> > We have 3 binding votes: Sophie/Guozhang/Mathias
> > and 2 non-binding votes: Josep/Luke and no -1 votes.
> >
> > Marking this KIP as accepted.
> >
> > Thanks everyone!
> >
> > Thanks!
> > Sagar.
> >
> >
> >
> > On Wed, Sep 29, 2021 at 7:18 AM Matthias J. Sax 
> wrote:
> >
> >> +1 (binding)
> >>
> >> On 9/28/21 10:40 AM, Sagar wrote:
> >> > Hi All,
> >> >
> >> > Bumping this vote thread again!
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> > On Wed, Sep 8, 2021 at 1:19 PM Luke Chen  wrote:
> >> >
> >> >> Thanks for the KIP.
> >> >>
> >> >> + 1 (non-binding)
> >> >>
> >> >> Thanks.
> >> >> Luke
> >> >>
> >> >> On Wed, Sep 8, 2021 at 2:48 PM Josep Prat
>  >> >
> >> >> wrote:
> >> >>
> >> >>> +1 (non binding).
> >> >>>
> >> >>> Thanks for the KIP Sagar!
> >> >>> ———
> >> >>> Josep Prat
> >> >>>
> >> >>> Aiven Deutschland GmbH
> >> >>>
> >> >>> Immanuelkirchstraße 26, 10405 Berlin
> >> >>>
> >> >>> Amtsgericht Charlottenburg, HRB 209739 B
> >> >>>
> >> >>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >> >>>
> >> >>> m: +491715557497
> >> >>>
> >> >>> w: aiven.io
> >> >>>
> >> >>> e: josep.p...@aiven.io
> >> >>>
> >> >>> On Wed, Sep 8, 2021, 03:29 Sophie Blee-Goldman
> >> >>  >> >>>>
> >> >>> wrote:
> >> >>>
> >> >>>> +1 (binding)
> >> >>>>
> >> >>>> Thanks for the KIP!
> >> >>>>
> >> >>>> -Sophie
> >> >>>>
> >> >>>> On Tue, Sep 7, 2021 at 1:59 PM Guozhang Wang 
> >> >> wrote:
> >> >>>>
> >> >>>>> Thanks Sagar, +1 from me.
> >> >>>>>
> >> >>>>>
> >> >>>>> Guozhang
> >> >>>>>
> >> >>>>> On Sat, Sep 4, 2021 at 10:29 AM Sagar 
> >> >>> wrote:
> >> >>>>>
> >> >>>>>> Hi All,
> >> >>>>>>
> >> >>>>>> I would like to start a vote on the following KIP:
> >> >>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >> >>>>>>
> >> >>>>>> Thanks!
> >> >>>>>> Sagar.
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>> --
> >> >>>>> -- Guozhang
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>
> >> >
> >>
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-591: Add Kafka Streams config to set default state store

2022-01-20 Thread Guozhang Wang
Thanks Luke! I'm +1 on the KIP.


Guozhang

On Wed, Jan 19, 2022 at 5:58 PM Luke Chen  wrote:

> Hi devs,
>
> I'd like to start a vote for the KIP-591: Add Kafka Streams config to set
> default state store. The goal is to allow users to set a default store in
> the config, so it can apply to all the streams.
>
> Detailed description can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
>
>
> Thank you.
> Luke
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-813 Shared State Stores

2022-01-20 Thread Guozhang Wang
Hello Daan,

Thanks for writing the KIP. I just read through it and here are my 2c: to
me it seems that one of the goals is to "externalize" the internal
changelog topic of an application (say A) so that other consumers can
read it directly --- though technically, without any auth, anyone knowing
the topic name would be able to write to it too; conceptually we would just
assume that app A is the only writer of that topic. The question I had
is how far we want to externalize the topic. For example we could,
orthogonally to this KIP, just allow users to pass in a customized topic
name when constructing a state store, telling application A to use that
topic as the changelog; since that topic is created outside of A and is
publicly visible to anyone else on that cluster, anyone --- including any
consumers or Streams apps --- could read it. This is probably the most
flexible option for sharing, but we are even less assured that application
A is the only writer to that external topic unless we have explicit auth
for A on that topic.

Aside from that, here are a few more detailed comments about the
implementation design itself, following your current proposal:

1) How do we handle the case where the num.partitions of app A's store
changelog differs from the num.tasks of app B's sub-topology with that
read-only store? Or are we going to let each task of B keep a whole copy of
A's store by reading all of its changelog partitions, like global stores do?
2) Are we trying to synchronize the store updates from the changelog with
app B's processing timeline, or, as we do for global stores, just update the
read-only stores asynchronously?
3) If the answer to both of the above questions is the latter, then what's
the main difference between adding a read-only store and adding a global
store (see the sketch below for how a global store is wired up today)?
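
For reference, a minimal sketch of how a store is populated directly from a
topic via a global store today (imports omitted; topic and store names are
made-up values):

StreamsBuilder builder = new StreamsBuilder();
builder.addGlobalStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("app-a-data"),
            Serdes.String(),
            Serdes.String())
        .withLoggingDisabled(),                          // the source topic acts as the changelog
    "app-a-changelog",                                   // topic assumed to be written by app A
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new Processor<String, String, Void, Void>() {  // async state-update processor
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("app-a-data");
        }

        @Override
        public void process(final Record<String, String> record) {
            store.put(record.key(), record.value());
        }
    });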

Guozhang



On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis 
wrote:

> Hey Matthias,
>
> Thank you for that feedback, certainly some things to think about. Let me
> add my thoughts:
>
> +1 on simplifying the motivation. Was aiming to add more context but I
> think you're right, bringing it back to the essence makes more sense.
>
> I also follow the reasoning of not having leader and follower. Makes sense
> to view it from a single app point of view.
>
> As for the API method and its parameters, I wanted to stay close to the
> API for adding a regular statestore, but I can perfectly find myself in
> defining an addReadOnlyStateStore() method instead.
>
> I agree the processor approach would be the most flexible one, and surely
> it allows you to use a processor to base the statestore off an existing
> topic. From what I understood from the codebase, there might be a problem
> when using that statestore. Any processor that would use that materialized,
> read-only statestore would need to wait for the store to be restored. I
> can't find a way to make that possible since processors can't wait for the
> statestore to be restored. Also, since the statestore would have logging
> disabled, it means there is no initial restoration going on. As you wrote,
> the DSL is already doing this, so I'm pretty sure I'm missing something,
> just unable to find what exactly.
>
> I will rewrite the parts in the KIP to make processor-based the preferred
> choice, along with the changes to the motivation etc. Only thing to figure
> out is that restoring behavior to be sure processors of the readonly
> statestore aren't working with stale data.
>
> D.
>
> -Original Message-
> From: Matthias J. Sax 
> Sent: 19 January 2022 21:31
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>
> Daan,
>
> thanks for the KIP. I personally find the motivation section a little bit
> confusing. If I understand the KIP correctly, you want to read a topic into
> a state store (ie, materialize it). This is already possible today.
>
> Of course, today a "second" changelog topic would be created. It seems the
> KIP aims to avoid the additional changelog topic, and to allow to re-use
> the original input topic (this optimization is already available for the
> DSL, but not for the PAPI).
>
> If my observation is correct, we can simplify the motivation accordingly
> (the fact that you want to use this feature to share state across different
> applications more efficiently seems to be secondary and we could omit it
> IMHO to keep the motivation focused).
>
> As a result, we also don't need to concept of "leader" and "follower".
> In the end, Kafka Streams cannot reason/enforce any usage patterns across
> different apps, but we can only guarantee stuff within a single application
> (ie, don't create a changelog but reuse an input topic as changelog). It
> would simplify the KIP if we remove these parts.
>
>
>
> For the API, I am wondering why you propose to pass in `processorNames`?
> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
> (similar to what we do for `addGlobalStore`)? The provided `Processor` must
> implement a certain pattern, 

Re: Please add me to the contributors JIRA list

2022-01-15 Thread Guozhang Wang
Hello Sayantanu,

Thanks for your interest, I've just added you to the list.

Cheers,
Guozhang

On Sat, Jan 15, 2022 at 3:16 AM Sayantanu Dey 
wrote:

> Hi, devs,
> I was looking to contribute to Kafka, so can someone please add me to the
> contributors' list on JIRA?
> JIRA username: dsayan
>
> Thank you
>


-- 
-- Guozhang


[jira] [Created] (KAFKA-13561) Consider deprecating `StreamsBuilder#build(props)` function

2021-12-21 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13561:
-

 Summary: Consider deprecating `StreamsBuilder#build(props)` 
function
 Key: KAFKA-13561
 URL: https://issues.apache.org/jira/browse/KAFKA-13561
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


With 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
 accepted, which introduced the new `StreamsBuilder(TopologyConfig)` 
constructor, we can now consider deprecating the `StreamsBuilder#build(props)` 
function (see the usage sketch below). There are still a few things we'd need 
to do first:

1. Copy the `StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG` to TopologyConfig.
2. Make sure the overloaded `StreamsBuilder()` constructor takes in default 
values of TopologyConfig.
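
For illustration, a rough sketch of the constructor-based path that would
replace `build(props)`; this assumes the `TopologyConfig` wrapper referenced
above can be built from a StreamsConfig, so the exact construction may
differ:

"
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

// topology-level configs are supplied up front instead of via builder.build(props)
final StreamsBuilder builder =
    new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
// ... define the topology on `builder` ...
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
"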



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-591: Add Kafka Streams config to set default state store

2021-12-21 Thread Guozhang Wang
Thanks Luke, I do not have any major comments on the wiki any more. BTW,
thanks for making the "public StreamsBuilder(final TopologyConfig
topologyConfigs)" API public now; I think it will benefit a lot of future
work!

I think with the new API we can deprecate the `build(props)` function in
StreamsBuilder now, and I will file a separate JIRA for it.

Just a few nits:

1) There seems to be a typo at the end: "ROCK_DB".
2) Sometimes people use "store type" to refer to kv-store, window-store,
etc.; maybe we can differentiate the two notions a bit by naming the new
APIs `StoreImplType` and `default.dsl.store.impl.type`, with the description
"The default store implementation type used by DSL operators" (a config
sketch follows below).
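
Just to make the naming concrete, a rough sketch of what the user-facing
configuration could then look like; the config key and enum value below
follow the naming suggested above and are hypothetical pending the KIP's
final wording:

"
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// hypothetical config key / enum value, pending the KIP's final wording
props.put("default.dsl.store.impl.type", "IN_MEMORY");
"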

On Thu, Dec 16, 2021 at 2:29 AM Luke Chen  wrote:

> Hi Guozhang,
>
> I've updated the KIP to use `enum`, instead of store implementation, and
> some content accordingly.
> Please let me know if there's other comments.
>
>
> Thank you.
> Luke
>
> On Sun, Dec 12, 2021 at 3:55 PM Luke Chen  wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your comments.
> > I agree that in the KIP, there's a trade-off regarding the API
> complexity.
> > With the store impl, we can support default custom stores, but introduce
> > more complexity for users, while with the enum types, users can configure
> > default built-in store types easily, but it can't work for custom stores.
> >
> > For me, I'm OK to narrow down the scope and introduce the default
> built-in
> > enum store types first.
> > And if there's further request, we can consider a better way to support
> > default store impl.
> >
> > I'll update the KIP next week, unless there are other opinions from other
> > members.
> >
> > Thank you.
> > Luke
> >
> > On Fri, Dec 10, 2021 at 6:33 AM Guozhang Wang 
> wrote:
> >
> >> Thanks Luke for the updated KIP.
> >>
> >> One major change the new proposal has it to change the original enum
> store
> >> type with a new interface. Where in the enum approach our internal
> >> implementations would be something like:
> >>
> >> "
> >> Stores#keyValueBytesStoreSupplier(storeImplTypes, storeName, ...)
> >> Stores#windowBytesStoreSupplier(storeImplTypes, storeName, ...)
> >> Stores#sessionBytesStoreSupplier(storeImplTypes, storeName, ...)
> >> "
> >>
> >> And inside the impl classes like here we would could directly do:
> >>
> >> "
> >> if ((supplier = materialized.storeSupplier) == null) {
> >> supplier =
> >> Stores.windowBytesStoreSupplier(materialized.storeImplType())
> >> }
> >> "
> >>
> >> While I understand the benefit of having an interface such that user
> >> customized stores could be used as the default store types as well,
> >> there's
> >> a trade-off I feel regarding the API complexity. Since with this
> approach,
> >> our API complexity granularity is in the order of "number of impl
> types" *
> >> "number of store types". This means that whenever we add new store types
> >> in
> >> the future, this API needs to be augmented and customized impl needs to
> be
> >> updated to support the new store types, in addition, not all custom impl
> >> types may support all store types, but with this interface they are
> forced
> >> to either support all or explicitly throw un-supported exceptions.
> >>
> >> The way I see a default impl type is that, they would be safe to use for
> >> any store types, and since store types are evolved by the library
> itself,
> >> the default impls would better be controlled by the library as well.
> >> Custom
> >> impl classes can choose to replace some of the stores explicitly, but
> may
> >> not be a best fit as the default impl classes --- if there are in the
> >> future, one way we can consider is to make Stores class extensible along
> >> with the enum so that advanced users can add more default impls,
> assuming
> >> such scenarios are not very common.
> >>
> >> So I'm personally still a bit learning towards the enum approach with a
> >> narrower scope, for its simplicity as an API and also its low
> maintenance
> >> cost in the future. Let me know what do you think?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, Dec 1, 2021 at 6:48 PM Luke Chen  wrote:
> >>
> >> > Hi devs,
> >> >
> >> > I'd like to propose a KIP to allow users to set default store
> >> > implementation class (built-in RocksDB/InMemory, or custom one), and
> >> > default to RocksDB state store, to keep backward compatibility.
> >> >
> >> > Detailed description can be found here:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
> >> >
> >> > Any feedback and comments are welcome.
> >> >
> >> > Thank you.
> >> > Luke
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang


Re: [ANNOUNCE] New Kafka PMC member: David Jacot

2021-12-18 Thread Guozhang Wang
Congrats David!

On Sat, Dec 18, 2021 at 2:49 PM Matthias J. Sax  wrote:

> Congrats!
>
> On 12/17/21 15:46, Bill Bejeck wrote:
> > Congratulations David! Well deserved.
> >
> > -Bill
> >
> > On Fri, Dec 17, 2021 at 6:43 PM José Armando García Sancio
> >  wrote:
> >
> >> Congrats David!
> >>
> >> On Fri, Dec 17, 2021 at 3:09 PM Gwen Shapira  wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> David Jacot has been an Apache Kafka committer since Oct 2020 and has
> >> been contributing to the community consistently this entire time -
> >> especially notable the fact that he reviewed around 150 PRs in the last
> >> year. It is my pleasure to announce that David agreed to join the Kafka
> PMC.
> >>>
> >>> Congratulations, David!
> >>>
> >>> Gwen Shapira, on behalf of Apache Kafka PMC
> >>
> >>
> >>
> >> --
> >> -Jose
> >>
> >
>


-- 
-- Guozhang


Re: Question about Kafka concurrency

2021-12-16 Thread Guozhang Wang
Hello,

One blog post I can think of would be this:
https://www.confluent.io/blog/kafka-fastest-messaging-system/

Here's a Chinese translation that I found:
https://www.sohu.com/a/417379110_355140

Hope it helps,
Guozhang

On Thu, Dec 16, 2021 at 9:27 PM 酒虫  wrote:

> Hello,
> Could you share some performance test results regarding the concurrency of
> Kafka clusters, or other related reference books, e.g. the maximum number of
> concurrent requests a single broker can handle?



-- 
-- Guozhang


Re: [VOTE] KIP-806: Add session and window query over KV-store in IQv2

2021-12-15 Thread Guozhang Wang
ndowRangeQuery.fromKey is to call SessionStore.fetch
> which
> > > returns a KeyValueIterator.
> > > * Combine WindowKeyQuery and WindowRangeQuery, e.g., by exposing the
> > > template type
> > > * Create a "Builder" interface for the query types to avoid blowing up
> the
> > > static methods.
> > >
> > > Since I think any of those tasks will require some more discussion,
> and in
> > > the interest of being pragmatic here, I suggest we defer those
> > > optimizations to future KIPs.
> > >
> > > Best,
> > >   Patrick
> > >
> > > On Tue, Dec 14, 2021 at 1:02 AM Guozhang Wang 
> wrote:
> > >
> > > > Hi John,
> > > >
> > > > Please see my follow-up comments inlined below.
> > > >
> > > > On Mon, Dec 13, 2021 at 2:26 PM John Roesler 
> wrote:
> > > >
> > > > > Hi Patrick, thanks for the KIP!
> > > > >
> > > > > I hope you, Guozhang, and Luke don't mind if I share some
> > > > > thoughts:
> > > > >
> > > > > 1. I think you just meant to remove that private constructor
> > > > > from the KIP, right?
> > > > >
> > > >
> > > > Yes.
> > > >
> > > >
> > > > >
> > > > > 2. I think WindowRangeQuery#withWindowRage(windowLower,
> > > > > windowUpper) is the version that yields an iterator over
> > > > > both keys and windows, so it's not totally redundant.
> > > > >
> > > > > However, it does seem like WindowRangeQuery#withKey is
> > > > > equivalent to WindowKeyQuery#withKey . I overlooked it
> > > > > before, but we probably don't need both.
> > > > >
> > > > > Stepping back, it seems like we could either just de-
> > > > > duplicate that specific withKey query or we could even just
> > > > > merge both use cases into the WindowRangeQuery. I've
> > > > > personally never been convinced that the extra complexity of
> > > > > having the WindowStoreIterator is worth the savings of not
> > > > > duplicating the key on each record. Maybe it would be if we
> > > > > separately deserialized the same key over and over again,
> > > > > but that seems optimizable.
> > > > >
> > > > > On the other hand, that's more work, and the focus for IQv2
> > > > > right now is to just get some MVP of queries implemented to
> > > > > cover basic use cases, so it might be worthwhile to keep it
> > > > > simple (by just dropping `WindowRangeQuery#withKey`). I'm
> > > > > happy with whatever call Patrick wants to make here, since
> > > > > it's his work to do.
> > > > >
> > > >
> > > > My original comments was actually referring to that
> `WindowRangeQuery` does
> > > > not allow users to specify a range of [keyFrom: windowFrom, keyTo:
> > > > windowTo], but only [key: windowFrom, key: windowTo], or [-INF:
> windowFrom,
> > > > INF: windowTo] if we do not specify the key but only do
> `withWindowRage`. I
> > > > was not sure if this is intentional by design, i.e. that the only
> > > > difference between the two classes is that the latter allows [-INF:
> > > > windowFrom, INF: windowTo], and and I was assuming it's now, but
> otherwise
> > > > these two are then very much alike.
> > > >
> > > > So I guess my question is that, is it by-design to not allow users do
> > > > something like `query.withWindowRange(..).withKeyRange(..)`?
> > > >
> > > >
> > > > >
> > > > > 3. I got the impression that WindowRangeQuery was intended
> > > > > for use with both Window and Session stores, but that might
> > > > > be my assumption.
> > > > >
> > > > >
> > > > Today the session store's query API is at least different on the
> names,
> > > > e.g. "findSessions", so I'm not sure if the proposal intend to
> replace
> > > > these functions with the WindowRangeQuery such that:
> > > >
> > > > sessionStore.findSessions(K, Ins, Ins) ->
> query.withKey().withWindowRange()
> > > > sessionStore.findSessions(K, K, Ins, Ins) ->
> > > > query.withKeyRange().withWindowRange()   // again, assuming we would
> want

Re: [VOTE] KIP-806: Add session and window query over KV-store in IQv2

2021-12-13 Thread Guozhang Wang
dowRangeQuery#withKey(k)`).
> Therefore, the iterator is just (windowStartTime, value)
> pairs.
> * WindowRangeQuery: the results may have different keys, so
> the iterator is (Windowed, value) pairs, where
> Windowed is basically a (windowStartTime, K key) pair.
>
> Therefore, we could support a WindowRangeQuery with a fixed
> key, but it would just be another way to express the same
> thing as the WindowKeyQuery with the same parameters.
>
>
As I suggested, we do not necessarily need to cover all existing cases of
the old API, but it's better to be very clear on what's actually covered
vs. what's not. For example, for WindowRangeQuery, I'd like to clarify
whether all of the following are intended to be in this KIP's scope:

* query.withKey().withWindowRange()
* query.withKey() // range over all windows
* query.withWindowRange() // range over all keys


> As in point 2, I do think it might be worth converging all
> use cases into the WindowRangeQuery, but we can also just
> shoot for parity now and defer elegance for the future.
>
> 6. I'll just add one question of my own here: If we do keep
> both query classes, then I think we would drop `withKey` and
> `getKey` from the WindowRangeQuery. But if we do wind up
> keeping the WindowRangeQuery class, I think we should
> reconsider the name of the getter, since there are other
> range queries that support a range of keys. Is there a
> getter name we can use that would still work if we come back
> later and add lower and upper key bounds?
>
> Thanks again for the KIP!
> John
>
> On Mon, 2021-12-13 at 16:35 +0800, Luke Chen wrote:
> > Hi Patrick,
> >
> > Thanks for the KIP!
> >
> > I have some comments, in addition to Guozhang's comments:
> > 4. The parameter names `windowLower` and `windowUpper` are kind of
> > ambiguous to me. Could we come up a better name for it, like
> > `windowStartTime`, `windowEndTime`, or even we don't need the "window"
> > name, just `startTime` and `endTime`?
> > 5. Why can't we support window range query with a key within a time
> range?
> > You might need to explain in the KIP.
> >
> > Thank you.
> > Luke
> >
> >
> > On Sat, Dec 11, 2021 at 7:54 AM Guozhang Wang 
> wrote:
> >
> > > Hi Patrick,
> > >
> > > I made a pass on the KIP and have a few comments below:
> > >
> > > 1. The `WindowRangeQuery` has a private constructor while the
> > > `WindowKeyQuery` has not, is that intentional?
> > >
> > > 2. The `WindowRangeQuery` seems not allowing to range over both window
> and
> > > key, but only window with a fixed key, in that case it seems pretty
> much
> > > the same as the other (ignoring the constructor), since we know we
> would
> > > only have a single `key` value in the returned iterator, and hence it
> seems
> > > returning in the form of `WindowStoreIterator` is also fine as the
> key
> > > is fixed and hence no need to maintain it in the returned iterator. I'm
> > > wondering should we actually support ranging over keys as well in
> > > `WindowRangeQuery`?
> > >
> > > 3. The KIP title mentioned both session and window, but the APIs only
> > > involves window stores; However the return type `WindowStoreIterator`
> is
> > > only for window stores not session stores, so I feel we would still
> have
> > > some differences for session window query interface?
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Dec 10, 2021 at 1:32 PM Patrick Stuedi
> > > 
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I would like to start the vote for KIP-806 that adds window and
> session
> > > > query support to query KV-stores using IQv2.
> > > >
> > > > The KIP can be found here:
> > > > https://cwiki.apache.org/confluence/x/LJaqCw
> > > >
> > > > Skipping the discussion phase as this KIP is following the same
> pattern
> > > as
> > > > the previously submitted KIP-805 (KIP:
> > > > https://cwiki.apache.org/confluence/x/85OqCw, Discussion:
> > > > https://tinyurl.com/msp5mcb2). Of course concerns/comments can
> still be
> > > > brought up in this thread.
> > > >
> > > > -Patrick
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
>
>

-- 
-- Guozhang


Re: [VOTE] KIP-778 KRaft upgrades

2021-12-11 Thread Guozhang Wang
Thanks David! +1.

Guozhang

On Fri, Dec 10, 2021 at 7:12 PM deng ziming 
wrote:

> Hi, David
>
> Looking forwarding to this feature
>
> +1 (non-binding)
>
> Thanks!
>
> Ziming Deng
>
> > On Dec 11, 2021, at 4:49 AM, David Arthur 
> wrote:
> >
> > Hey everyone, I'd like to start a vote for KIP-778 which adds support for
> > KRaft to KRaft upgrades.
> >
> > Notably in this KIP is the first use case of KIP-584 feature flags. As
> > such, there are some addendums to KIP-584 included.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades
> >
> > Thanks!
> > David
>
>

-- 
-- Guozhang


Re: [VOTE] KIP-806: Add session and window query over KV-store in IQv2

2021-12-10 Thread Guozhang Wang
Hi Patrick,

I made a pass on the KIP and have a few comments below:

1. The `WindowRangeQuery` has a private constructor while the
`WindowKeyQuery` does not; is that intentional?

2. The `WindowRangeQuery` does not seem to allow ranging over both window
and key, but only over the window with a fixed key. In that case it seems
pretty much the same as the other query (ignoring the constructor): since we
know we would only have a single `key` value in the returned iterator,
returning a `WindowStoreIterator` is also fine, as the key is fixed and
there is no need to maintain it in the returned iterator. I'm wondering
whether we should actually support ranging over keys as well in
`WindowRangeQuery`?

3. The KIP title mentions both session and window stores, but the APIs only
involve window stores; moreover, the return type `WindowStoreIterator` is
only for window stores, not session stores, so I feel we would still have
some differences for the session store query interface? (See the sketch
below for how the two store-level read APIs differ today.)
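
For comparison, the store-level read APIs differ today roughly as follows;
a minimal sketch using the current interfaces, assuming `windowStore` is a
ReadOnlyWindowStore<String, Long> and `sessionStore` is a
SessionStore<String, Long>:

"
// window store: the iterator is keyed only by window start time for a fixed key
final WindowStoreIterator<Long> winIter =
    windowStore.fetch("user-1", Instant.EPOCH, Instant.now());

// session store: the iterator is keyed by Windowed<K> sessions
final KeyValueIterator<Windowed<String>, Long> sessIter =
    sessionStore.findSessions("user-1", 0L, Long.MAX_VALUE);
"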


Guozhang

On Fri, Dec 10, 2021 at 1:32 PM Patrick Stuedi 
wrote:

> Hi everyone,
>
> I would like to start the vote for KIP-806 that adds window and session
> query support to query KV-stores using IQv2.
>
> The KIP can be found here:
> https://cwiki.apache.org/confluence/x/LJaqCw
>
> Skipping the discussion phase as this KIP is following the same pattern as
> the previously submitted KIP-805 (KIP:
> https://cwiki.apache.org/confluence/x/85OqCw, Discussion:
> https://tinyurl.com/msp5mcb2). Of course concerns/comments can still be
> brought up in this thread.
>
> -Patrick
>


-- 
-- Guozhang


Re: [VOTE] KIP-805: Add range and scan query support in IQ v2

2021-12-10 Thread Guozhang Wang
Thanks Vicky,

I'd suggest we change the KIP title to "add range and scan query over
kv-store in IQv2" just for clarity; otherwise I'm +1.

Guozhang

On Wed, Dec 8, 2021 at 4:18 PM Matthias J. Sax  wrote:

> Thanks for the KIP.
>
> +1 (binding)
>
> On 12/5/21 7:03 PM, Luke Chen wrote:
> > Hi Vasiliki,
> >
> > Thanks for the KIP!
> > It makes sense to have the range and scan query in IQv2, as in IQv1.
> >
> > +1 (non-binding)
> >
> > Thank you.
> > Luke
> >
> > On Thu, Dec 2, 2021 at 5:41 AM John Roesler  wrote:
> >
> >> Thanks for the KIP, Vicky!
> >>
> >> I’m +1 (binding)
> >>
> >> -John
> >>
> >> On Tue, Nov 30, 2021, at 14:51, Vasiliki Papavasileiou wrote:
> >>> Hello everyone,
> >>>
> >>> I would like to start a vote for KIP-805 that adds range and scan
> >> KeyValue
> >>> queries in IQ2.
> >>>
> >>> The KIP can be found here:
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2
> >>>
> >>> Cheers!
> >>> Vicky
> >>
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-591: Add Kafka Streams config to set default state store

2021-12-09 Thread Guozhang Wang
Thanks Luke for the updated KIP.

One major change in the new proposal is to replace the original enum store
type with a new interface. With the enum approach, our internal
implementations would be something like:

"
Stores#keyValueBytesStoreSupplier(storeImplTypes, storeName, ...)
Stores#windowBytesStoreSupplier(storeImplTypes, storeName, ...)
Stores#sessionBytesStoreSupplier(storeImplTypes, storeName, ...)
"

And inside the impl classes, like here, we could directly do:

"
if ((supplier = materialized.storeSupplier) == null) {
    supplier = Stores.windowBytesStoreSupplier(materialized.storeImplType());
}
"

While I understand the benefit of having an interface such that
user-customized stores could be used as the default store types as well, I
feel there's a trade-off regarding the API complexity. With this approach,
our API complexity grows in the order of "number of impl types" * "number of
store types". This means that whenever we add new store types in the future,
this API needs to be augmented and customized impls need to be updated to
support the new store types; in addition, not all custom impl types may
support all store types, but with this interface they are forced to either
support all of them or explicitly throw unsupported-operation exceptions.

The way I see it, a default impl type should be safe to use for any store
type, and since store types are evolved by the library itself, the default
impls had better be controlled by the library as well. Custom impl classes
can be chosen to replace some of the stores explicitly, but may not be the
best fit as the default impl classes --- if such needs arise in the future,
one option we can consider is to make the Stores class extensible along with
the enum so that advanced users can add more default impls, assuming such
scenarios are not very common.
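
For context, this kind of explicit, per-store replacement is already
possible today via Materialized, independent of any new default config; a
minimal sketch with made-up store / stream names:

"
// `input` here is some KStream<String, String>
final KTable<String, Long> counts = input
    .groupByKey()
    .count(Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("counts-store"))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
"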

So I'm personally still leaning a bit towards the enum approach with a
narrower scope, for its simplicity as an API and also its low maintenance
cost in the future. Let me know what you think.


Guozhang


On Wed, Dec 1, 2021 at 6:48 PM Luke Chen  wrote:

> Hi devs,
>
> I'd like to propose a KIP to allow users to set default store
> implementation class (built-in RocksDB/InMemory, or custom one), and
> default to RocksDB state store, to keep backward compatibility.
>
> Detailed description can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
>
> Any feedback and comments are welcome.
>
> Thank you.
> Luke
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-805: Add range and scan query support in IQ v2

2021-12-09 Thread Guozhang Wang
Hi Vicky,

Thanks for the KIP. Just for a bit more clarification, could you elaborate
with an example for windowed stores, beyond a key-value store (I think
`myStore` is a kv-store, right?)? Otherwise LGTM.
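
For context, my mental model of the kv-store case is roughly the snippet
below, using the request/query classes as currently proposed in
KIP-796/KIP-805 (the exact class and method names may still change); it is
less clear to me what the analogous query would look like for a windowed
store:

"
final StateQueryRequest<KeyValueIterator<String, Long>> request =
    StateQueryRequest.inStore("myStore")
        .withQuery(RangeQuery.<String, Long>withRange("a", "f"));

// `kafkaStreams` is a running KafkaStreams instance
final StateQueryResult<KeyValueIterator<String, Long>> result =
    kafkaStreams.query(request);
"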


Guozhang

On Wed, Dec 8, 2021 at 4:18 PM Matthias J. Sax  wrote:

> Thanks for the details!
>
> I also chatted with John about it, and he filed
> https://issues.apache.org/jira/browse/KAFKA-13526 to incorporate some
> feedback as follow up work.
>
> IMHO, the hard coded query translation is not ideal and should be
> plugable. But for a v1 of IQv2 (pun intended) the hardcoded translation
> seems to be good enough.
>
>
> -Matthias
>
> On 12/8/21 9:37 AM, Vasiliki Papavasileiou wrote:
> > Hey Matthias,
> >
> > Thank you for looking into the KIP!
> >
> > We are adding raw versions of typed queries, like `RawRangeQuery` because
> > it simplifies internal query handling since the bytes stores only support
> > raw queries. A typed RangeQuery is handled by the `MeteredStore` which
> > creates a new `RawRangeQuery` to pass down to the wrapped stores. When it
> > gets the result back, it deserializes the data and creates a typed query
> > result to return to the user. So, the store's key serde are used to
> > translate typed `RangeQueries` into `RawRangeQueries` and it's value
> serde
> > are used to translate the result of the query on the way back. This
> allows
> > users to provide their own queries even if the MeteredStore has no
> > knowledge of them.
> >
> > I hope this answers your question. Let me know if you have any other
> > questions.
> >
> > Best,
> > Vicky
> >
> >
> > On Tue, Dec 7, 2021 at 12:46 AM Matthias J. Sax 
> wrote:
> >
> >> Thanks for the KIP. Overall, make sense.
> >>
> >> One question: What is the purpose to `RawRangeQuery`? Seems not very
> >> user friendly.
> >>
> >> -Matthias
> >>
> >>
> >> On 11/30/21 12:48 PM, Vasiliki Papavasileiou wrote:
> >>> Thank you John! Yes, that was a typo from copying and I fixed it.
> >>>
> >>> Since there have been no more comments, I will start the vote.
> >>>
> >>> Best,
> >>> Vicky
> >>>
> >>> On Tue, Nov 30, 2021 at 5:22 AM John Roesler 
> >> wrote:
> >>>
>  Thanks for the KIP, Vicky!
> 
>  This KIP will help fill in the parity gap between IQ and
>  IQv2.
> 
>  One thing I noticed, which looks like just a typo is that
>  the value type of the proposed RangeQuery should probably be
>  KeyValueIterator, right?
> 
>  Otherwise, it looks good to me!
> 
>  Thanks,
>  -John
> 
>  On Mon, 2021-11-29 at 12:20 +, Vasiliki Papavasileiou
>  wrote:
> > Hello everyone,
> >
> > I would like to start the discussion for KIP-805: Add range and scan
>  query
> > support in IQ v2
> >
> > The KIP can be found here:
> >
> 
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2
> >
> > Any suggestions are more than welcome.
> >
> > Many thanks,
> > Vicky
> 
> 
> >>>
> >>
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-792: Add "generation" field into consumer protocol

2021-12-09 Thread Guozhang Wang
Thanks Luke, in that case I'm +1 on this KIP.

On Thu, Dec 9, 2021 at 1:46 AM Luke Chen  wrote:

> Hi Guozhang,
>
> Thanks for your comment.
>
> > we need to make sure the old-versioned leader would be able to ignore the
> new
> field during an upgrade e.g. without crashing.
>
> Yes, I understand. I'll be careful to make sure it won't crash the old
> versioned leader.
> But basically, it won't, because we appended the new field into the last of
> the ConsumerProtocolSubscription, which means, when read/deserialize the
> Subscription metadata, the old versioned leader will just read the head
> part of the data.
>
> Thanks for the reminder!
>
> Luke
>
> On Thu, Dec 9, 2021 at 4:00 AM Guozhang Wang  wrote:
>
> > Hi Luke,
> >
> > Thanks for the KIP.
> >
> > One thing I'd like to double check is that, since the
> > ConsumerProtocolSubscription is not auto generated from the json file, we
> > need to make sure the old-versioned leader would be able to ignore the
> new
> > field during an upgrade e.g. without crashing. Other than that, the KIP
> > lgtm.
> >
> > Guozhang
> >
> > On Tue, Dec 7, 2021 at 6:16 PM Luke Chen  wrote:
> >
> > > Hi Colin,
> > >
> > > I'm not quite sure if I understand your thoughts correctly.
> > > If I was wrong, please let me know.
> > >
> > > Also, I'm not quite sure how I could lock this feature to a new IBP
> > > version.
> > > I saw "KIP-584: Versioning scheme for features" is still under
> > development.
> > > Not sure if I need to lock the IBP version, how should I do?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Tue, Dec 7, 2021 at 9:41 PM Luke Chen  wrote:
> > >
> > > > Hi Colin,
> > > >
> > > > Thanks for your comments. I've updated the KIP to mention about the
> KIP
> > > > won't affect current broker side behavior.
> > > >
> > > > > One scenario that we need to consider is what happens during a
> > rolling
> > > > upgrade. If the coordinator moves back and forth between brokers with
> > > > different IBPs, it seems that the same epoch numbers could be reused
> > for
> > > a
> > > > group, if things are done in the obvious manner (old IBP = don't read
> > or
> > > > write epoch, new IBP = do)
> > > >
> > > > I think this KIP doesn't care about the group epoch number at all.
> The
> > > > subscription metadata is passed from each member to group
> coordinator,
> > > and
> > > > then the group coordinator pass all of them back to the consumer
> lead.
> > So
> > > > even if the epoch number is reused in a group, it should be fine. On
> > the
> > > > other hand, the group coordinator will have no idea if the join group
> > > > request sent from consumer containing the new subscription
> "generation"
> > > > field or not, because group coordinator won't deserialize the
> metadata.
> > > >
> > > > I've added also added them into the KIP.
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Mon, Dec 6, 2021 at 10:39 AM Colin McCabe 
> > wrote:
> > > >
> > > >> Hi Luke,
> > > >>
> > > >> Thanks for the explanation.
> > > >>
> > > >> I don't see any description of how the broker decides to use the new
> > > >> version of ConsumerProtocolSubscription or not. This probably needs
> to
> > > be
> > > >> locked to a new IBP version.
> > > >>
> > > >> One scenario that we need to consider is what happens during a
> rolling
> > > >> upgrade. If the coordinator moves back and forth between brokers
> with
> > > >> different IBPs, it seems that the same epoch numbers could be reused
> > > for a
> > > >> group, if things are done in the obvious manner (old IBP = don't
> read
> > or
> > > >> write epoch, new IBP = do).
> > > >>
> > > >> best,
> > > >> Colin
> > > >>
> > > >>
> > > >> On Fri, Dec 3, 2021, at 18:46, Luke Chen wrote:
> > > >> > Hi Colin,
> > > >> > Thanks for your comment.
> > > >> >
> > > >> >> How are we going to avoid the situation where the broker
> restarts,
> > > and
> > &g

Re: [DISCUSS] KIP-807: Refactor KafkaStreams exposed metadata hierarchy

2021-12-09 Thread Guozhang Wang
Hi Josep,

Thanks for the proposed KIP. It looks good to me overall. One meta comment
is that, in order to illustrate whether the new hierarchy fits more cleanly
with the actual metadata use cases, we could add a few examples, each with a
code snippet based on the new APIs (a sketch for one of them follows the
list below). The ones I have in mind are:

1) Give me the list of hosts for a specific store, and let me know who is
the active host and who are the standbys.
2) Give me the list of hosts for a specific store and a specific key of the
store, and let me know who is the active host and who are the standbys.
3) Give me all the embedded client ids of the current instance, across all
of its threads.
4) Let me know how many threads the instance has, and their current status.
5) Let me know for a given store, how far in terms of records is each of
the standbys lagging behind the active replica.


Guozhang

On Fri, Dec 3, 2021 at 11:15 AM Josep Prat 
wrote:

> Hi all,
>
> I've created KIP-807 to refactor KafaStreams exposed metadata hierarchy.
> Here you can find the document:
> https://cwiki.apache.org/confluence/x/TpiqCw
>
> Please take a look at the proposal and let me know what you think,
>
> Thanks in advance,
>
> --
>
> Josep Prat
>
> *Aiven Deutschland GmbH*
>
> Immanuelkirchstraße 26, 10405 Berlin
>
> Amtsgericht Charlottenburg, HRB 209739 B
>
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>
> *m:* +491715557497
>
> *w:* aiven.io
>
> *e:* josep.p...@aiven.io
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-796: Interactive Query v2

2021-12-08 Thread Guozhang Wang
Thanks for the clarification, it looks good to me now.

On Wed, Nov 17, 2021 at 9:21 PM John Roesler  wrote:

> Ah, sorry, Guozhang,
>
> It seem I was a bit too eager with starting the vote thread.
>
> 13: I think that makes perfect sense. I've updated the KIP.
>
> 14: Oof, I can't believe I overlooked those newer
> exceptions. Some of them will become exceptions in IQv2,
> whereas others will beceome individual partition QueryResult
> failures. Here is an accounting of what will become of those
> proposed exceptions:
>
> * StreamsNotStartedException: thrown when stream thread
> state is CREATED, the user can retry until to RUNNING.
>
> * StreamsRebalancingException: thrown when stream thread is
> not running and stream state is REBALANCING. This exception
> is no longer applicable. Regardless of the rebalanceing
> state of the store's task, the state will either be up to
> the requested bound or not.
>
> * StateStoreMigratedException: thrown when state store
> already closed and stream state is RUNNING. This is a per-
> partition failure, so it now maps to the
> FailureReason.NOT_PRESENT failure.
>
>
> * StateStoreNotAvailableException: thrown when state store
> closed and stream state is PENDING_SHUTDOWN / NOT_RUNNING /
> ERROR. I (subjectively) felt the name was ambiguous with
> respect to the prior condition in which a store partition is
> not locally available. This is replaced with the thrown
> exception, StreamsStoppedException (the JavaDoc states the
> that it is thrown when Streams is in any terminal state).
>
> * UnknownStateStoreException: thrown when passing an unknown
> state store. This is still a thown exception.
>
> * InvalidStateStorePartitionException: thrown when user
> requested partition is not available on the stream instance.
> If the partition actually does exist, then we will now
> return a per-partition FailureReason.NOT_PRESENT. If the
> requested partition is actually not present in the topology
> at all, then we will return the per-partition
> FailureReason.DOES_NOT_EXIST.
>
> Sorry for the oversight. The KIP has been updated.
>
> Thanks,
> -John
>
> On Wed, 2021-11-17 at 15:48 -0800, Guozhang Wang wrote:
> > Thanks John.
> >
> > I made another pass on the KIP and overall it already looks pretty good.
> I
> > just have a couple more minor comments:
> >
> > 13: What do you think about just removing the following function in
> > QueryResult
> >
> >   // returns a failed query result because caller requested a "latest"
> > bound, but the task was
> >   // not active and running.
> >   public static  QueryResult notActive(String currentState);
> >
> > Instead just use `notUpToBound` for the case when `latest` bound is
> > requested but the node is not the active replica. My main motivation is
> > trying to abstract away the notion of active/standby from the public APIs
> > itself, and hence capturing both this case as well as just a
> > normal "position bound not achieved" in the same return signal, until
> later
> > when we think it is indeed needed to separate them with different
> returns.
> >
> > 14: Regarding the possible exceptions being thrown from `query`, it seems
> > more exception types are possible from KIP-216:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
> ,
> > should we include all in the javadocs?
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, Nov 17, 2021 at 3:25 PM John Roesler 
> wrote:
> >
> > > Thanks for the reply, Guozhang!
> > >
> > > I have updated the KIP to tie up the remaining points that
> > > we have discussed. I really appreciate your time in refining
> > > the proposal. I included a quick summary of the final state
> > > of our discussion points below.
> > >
> > > Since it seems like this discussion thread is pretty
> > > convergent, I'll go ahead and start the voting thread soon.
> > >
> > > Thanks again!
> > > -John
> > >
> > > P.S.: the final state of our discussion points:
> > >
> > > 1. I removed serdesForStore from the proposal (and moved it
> > > to Rejected Alternatives)
> > >
> > > 2. Thanks for that reference. I had overlooked that
> > > implementation. I'd note that the ListValuesStore is
> > > currently only used in the KStream API, which doesn't
> > > support queries at all. Due to its interface, it could
> > > theoretically be used to materialize a KTabl

Re: [VOTE] KIP-792: Add "generation" field into consumer protocol

2021-12-08 Thread Guozhang Wang
Hi Luke,

Thanks for the KIP.

One thing I'd like to double check is that, since the
ConsumerProtocolSubscription is not auto-generated from the JSON file, we
need to make sure an old-versioned leader would be able to ignore the new
field during an upgrade, e.g. without crashing. Other than that, the KIP
LGTM.

Guozhang

On Tue, Dec 7, 2021 at 6:16 PM Luke Chen  wrote:

> Hi Colin,
>
> I'm not quite sure if I understand your thoughts correctly.
> If I was wrong, please let me know.
>
> Also, I'm not quite sure how I could lock this feature to a new IBP
> version.
> I saw "KIP-584: Versioning scheme for features" is still under development.
> Not sure if I need to lock the IBP version, how should I do?
>
> Thank you.
> Luke
>
> On Tue, Dec 7, 2021 at 9:41 PM Luke Chen  wrote:
>
> > Hi Colin,
> >
> > Thanks for your comments. I've updated the KIP to mention about the KIP
> > won't affect current broker side behavior.
> >
> > > One scenario that we need to consider is what happens during a rolling
> > upgrade. If the coordinator moves back and forth between brokers with
> > different IBPs, it seems that the same epoch numbers could be reused for
> a
> > group, if things are done in the obvious manner (old IBP = don't read or
> > write epoch, new IBP = do)
> >
> > I think this KIP doesn't care about the group epoch number at all. The
> > subscription metadata is passed from each member to group coordinator,
> and
> > then the group coordinator pass all of them back to the consumer lead. So
> > even if the epoch number is reused in a group, it should be fine. On the
> > other hand, the group coordinator will have no idea if the join group
> > request sent from consumer containing the new subscription "generation"
> > field or not, because group coordinator won't deserialize the metadata.
> >
> > I've added also added them into the KIP.
> >
> > Thank you.
> > Luke
> >
> > On Mon, Dec 6, 2021 at 10:39 AM Colin McCabe  wrote:
> >
> >> Hi Luke,
> >>
> >> Thanks for the explanation.
> >>
> >> I don't see any description of how the broker decides to use the new
> >> version of ConsumerProtocolSubscription or not. This probably needs to
> be
> >> locked to a new IBP version.
> >>
> >> One scenario that we need to consider is what happens during a rolling
> >> upgrade. If the coordinator moves back and forth between brokers with
> >> different IBPs, it seems that the same epoch numbers could be reused
> for a
> >> group, if things are done in the obvious manner (old IBP = don't read or
> >> write epoch, new IBP = do).
> >>
> >> best,
> >> Colin
> >>
> >>
> >> On Fri, Dec 3, 2021, at 18:46, Luke Chen wrote:
> >> > Hi Colin,
> >> > Thanks for your comment.
> >> >
> >> >> How are we going to avoid the situation where the broker restarts,
> and
> >> > the same generation number is reused?
> >> >
> >> > Actually, this KIP doesn't have anything to do with the brokers. The
> >> > "generation" field I added, is in the subscription metadata, which
> will
> >> not
> >> > be deserialized by brokers. The metadata is only deserialized by
> >> consumer
> >> > lead. And for the consumer lead, the only thing the lead cared about,
> is
> >> > the highest generation of the ownedPartitions among all the consumers.
> >> With
> >> > the highest generation of the ownedPartitions, the consumer lead can
> >> > distribute the partitions as sticky as possible, and most importantly,
> >> > without errors.
> >> >
> >> > That is, after this KIP, if the broker restarts, and the same
> generation
> >> > number is reused, it won't break current rebalance behavior. But it'll
> >> help
> >> > the consumer lead do the sticky assignments correctly.
> >> >
> >> > Thank you.
> >> > Luke
> >> >
> >> > On Fri, Dec 3, 2021 at 6:30 AM Colin McCabe 
> wrote:
> >> >
> >> >> How are we going to avoid the situation where the broker restarts,
> and
> >> the
> >> >> same generation number is reused?
> >> >>
> >> >> best,
> >> >> Colin
> >> >>
> >> >> On Tue, Nov 30, 2021, at 16:36, Luke Chen wrote:
> >> >> > Hi all,
> >> >> >
> >> >> > I'd like to start the vote for KIP-792: Add "generation" field into
> >> >> > consumer protocol.
> >> >> >
> >> >> > The goal of this KIP is to allow the assignor/consumer coordinator
> to
> >> >> have
> >> >> > a way to identify the out-of-date members/assignments, to avoid
> >> rebalance
> >> >> > stuck issues in current protocol.
> >> >> >
> >> >> > Detailed description can be found here:
> >> >> >
> >> >>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614
> >> >> >
> >> >> > Any feedback is welcome.
> >> >> >
> >> >> > Thank you.
> >> >> > Luke
> >> >>
> >>
> >
>


-- 
-- Guozhang


Re: Contributor access

2021-12-06 Thread Guozhang Wang
Hello Tamara,

Thanks for your interest, I've added you to the contributor list.

Guozhang

On Mon, Dec 6, 2021 at 9:24 PM Tamara Skokova
 wrote:

> Hi,
> I would like to contribute to Apache Kafka.
> Could you please grant me contributor access?
> ID: tamara_skokova
>


-- 
-- Guozhang


Re: [ANNOUNCE] New Kafka PMC Member: Tom Bentley

2021-11-18 Thread Guozhang Wang
Congrats Tom!

Guozhang

On Thu, Nov 18, 2021 at 7:49 AM Jun Rao  wrote:

> Hi, Everyone,
>
> Tom Bentley has been a Kafka committer since Mar. 15,  2021. He has been
> very instrumental to the community since becoming a committer. It's my
> pleasure to announce that Tom is now a member of Kafka PMC.
>
> Congratulations Tom!
>
> Jun
> on behalf of Apache Kafka PMC
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-17 Thread Guozhang Wang
Thanks John.

I made another pass on the KIP and overall it already looks pretty good. I
just have a couple more minor comments:

13: What do you think about just removing the following function in
QueryResult:

  // returns a failed query result because caller requested a "latest"
  // bound, but the task was not active and running.
  public static <R> QueryResult<R> notActive(String currentState);

Instead, we could just use `notUpToBound` for the case when the `latest`
bound is requested but the node is not the active replica. My main
motivation is trying to abstract away the notion of active/standby from the
public API itself, and hence to capture both this case and the normal
"position bound not achieved" case in the same return signal, until we later
decide it is indeed needed to separate them with different return values.

14: Regarding the possible exceptions being thrown from `query`, it seems
more exception types are possible from KIP-216:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors,
should we include all in the javadocs?


Guozhang



On Wed, Nov 17, 2021 at 3:25 PM John Roesler  wrote:

> Thanks for the reply, Guozhang!
>
> I have updated the KIP to tie up the remaining points that
> we have discussed. I really appreciate your time in refining
> the proposal. I included a quick summary of the final state
> of our discussion points below.
>
> Since it seems like this discussion thread is pretty
> convergent, I'll go ahead and start the voting thread soon.
>
> Thanks again!
> -John
>
> P.S.: the final state of our discussion points:
>
> 1. I removed serdesForStore from the proposal (and moved it
> to Rejected Alternatives)
>
> 2. Thanks for that reference. I had overlooked that
> implementation. I'd note that the ListValuesStore is
> currently only used in the KStream API, which doesn't
> support queries at all. Due to its interface, it could
> theoretically be used to materialize a KTable, though it has
> no supplier provided in the typical Stores factory class.
>
> Regardless, I think that it would still be a similar story
> to the Segmented store. The ListValues store would simply
> choose to terminate the query on its own and not delegate to
> any of the wrapped KeyValue stores. It wouldn't matter that
> the wrapped stores have a query-handling facility of their
> own, if the wrapping store doesn't choose to delegate, the
> wrapped store will not try to execute any queries.
>
> Specifically regarding the key transformation that these
> "formatted" stores perform, when they handle the query, they
> would have the ability to execute the query in any way that
> makes sense OR to just reject the query if it doesn't make
> sense.
>
> 3, 4: nothing to do
>
> 5: I updated the KIP to specify the exceptions that may be
> thrown in `KafkaStreams#query` and to clarify that per-
> partition failures will be reported as per-partition failed
> QueryResult objects instead of thrown exceptions. That
> allows us to successfully serve some partitions of the
> request even if others fail.
>
> 6: I added a note that updating the metadata APIs is left
> for future work.
>
> 7: nothing to do
>
> 8. I went with StateQueryRequest and StateQueryResponse.
>
> 9, 10: nothing to do.
>
> 11: Ah, I see. That's a good point, but it's not fundamental
> to the framework. I think we can tackle it when we propose
> the actual queries.
>
> 12: Cool. I went ahead and dropped the "serdesForStore"
> method. I think you're onto something there, and we should
> tackle it separately when we propose the actual queries.
>
>
>
>
> On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote:
> > Thanks John! Some more thoughts inlined below.
> >
> > On Mon, Nov 15, 2021 at 10:07 PM John Roesler 
> wrote:
> >
> > > Thanks for the review, Guozhang!
> > >
> > > 1. This is a great point. I fell into the age-old trap of
> > > only considering the simplest store type and forgot about
> > > this extra wrinkle of the "key schema" that we use in
> > > Windowed and Session stores.
> > >
> > > Depending on how we want to forge forward with our provided
> > > queries, I think it can still work out ok. The simplest
> > > solution is just to have windowed versions of our queries
> > > for use on the windowed stores. That should work naively
> > > because we're basically just preserving the existing
> > > interactions. It might not be ideal in the long run, but at
> > > least it lets us make IQv2 orthogonal from other efforts to
> > > simplify the stores themselves.
> > >

Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-16 Thread Guozhang Wang
nature is:
>  KeyQuery implements Query
>
> So, when you use that query, it does bind R to V, and the
> result will be a QueryResult.
>
>
Cool, thanks. My main confusion comes from the inconsistency between the
key query and the scan query. The former is declared as:

KeyQuery<K, V> implements Query<V>  =>  binds V to R, with K unbound

whereas the latter is declared as:

ScanQuery<K, V> implements Query<KeyValueIterator<K, V>>  =>  binds
KeyValueIterator<K, V> to R, with both K and V unbound



>
> 12. I considered doing exactly that. The reason I shied away
> from it in general is that if you're going to have a "raw"
> query API, you also need to know the key serde before you do
> a query (otherwise you can't query at all!). So, bundling a
> serde with the response only really applies to the value.
>
>
See the other comment above: my thinking is actually that, for Query, we
would potentially always prefer to have it in deserialized object format
(except for partial matches, which we can discuss separately); we only need
to consider whether the QueryResult should be in raw or in deserialized
format.


> It still might be a good idea, but since I was thinking I
> already needed a separate discovery method for the key
> serde, then I might as well just keep the key and value
> serdes together, rather than bundling the value serde with
> each value.
>
> I do think it would be neat to have queries that don't
> deserialize the value by default and give you the option to
> do it on demand, or maybe just de-structure some parts of
> the value out (eg just reading the timestamp without
> deserializing the rest of the value). But, now that I've
> started to think about dropping the "raw" query design from
> the scope of this KIP, I'm wondering if we can just consider
> this use case later. It does seem plausible that we could
> choose to bundle the serdes with the values for those
> queries without needing a change in this KIP's framework, at
> least.
>
>
> Whew! Thanks again for the great thoughts. I'll make the
> changes I mentioned tomorrow. Please let me know if you
> disagree with any of my responses!
>
> Thanks,
> -John
>
> On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote:
> > Hello John,
> >
> > Great, great, great writeup! :) And thank you for bringing this up
> finally.
> > I have made a pass on the KIP as well as the POC PR of it, here are some
> > initial thoughts:
> >
> > First are some meta ones:
> >
> > 1. Today the serdes do not only happen at the metered-store layer,
> > unfortunately. For windowed / sessioned stores, and also some newly added
> > ones for stream-stream joins that are optimized for time-based range
> > queries, for example, the serdes are actually composite at multiple
> layers.
> > And the queries on the outer interface are also translated with serde
> > wrapped / stripped along the way in layers. To be more specific, today
> our
> > store hierarchy is like this:
> >
> > metered * -> cached -> logged * -> formatted * (e.g. segmenged,
> > list-valued) -> inner (rocksdb, in-memory)
> >
> > and serdes today could happen on the layers with * above, where each
> layer
> > is stuffing a bit more as prefix/suffix into the query bytes. This is not
> > really by design or ideal, but a result of history accumulated tech
> debts..
> > There's a related JIRA ticket for it:
> > https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is
> that
> > we need to be a bit careful regarding how to implement the
> > `KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy
> roads
> > moving forward.
> >
> > 2. Related to 1 above, I think we cannot always delegate the `query()`
> > implementation to the `inner` store layer, since some serde, or even some
> > computation logic happens at the outer, especially the `formatted` layer.
> > For example, besides the cached layer, the `formatted` layer also needs
> to
> > make sure the `query` object is being appropriately translated
> beforeMaterialized
> > handing it off downstreams to the inner store, and also need to translate
> > the `queryResult` a bit while handing it upwards in the hierarchy.
> >
> > 3. As we add more query types in the IQv2, the inner store's `query`
> > instantiation may be getting very clumsy with a large "switch" condition
> on
> > all the possible query types. Although custom stores could consider only
> > supporting a few, having the `default` case to ignore all others,
> built-in
> > stores may still need to exhaust all possible types. I'm wondering if
> it's
> > a good trade-off to make `Query` be m

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-16 Thread Guozhang Wang
Yeah, I agree that checking that a majority of voters support the
metadata.version is sufficient. What I was originally considering is
whether (in the future) we could consider encoding the metadata.version
value in the vote request as well, so that the elected leader is supposed
to have a version that's supported by a majority of the quorum.

Guozhang

On Tue, Nov 16, 2021 at 2:02 PM Colin McCabe  wrote:

> On Tue, Nov 16, 2021, at 13:36, Guozhang Wang wrote:
> > Hi Colin,
> >
> > If we allow downgrades which would be appended in metadata.version, then
> > the length of the __cluster_medata log may not be safe to indicate higher
> > versions, since older version records could still be appended later than
> a
> > new version record right?
> >
>
> That's fair... the longer log could be at a lower metadata.version.
>
> However, can you think of a scenario where the upgrade / downgrade logic
> doesn't protect us here?
>
> Checking that a majority of the voters support the new metadata.version
> we're going to seems to cover all the cases I can think of. I guess there
> could be time-of-check, time-of-use race conditions, but they only happen
> if you are doing a feature downgrade at the same time as a software version
> downgrade. This is something the cluster management software should not do
> (I guess we should spell this out in the KIP) I think nobody would want to
> do that since it would mean rolling the cluster while you're messing with
> feature flags.
>
> best,
> Colin
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-16 Thread Guozhang Wang
Hi Colin,

If we allow downgrades, which would also be appended as metadata.version
records, then the length of the __cluster_metadata log may not be a safe
indicator of a higher version, since older-version records could still be
appended later than a new-version record, right?

On Tue, Nov 16, 2021 at 1:16 PM Colin McCabe  wrote:

> On Tue, Nov 16, 2021, at 06:36, David Arthur wrote:
> > An interesting case here is how to handle a version update if a majority
> of
> > the quorum supports it, but the leader doesn't. For example, if C1 was
> the
> > leader and an upgrade to version 4 was requested. Maybe this would
> trigger
> > C1 to resign and inform the client to retry the update later.
> >
>
> Hmm, wouldn't we want to just reject the version update in this case? I
> don't see what the advantage of allowing it would be.
>
> The reason for requiring a majority rather than all voters is mainly to
> cover the case where a voter is down, I thought. That clearly doesn't apply
> if the un-upgraded voter is the leader itself...
>
> >
> > We may eventually want to consider the metadata.version when electing a
> > leader, but as long as we have the majority requirement before
> committing a
> > new metadata.version, I think we should be safe.
> >
>
> Yeah, this is safe. If we elect a leader at metadata.version X then that
> means that a majority of the cluster is at least at version X. Proof by
> contradiction: assume that this is not the case. Then the newly elected
> leader must have a shorter __cluster_metadata log than a majority of the
> voters. But this is incompatible with winning a Raft election.
>
> In the case where the leader is "behind" some of the other voters, those
> voters will truncate their logs to match the new leader. This will
> downgrade them. Basically this is the case where the feature upgrade was
> proposed, but never fully completed.
>
> best,
> Colin
>
>
> > -David
> >
> > On Mon, Nov 15, 2021 at 12:52 PM Guozhang Wang 
> wrote:
> >
> >> Thanks David,
> >>
> >> 1. Got it. One thing I'm still not very clear is why it's sufficient to
> >> select a metadata.version which is supported by majority of the quorum,
> but
> >> not the whole quorum (i.e. choosing the lowest version among all the
> quorum
> >> members)? Since the leader election today does not take this value into
> >> consideration, we are not guaranteed that newly selected leaders would
> >> always be able to recognize and support the initialized metadata.version
> >> right?
> >>
> >> 2. Yeah I think I agree the behavior-but-not-RPC-change scenario is
> beyond
> >> the scope of this KIP, we can defer it to later discussions.
> >>
> >> On Mon, Nov 15, 2021 at 8:13 AM David Arthur
> >>  wrote:
> >>
> >> > Guozhang, thanks for the review!
> >> >
> >> > 1, As we've defined it so far, the initial metadata.version is set by
> an
> >> > operator via the "kafka-storage.sh" tool. It would be possible for
> >> > different values to be selected, but only the quorum leader would
> commit
> >> a
> >> > FeatureLevelRecord with the version they read locally. See the above
> >> reply
> >> > to Jun's question for a little more detail.
> >> >
> >> > We need to enable the KRaft RPCs regardless of metadata.version (vote,
> >> > heartbeat, fetch, etc) so that the quorum can be formed, a leader can
> be
> >> > elected, and followers can learn about the selected metadata.version.
> I
> >> > believe the quorum startup procedure would go something like:
> >> >
> >> > * Controller nodes start, read their logs, begin leader election
> >> > * While the elected node is becoming leader (in
> >> > QuorumMetaLogListener#handleLeaderChange), initialize
> metadata.version if
> >> > necessary
> >> > * Followers replicate the FeatureLevelRecord
> >> >
> >> > This (probably) means the quorum peers must continue to rely on
> >> ApiVersions
> >> > to negotiate compatible RPC versions and these versions cannot depend
> on
> >> > metadata.version.
> >> >
> >> > Does this make sense?
> >> >
> >> >
> >> > 2, ApiVersionResponse includes the broker's supported feature flags as
> >> well
> >> > as the cluster-wide finalized feature flags. We probably need to add
> >> > something like the feature flag "epoch" to this response payload in
>

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-16 Thread Guozhang Wang
Thanks, I think having the leader election to consider the metadata.version
would be a good idea moving forward, but we do not need to include in this
KIP.

On Tue, Nov 16, 2021 at 6:37 AM David Arthur
 wrote:

> Guozhang,
>
> 1. By requiring a majority of all controller nodes to support the version
> selected by the leader, we increase the likelihood that the next leader
> will also support it. We can't guarantee that all nodes definitely support
> the selected metadata.version because there could always be an offline or
> partitioned peer that is running old software.
>
> If a controller running old software manages to get elected, we hit this case:
>
>  In the unlikely event that an active controller encounters an unsupported
> > metadata.version, it should resign and terminate.
>
>
> So, given this, we should be able to eventually elect a controller that
> does support the metadata.version.
>
>
> Consider controllers C1, C2, C3 with this arrangement:
>
> Node  SoftwareVer MaxMetadataVer
> C13.2 1
> C23.3 4
> C33.3 4
>
> If the current metadata.version is 1 and we're trying to upgrade to 4, we
> would allow it since two of the three nodes support it. If any one
> controller is down while we are attempting an upgrade, we would require
> that both of remaining alive nodes support the target metadata.version
> (since we require a majority of _all_ controller nodes, not just alive
> ones).
>
> An interesting case here is how to handle a version update if a majority of
> the quorum supports it, but the leader doesn't. For example, if C1 was the
> leader and an upgrade to version 4 was requested. Maybe this would trigger
> C1 to resign and inform the client to retry the update later.
>
> We may eventually want to consider the metadata.version when electing a
> leader, but as long as we have the majority requirement before committing a
> new metadata.version, I think we should be safe.
>
> -David
>
> On Mon, Nov 15, 2021 at 12:52 PM Guozhang Wang  wrote:
>
> > Thanks David,
> >
> > 1. Got it. One thing I'm still not very clear is why it's sufficient to
> > select a metadata.version which is supported by majority of the quorum,
> but
> > not the whole quorum (i.e. choosing the lowest version among all the
> quorum
> > members)? Since the leader election today does not take this value into
> > consideration, we are not guaranteed that newly selected leaders would
> > always be able to recognize and support the initialized metadata.version
> > right?
> >
> > 2. Yeah I think I agree the behavior-but-not-RPC-change scenario is
> beyond
> > the scope of this KIP, we can defer it to later discussions.
> >
> > On Mon, Nov 15, 2021 at 8:13 AM David Arthur
> >  wrote:
> >
> > > Guozhang, thanks for the review!
> > >
> > > 1, As we've defined it so far, the initial metadata.version is set by
> an
> > > operator via the "kafka-storage.sh" tool. It would be possible for
> > > different values to be selected, but only the quorum leader would
> commit
> > a
> > > FeatureLevelRecord with the version they read locally. See the above
> > reply
> > > to Jun's question for a little more detail.
> > >
> > > We need to enable the KRaft RPCs regardless of metadata.version (vote,
> > > heartbeat, fetch, etc) so that the quorum can be formed, a leader can
> be
> > > elected, and followers can learn about the selected metadata.version. I
> > > believe the quorum startup procedure would go something like:
> > >
> > > * Controller nodes start, read their logs, begin leader election
> > > * While the elected node is becoming leader (in
> > > QuorumMetaLogListener#handleLeaderChange), initialize metadata.version
> if
> > > necessary
> > > * Followers replicate the FeatureLevelRecord
> > >
> > > This (probably) means the quorum peers must continue to rely on
> > ApiVersions
> > > to negotiate compatible RPC versions and these versions cannot depend
> on
> > > metadata.version.
> > >
> > > Does this make sense?
> > >
> > >
> > > 2, ApiVersionResponse includes the broker's supported feature flags as
> > well
> > > as the cluster-wide finalized feature flags. We probably need to add
> > > something like the feature flag "epoch" to this response payload in
> order
> > > to see which broker is most up to date.
> > >
> > > If the new feature flag version included RPC changes, we are helped by
> > the
> > > fact tha

Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-15 Thread Guozhang Wang
Hello John,

Great, great, great writeup! :) And thank you for bringing this up finally.
I have made a pass on the KIP as well as the POC PR of it, here are some
initial thoughts:

First are some meta ones:

1. Today the serdes do not only happen at the metered-store layer,
unfortunately. For windowed / sessioned stores, and also some newly added
ones for stream-stream joins that are optimized for time-based range
queries, for example, the serdes are actually composite at multiple layers.
And the queries on the outer interface are also translated with serde
wrapped / stripped along the way in layers. To be more specific, today our
store hierarchy is like this:

metered * -> cached -> logged * -> formatted * (e.g. segmented,
list-valued) -> inner (rocksdb, in-memory)

and serdes today could happen on the layers with * above, where each layer
is stuffing a bit more as prefix/suffix into the query bytes. This is not
really by design or ideal, but a result of historically accumulated tech debt.
There's a related JIRA ticket for it:
https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is that
we need to be a bit careful regarding how to implement the
`KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy roads
moving forward.

2. Related to 1 above, I think we cannot always delegate the `query()`
implementation to the `inner` store layer, since some serde, or even some
computation logic happens at the outer, especially the `formatted` layer.
For example, besides the cached layer, the `formatted` layer also needs to
make sure the `query` object is being appropriately translated before
handing it off downstreams to the inner store, and also need to translate
the `queryResult` a bit while handing it upwards in the hierarchy.

3. As we add more query types in the IQv2, the inner store's `query`
instantiation may be getting very clumsy with a large "switch" condition on
all the possible query types. Although custom stores could consider
supporting only a few and having a `default` case to ignore all others, built-in
stores may still need to exhaust all possible types. I'm wondering if it's
a good trade-off to make `Query` be more restricted on extensibility to
have less exploding query type space, e.g. if a Query can only be extended
with some predefined dimensions like:

* query-field: key, non-key (some field extractor from the value bytes need
to be provided)
* query-scope: single, range
* query-match-type (only be useful for a range scope): prefix-match (e.g.
for a range key query, the provided is only a prefix, and all keys
containing this prefix should be returned), full-match
* query-value-type: object, raw-bytes
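
For illustration only (purely hypothetical, not a proposed API), such
restricted dimensions could be captured roughly like:

    // Hypothetical sketch: a query restricted to a few predefined dimensions.
    public final class RestrictedQuery {
        public enum Field { KEY, NON_KEY }
        public enum Scope { SINGLE, RANGE }
        public enum MatchType { PREFIX, FULL }
        public enum ValueType { OBJECT, RAW_BYTES }

        private final Field field;
        private final Scope scope;
        private final MatchType matchType;
        private final ValueType valueType;

        private RestrictedQuery(final Field field, final Scope scope,
                                final MatchType matchType, final ValueType valueType) {
            this.field = field;
            this.scope = scope;
            this.matchType = matchType;
            this.valueType = valueType;
        }

        // e.g. a prefix range scan over keys that returns raw bytes
        public static RestrictedQuery keyPrefixScan() {
            return new RestrictedQuery(Field.KEY, Scope.RANGE, MatchType.PREFIX, ValueType.RAW_BYTES);
        }

        public Field field() { return field; }
        public Scope scope() { return scope; }
        public MatchType matchType() { return matchType; }
        public ValueType valueType() { return valueType; }
    }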

4. What's the expected usage for the execution info? Is it only for logging
purposes? If yes then I think not enforcing any string format is fine, that
the store layers can just attach any string information that they feel
useful.

5. I do not find any specific proposals for exception handling, what would
that look like? E.g. besides all the expected error cases like non-active,
how would we communicate other unexpected error cases such as store closed,
IO error, bad query parameters, etc?

6. Since we do not deprecate any existing APIs in this KIP, it's a bit hard
for readers to understand what is eventually going to be covered by IQv2.
For example, we know that eventually `KafkaStreams#store` would be gone,
but what about `KafkaStreams#queryMetadataForKey`, and
`#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I think
it would be great to mention the end world state with IQv2 even if the KIP
itself would not deprecate anything yet.

7. It seems people are still a bit confused about the
"Position/PositionBound" topics, and personally I think it's okay to
exclude them in this KIP just to keep its (already large) scope smaller.
Also after we started implementing the KIP in full, we may have learned new
things while fighting the details in the weeds, and that would be a better
timing for us to consider new parameters such as bounds, but also caching
bypassing, and other potential features as well.

Some minor ones:

8. What about just naming the new classes as `StateQueryRequest/Result`, or
`StoreQueryRequest/Result`? The word "interactive" is for describing its
semantics in docs, but I feel for class names we can use a more meaningful
prefix.

9. Should the RawKeyQuery be extending `KeyQuery`, or directly
implementing `Query`?

10. Why do we need the new class "InteractiveQuerySerdes" along with
existing classes? In your PR it seems to just use `StateSerdes` directly.

11. Why do we have a new template type "R" in the QueryResult class in
addition to "V"? Should R always be equal to V?

12. Related to 10/11 above, what about letting the QueryResult always return
values in raw bytes, along with the serdes? And then it's up to
the callers whether they want the bytes to be immediately deserialized or
want them to be written somewhere and deserialized 

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-15 Thread Guozhang Wang
iscussion of case C from the above table
> > which I can amend. Does that cover your concerns, or am I missing
> something
> > else?
> >
> >
> > > it's inconvenient for a user to manually upgrade every feature version
> >
> > For this, we would probably want to extend the capabilities of KIP-584. I
> > don't think anything we've discussed for KIP-778 will preclude us from
> > adding some kind of auto-upgrade in the future.
> >
> > 21, "disable" sounds good to me. I agree "delete feature-x" sounds a bit
> > weird.
> >
> >
> >
> > On Mon, Nov 8, 2021 at 8:47 PM Guozhang Wang  wrote:
> >
> >> Hello David,
> >>
> >> Thanks for the very nice writeup! It helped me a lot to refresh my
> memory
> >> on KIP-630/590/584 :)
> >>
> >> I just had two clarification questions after reading through the KIP:
> >>
> >> 1. For the initialization procedure, do we guarantee that all the quorum
> >> nodes (inactive candidates and leaders, a.k.a. controllers) would always
> >> initialize with the same metadata.version? If yes, how is that
> guaranteed?
> >> More specifically, when a quorum candidate is starting up, would it
> avoid
> >> handling any controller requests (including APIVersionsRequest) from its
> >> peers in the quorum until it completes reading the local log? And even
> if
> >> yes, what would happen if there's no FeatureLevelRecord found, and
> >> different nodes read different values from their local meta.properties
> >> file
> >> or initializing from their binary's hard-code values?
> >>
> >> 2. This is not only for the metadata.version itself, but for general
> >> feature.versions: when a version is upgraded / downgraded, since brokers
> >> would read the FeatureLevelRecord at their own pace, there will always
> be
> >> a
> >> race window when some brokers has processed the record and completed the
> >> upgrade while others have not, hence may behave differently --- I'm
> >> thinking for the future like the specific replica selector to allow
> >> fetching from follower, and even more advanced selectors --- i.e. should
> >> we
> >> consider letting clients to only talk to brokers with the highest
> metadata
> >> log offsets for example?
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >>
> >> On Fri, Nov 5, 2021 at 3:18 PM Jun Rao 
> wrote:
> >>
> >> > Hi, David,
> >> >
> >> > Thanks for the reply.
> >> >
> >> > 16. My first concern is that the KIP picks up meta.version
> >> inconsistently
> >> > during the deployment. If a new cluster is started, we pick up the
> >> highest
> >> > version. If we upgrade, we leave the feature version unchanged.
> >> > Intuitively, it seems that independent of how a cluster is deployed,
> we
> >> > should always pick the same feature version. I think we need to think
> >> > this through in this KIP. My second concern is that as a particular
> >> version
> >> > matures, it's inconvenient for a user to manually upgrade every
> feature
> >> > version. As long as we have a path to achieve that in the future, we
> >> don't
> >> > need to address that in this KIP.
> >> >
> >> > 21. "./kafka-features.sh delete": Deleting a feature seems a bit weird
> >> > since the logic is always there. Would it be better to use disable?
> >> >
> >> > Jun
> >> >
> >> > On Fri, Nov 5, 2021 at 8:11 AM David Arthur
> >> >  wrote:
> >> >
> >> > > Colin and Jun, thanks for the additional comments!
> >> > >
> >> > > Colin:
> >> > >
> >> > > > We've been talking about having an automated RPC compatibility
> >> checker
> >> > >
> >> > > Do we have a way to mark fields in schemas as deprecated? It can
> stay
> >> in
> >> > > the RPC, it just complicates the logic a bit.
> >> > >
> >> > > > It would be nice if the active controller could validate that a
> >> > majority
> >> > > of the quorum could use the proposed metadata.version. The active
> >> > > controller should have this information, right? If we don't have
> >> recent
> >> > > information  from a quorum o

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-08 Thread Guozhang Wang
Hello David,

Thanks for the very nice writeup! It helped me a lot to refresh my memory
on KIP-630/590/584 :)

I just had two clarification questions after reading through the KIP:

1. For the initialization procedure, do we guarantee that all the quorum
nodes (inactive candidates and leaders, a.k.a. controllers) would always
initialize with the same metadata.version? If yes, how is that guaranteed?
More specifically, when a quorum candidate is starting up, would it avoid
handling any controller requests (including APIVersionsRequest) from its
peers in the quorum until it completes reading the local log? And even if
yes, what would happen if there's no FeatureLevelRecord found, and
different nodes read different values from their local meta.properties file
or initializing from their binary's hard-code values?

2. This is not only for the metadata.version itself, but for general
feature.versions: when a version is upgraded / downgraded, since brokers
would read the FeatureLevelRecord at their own pace, there will always be a
race window when some brokers have processed the record and completed the
upgrade while others have not, hence may behave differently --- I'm
thinking for the future like the specific replica selector to allow
fetching from follower, and even more advanced selectors --- i.e. should we
consider letting clients to only talk to brokers with the highest metadata
log offsets for example?


Guozhang




On Fri, Nov 5, 2021 at 3:18 PM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the reply.
>
> 16. My first concern is that the KIP picks up meta.version inconsistently
> during the deployment. If a new cluster is started, we pick up the highest
> version. If we upgrade, we leave the feature version unchanged.
> Intuitively, it seems that independent of how a cluster is deployed, we
> should always pick the same feature version. I think we need to think
> this through in this KIP. My second concern is that as a particular version
> matures, it's inconvenient for a user to manually upgrade every feature
> version. As long as we have a path to achieve that in the future, we don't
> need to address that in this KIP.
>
> 21. "./kafka-features.sh delete": Deleting a feature seems a bit weird
> since the logic is always there. Would it be better to use disable?
>
> Jun
>
> On Fri, Nov 5, 2021 at 8:11 AM David Arthur
>  wrote:
>
> > Colin and Jun, thanks for the additional comments!
> >
> > Colin:
> >
> > > We've been talking about having an automated RPC compatibility checker
> >
> > Do we have a way to mark fields in schemas as deprecated? It can stay in
> > the RPC, it just complicates the logic a bit.
> >
> > > It would be nice if the active controller could validate that a
> majority
> > of the quorum could use the proposed metadata.version. The active
> > controller should have this information, right? If we don't have recent
> > information  from a quorum of voters, we wouldn't be active.
> >
> > I believe we should have this information from the ApiVersionsResponse.
> It
> > would be good to do this validation to avoid a situation where a
> > quorum leader can't be elected due to unprocessable records.
> >
> > > Do we need delete as a command separate from downgrade?
> >
> > I think from an operator's perspective, it is nice to distinguish between
> > changing a feature flag and unsetting it. It might be surprising to an
> > operator to see the flag's version set to nothing when they requested the
> > downgrade to version 0 (or less).
> >
> > > it seems like we should spell out that metadata.version begins at 1 in
> > KRaft clusters
> >
> > I added this text:
> >
> > Introduce an IBP version to indicate the lowest software version that
> > > supports *metadata.version*. Below this IBP, the *metadata.version* is
> > > undefined and will not be examined. At or above this IBP, the
> > > *metadata.version* must be *0* for ZooKeeper clusters and will be
> > > initialized as *1* for KRaft clusters.
> >
> >
> > > We probably also want an RPC implemented by both brokers and
> controllers
> > that will reveal the min and max supported versions for each feature
> level
> > supported by the server
> >
> > This is available in ApiVersionsResponse (we include the server's
> supported
> > features as well as the cluster's finalized features)
> >
> > 
> >
> > Jun:
> >
> > 12. I've updated the KIP with AdminClient changes
> >
> > 14. You're right, it looks like I missed a few sections regarding
> snapshot
> > generation. I've corrected it
> >
> > 16. This feels more like an enhancement to KIP-584. I agree it could be
> > useful, but perhaps we could address it separately from KRaft upgrades?
> >
> > 20. Indeed snapshots are not strictly necessary during an upgrade, I've
> > reworded this
> >
> >
> > Thanks!
> > David
> >
> >
> > On Thu, Nov 4, 2021 at 6:51 PM Jun Rao  wrote:
> >
> > > Hi, David, Jose and Colin,
> > >
> > > Thanks for the reply. A few more comments.
> > >
> > > 12. It seems that we 

Re: [VOTE] KIP-791: Add Record Metadata to State Store Context

2021-11-08 Thread Guozhang Wang
+1, thanks Patrick!


Guozhang

On Mon, Nov 8, 2021 at 5:44 AM Vasiliki Papavasileiou
 wrote:

> Hi Patrick,
>
> Having the recordMetadata available in the state stores is fundamental for
> the consistency work and the proposed approach is reasonable.
>
> +1 (non-binding)
>
> Thank you,
> Vicky
>
> On Mon, Nov 8, 2021 at 10:00 AM Luke Chen  wrote:
>
> > Hi Patrick,
> > Thanks for the KIP.
> > Adding RecordMetadata into StateStoreContext for offset updating makes
> > sense to me.
> >
> > +1 (non-binding)
> >
> > Thank you.
> > Luke
> >
> >
> > On Mon, Nov 8, 2021 at 5:18 PM Patrick Stuedi
>  > >
> > wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the feedback on KIP-791, I have updated the KIP and would
> like
> > > to start the voting.
> > >
> > > The KIP can be found here:
> > > https://cwiki.apache.org/confluence/x/I5BnCw
> > >
> > > Please vote in this thread.
> > >
> > > Thanks!
> > > -Patrick
> > >
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-791: Add Record Metadata to State Store Context

2021-11-03 Thread Guozhang Wang
Thanks Patrick,

I looked at the KIP and it looks good to me overall. I think we need to
double check whether the record metadata reflects the "last processed
record" or the "currently processed record", where the latter may not have
been completely processed. `ProcessorContext#recordMetadata` returns
the latter, but that may not be the preferred choice if you want to build the
consistency reasoning on top of it.
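
(To illustrate the intended usage -- assuming the new StateStoreContext accessor mirrors
the existing ProcessorContext#recordMetadata and returns an Optional<RecordMetadata> --
a store wrapper might track its input position roughly like the fragment below;
`inner`, `context` and `position` are placeholders.)

    // Sketch only: record which input topic/partition/offset a write corresponds to.
    public void put(final Bytes key, final byte[] value) {
        inner.put(key, value);
        context.recordMetadata().ifPresent(meta ->
            position.put(new TopicPartition(meta.topic(), meta.partition()), meta.offset()));
    }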

Otherwise, LGTM.


Guozhang

On Wed, Nov 3, 2021 at 1:44 PM Patrick Stuedi 
wrote:

> Hi everyone,
>
> I would like to start the discussion for KIP-791: Add Record Metadata to
> State Store Context.
>
> The KIP can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-791:+Add+Record+Metadata+to+State+Store+Context
>
> Any feedback will be highly appreciated.
>
> Many thanks,
>  Patrick
>


-- 
-- Guozhang


Re: Wiki Permissions

2021-11-03 Thread Guozhang Wang
Hello Diego,

I see your id is already in the contributors list.

Cheers,
Guozhang

On Wed, Nov 3, 2021 at 10:02 AM Diego Erdody  wrote:

> Hello,
>
> Can I please have "permissions to contribute to Apache Kafka".
> Context: propose a new KIP.
> User (both jira and wiki): erdody.
> Thanks!
>
> Diego
>


-- 
-- Guozhang


Re: [VOTE] Add TaskId field to StreamsException

2021-10-19 Thread Guozhang Wang
Thanks for the KIP Sophie, I'm big +1 on this idea :)

On Tue, Oct 19, 2021 at 6:48 AM Bill Bejeck  wrote:

> Thanks for the KIP Sophie, I think this will be helpful.
>
> +1(binding)
>
> -Bill
>
> On Mon, Oct 18, 2021 at 5:04 PM John Roesler  wrote:
>
> > Thanks, Sophie,
> >
> > +1 from me.
> >
> > I was a little apprehensive about the potential to break
> > exception handler logic, but I agree with your comment in
> > the "compatibility" section: Identifying the causes of
> > exceptions is always kind of chaotic and this seems like
> > more of an improvement than anything.
> >
> > Regardless, it'll be really nice to get some information
> > about the context of the exception.
> >
> > Thanks!
> > -John
> >
> > On Mon, 2021-10-18 at 13:56 -0500, Walker Carlson wrote:
> > > Hey Sophie,
> > >
> > > +1 for me, I think that is would only help.
> > >
> > > Walker
> > >
> > > On Mon, Oct 18, 2021 at 1:45 AM Luke Chen  wrote:
> > >
> > > > Hi Sophie,
> > > > Adding the taskId to make the exception much clearer is a good improvement.
> > > > + 1 (non-binding)
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Mon, Oct 18, 2021 at 12:10 PM Sophie Blee-Goldman
> > > >  wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > I'd like to kick off the vote on this small KIP which adds a TaskId
> > field
> > > > > to the StreamsException class. Please take a look and cast your
> vote
> > when
> > > > > you have a chance.
> > > > >
> > > > > Links:
> > > > >
> > > > >- KIP-783: Add TaskId field to StreamsException
> > > > >
> > > > >- PR #11405 
> > > > >
> > > > >
> > > > > Thanks!
> > > > > Sophie
> > > > >
> > > >
> >
> >
> >
>


-- 
-- Guozhang


[jira] [Resolved] (KAFKA-13319) Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty

2021-10-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13319.
---
Fix Version/s: 3.1.0
 Assignee: Guozhang Wang  (was: Ryan)
   Resolution: Fixed

> Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty
> ---
>
> Key: KAFKA-13319
> URL: https://issues.apache.org/jira/browse/KAFKA-13319
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>    Assignee: Guozhang Wang
>Priority: Major
>  Labels: newbie
> Fix For: 3.1.0
>
>
> If a user calls `Producer.sendOffsetsToTransaction` with an empty map of 
> offsets, we can shortcut return and skip the logic to add the offsets topic 
> to the transaction. The main benefit is avoiding the unnecessary accumulation 
> of markers in __consumer_offsets.
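
A minimal sketch of that shortcut (illustrative only, not the actual KafkaProducer code):

{code:java}
// Illustrative sketch: short-circuit before adding the offsets topic to the transaction.
public void sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
                                     final ConsumerGroupMetadata groupMetadata) {
    if (offsets.isEmpty()) {
        // nothing to commit: skip AddOffsetsToTxn / TxnOffsetCommit entirely,
        // avoiding unnecessary transaction markers in __consumer_offsets
        return;
    }
    // ... existing logic that enqueues AddOffsetsToTxnRequest / TxnOffsetCommitRequest ...
}
{code}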



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13371) Consider consolidating Joined / StreamJoined / TableJoined

2021-10-12 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13371:
-

 Summary: Consider consolidating Joined / StreamJoined / TableJoined
 Key: KAFKA-13371
 URL: https://issues.apache.org/jira/browse/KAFKA-13371
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


This is an idea while reviewing KAFKA-13261 (adding TableJoined). We now have 
three control objects: Joined, StreamJoined, TableJoined. All of them extend 
NamedOperation and hence have the inherited `name` field, which would be used 
for the processor node's name and potentially store names. In addition to that

* Joined: used in stream-table joins. Contains key and two value serdes used 
for serializing the bytes for repartitioning (however since today we only 
repartition one side if needed, the other value serde is never used). 
* StreamJoined: used in stream-stream joins. It includes the serdes, AND also 
the store suppliers and other control variables on the store names.
* TableJoined: used in table-table foreign key joins. It does not include any 
serdes but includes the partitioner information.
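
For reference, this is roughly how the three objects surface in the DSL today (sketch only; topics, serdes and joiners are placeholders):

{code:java}
// Sketch of today's API surface (not a proposal).
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> orders = builder.stream("orders");
final KStream<String, String> shipments = builder.stream("shipments");
final KTable<String, String> customers = builder.table("customers");
final KTable<String, String> accounts = builder.table("accounts");

// stream-table join: Joined carries the serdes used for the repartition topic
orders.join(customers, (order, customer) -> order + "/" + customer,
    Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

// stream-stream join: StreamJoined carries serdes AND store suppliers / names
orders.join(shipments, (order, shipment) -> order + "/" + shipment,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

// table-table foreign-key join: TableJoined carries the partitioner information
customers.join(accounts, customer -> customer /* foreign-key extractor */,
    (customer, account) -> customer + "/" + account,
    TableJoined.with(null, null), // nulls = default partitioning; custom StreamPartitioners go here
    Materialized.as("customers-with-accounts"));
{code}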

The main difference between these different constructs are:

* KTables themselves have an embedded materialization mechanism via 
`valueGetterSupplier` whenever they are created, either from a source or from 
aggregate / join operators, so they do not need extra materialization 
indicators when participating in a follow-up join --- i.e. they either are 
already materialized from the operators that generate them, or they will 
"grandfather" back to the upstream KTable on the fly with a logical view when 
that view is being fetched via the `ValueGetterSupplier`. On the other hand, 
KStreams do not have materialization mechanism inherently and hence operators 
that do need to materialize the streams then need to provide such methods.
* Table-table foreign-key joins have a special need for partitioners.

[~vvcephei] has a good proposal for 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar and 
as part of that proposal we could consider adding partitioner for source 
streams / tables and inherit throughout the topology pipeline. Following that 
idea, we can consider consolidating the above "Joined" objects by isolating the 
materialization / partitioner variables. More specifically, here's a concrete 
proposal:

1) `StreamsBuilder.table/stream` would pass in an optional partitioner.
2) And similarly all operators that changes the key would allow an optional 
partitioner:
2.a) `KStream.repartition/groupBy` and `KTable.groupBy` would allow an optional 
partitioner in `Repartitioned`; as a piggy-back, we would also deprecate 
`Grouped` with `Repartitioned` since the latter would subsume the former.
2.b) `KStream.map/flatMap/selectKey` stays as is, and similar to serdes, these 
operators would stop the inheritance of partitioners of the upstream entities.
3) `Repartitioned` would also add the key/value serdes used for serializing to 
the repartition topics.
4) `KStream.join(KTable)` and `KStream.join(KStream)` would pass in an optional 
`Repartitioned` in addition to `Joined` which can be used to encode the 
partitioner info.
5) Foreign-key `KTable.join(KTable)` would pass in an optional `Repartitioned` 
which can be used to encode the partitioner info.
7) As a result of all the above points, we can then reduce `StreamJoined` / 
`TableJoined` / `Joined`, since all the control objects they wrap are now 
separated into `Repartitioned` and `Materialized`: note that for `StreamJoined`, 
the store suppliers / names / configs would now be wrapped in two Materialized 
objects which would still not be exposed for IQ.







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-10-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13268.
---
Resolution: Duplicate

> Add more integration tests for Table Table FK joins with repartitioning
> ---
>
> Key: KAFKA-13268
> URL: https://issues.apache.org/jira/browse/KAFKA-13268
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>    Reporter: Guozhang Wang
>Assignee: Victoria Xia
>Priority: Major
>
> We should add to the FK join multipartition integration test with a 
> Repartitioned for:
> 1) just the new partition count
> 2) a custom partitioner
> This is to test if there's a bug where the internal topics don't pick up a 
> partitioner provided that way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13356) Use "delete" retention policy only for stream-stream join windowed stores

2021-10-06 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13356:
-

 Summary: Use "delete" retention policy only for stream-stream join 
windowed stores
 Key: KAFKA-13356
 URL: https://issues.apache.org/jira/browse/KAFKA-13356
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today the window stores associated with stream-stream joins, like any other window 
stores, use "delete,compact" as their retention policy. However, since today we add a 
sequence number to the key to disable de-duplication of keys, compaction would never be 
able to compact any keys; it only results in 1) wasted CPU on the brokers' cleaner 
threads, and 2) broker features that rely on the "delete" policy not being applicable.

Until we potentially change the store format in the future to no longer rely on 
sequence numbers to disable de-duping, we could consider just changing the policy to 
"delete" for stream-stream join window stores for now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2021-10-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13349:
-

 Summary: Allow Iterator.remove on KeyValueIterator
 Key: KAFKA-13349
 URL: https://issues.apache.org/jira/browse/KAFKA-13349
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today Streams' state store range iterators do not support `remove`. We could 
consider adding such support for all the built-in state stores:

* RocksDB's native iterator does not support removal, but we can always do a 
delete(key) concurrently while the iterator is open on the snapshot.
* In-Memory: straightforward implementation.

The benefit is that for range-and-delete truncation operations we would not have to be 
cautious about concurrent modification exceptions. This could also help GC with 
in-memory stores.
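
A sketch of the range-and-delete pattern as it has to be written today (calling delete(key) on the store while the range iterator is open, which works for RocksDB as described above), versus what Iterator.remove would allow:

{code:java}
// Sketch: delete every entry in a key range with today's API.
public static void deleteRange(final KeyValueStore<String, Long> store, final String from, final String to) {
    try (final KeyValueIterator<String, Long> iter = store.range(from, to)) {
        while (iter.hasNext()) {
            store.delete(iter.next().key);   // with this ticket, this could become iter.remove()
        }
    }
}
{code}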



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-775: Custom partitioners in foreign key joins

2021-09-29 Thread Guozhang Wang
+1. Thanks Victoria!

On Tue, Sep 28, 2021 at 2:40 PM John Roesler  wrote:

> +1 (binding)
>
> Thanks, Victoria!
>
> -John
>
> On Tue, Sep 28, 2021, at 16:29, Adam Bellemare wrote:
> > +1 (non-binding)
> >
> > Glad to see this in here :)
> >
> > On Tue, Sep 28, 2021 at 5:11 PM Bill Bejeck  wrote:
> >
> >> +1 (binding)
> >>
> >> On Tue, Sep 28, 2021 at 12:59 PM Matthias J. Sax 
> wrote:
> >>
> >> > +1 (binding)
> >> >
> >> > On 9/28/21 8:29 AM, Victoria Xia wrote:
> >> > > Hi all,
> >> > >
> >> > > I'd like to start a vote for KIP-775 for adding Kafka Streams
> support
> >> for
> >> > > foreign key joins on tables with custom partitioners:
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> >> > >
> >> > > Thanks,
> >> > > Victoria
> >> > >
> >> >
> >>
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-775: Custom partitioners in foreign key joins

2021-09-22 Thread Guozhang Wang
Thanks Victoria for writing the KIP! I think this is a miss when we
designed KIP-213 and should be fixed in syntax. Regarding how large its
scope should be, here's my thoughts:

1) Today Streams does not take any indicator on how the input stream/table
are partitioned, instead it simply assumes that the input stream/table is
always partitioned by the key already. This is by-design that we let the
users to make sure this is always the case; for internal repartitioning
like `groupBy`s, Streams does guarantee that the repartition topics are
partitioned by the new grouping keys, but nevertheless the assumption still
holds: for all stream/table entities defined from a input topic (whether
external or internal), that topic is partitioned by the stream/table key,
and hence at the moment we do not require any partitioners to be passed in
since we do not need it.

2) For all operators that need to write to a partition, today the
partitioners are either defined by the operator logic itself (think:
groupBys, where partitioner is hard-coded as by the grouping-key), or
user-customized (think: the #through/to APIs). We do not have any scenarios
where we need to "inherit" partitioners from parent operators, until
FK-joins. Here, we need to make sure: a) the left table A's input topic and
the internal response topic are co-partitioned; b) the right table B's
input topic and the internal subscription topic are co-partitioned. Of
course, if both left table A and right table B's input topic are
partitioned by the default partitioner, then both holds. But the assumption
above only requires that the "topic is partitioned by the key", not
requiring "the topic is partitioned by the key, following exactly the
default partitioner mechanism" (e.g. in
https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises when a
different partitioner which is not based on hashing of the bytes is used,
which still guarantees "the input topic is partitioned by key").

So I feel that if we feel the assumption 1) above in Streams should still
hold in the long run, it's not very meaningful to require the source tables
to indicate their partitioners, but only to require the FK join operator
itself to make sure the co-partition conditions a) and b) above hold. Of
course the easiest way is to require users to pass-in the partitioners for
those two internal topics, which they have to make sure are the same as the
input partitioners. We can also have a bit more complicated approach to
have some "inheritance" rule for partitioners when they are given at the
sink (we have similar rules for serde inheritance throughout the topology),
but that only fixes for "#through / #repartition" cases, but not fixes for
source "builder#table" cases -- i.e. we still need to require users to
indicate partitioners which we can hopefully inherit within the topology.

I agree that for IQ, it would be beneficial if we have the partitioner for
each table/stream entities so that IQ itself does not need the partitioners
to be specified, but to make this fix work, we'd probably need both 1)
source table/stream partitioner specification and 2) inheritance rule
(otherwise we'd have to enforce users to specify the same for #repartition
etc as well?). Given that rationale, I'm slightly leaning towards the
current proposal in the KIP doc, i.e. just fixing for this FK operator
only, with the easy approach that requires users themselves to set
partitioners accordingly.
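
As a side note, the kind of partitioner that triggers this issue (see KAFKA-13261) is
one that still partitions strictly by key, just not via the default hash of the
serialized key bytes. A rough sketch, with a made-up key format:

    // Sketch: key-based but non-default partitioner; assumes keys look like "<region>-<id>".
    public class RegionPartitioner implements StreamPartitioner<String, Object> {
        @Override
        public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
            final String region = key.substring(0, key.indexOf('-'));
            return (region.hashCode() & 0x7fffffff) % numPartitions;
        }
    }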



Guozhang



On Wed, Sep 22, 2021 at 10:29 AM John Roesler  wrote:

> Thanks for the KIP, Victoria!
>
> This is a great catch. As fas as my memory serves, we
> totally missed this case in the design of FK joins.
>
> tl;dr: I'm wondering if it might be better to instead
> introduce a way to specify the partitioning scheme when we
> create a KTable and then just use it when we join, rather
> than to specify it in the join itself.
>
> I was initially surprised to see the proposal to add these
> partitioners in the join operation itself rather than
> inheriting them from the tables on both sides, but I
> reviewed the Streams DSL and see that we _cannot_ inherit it
> because almost all the time, the input KTables' partitioning
> is not known!
>
> It seems like the only times we specify a partitioner on a
> DSL object is:
> 1. when we produce to an output topic via KStream#to or
> KStream#through.
> 2. when we repartition via KStream#repartition
>
> These are both specifying the partitioner to use in output
> operations (ie, we are telling Streams the partition *to
> use*); there's currently only one situation in which we have
> to _inform_ streams about the partitioning of a KTable or
> KStream:
> 3. when we issue a key-based query via IQ, we need to know
> the partitioner, so the IQ interface allows us to pass in a
> custom partitioner with the query.
>
> This is a bit weird. Taking a step back, the partitioning
> scheme is a property of the table itself, not of the query
> (or the join). Specifying a table property 

Re: [ANNOUNCE] Apache Kafka 3.0.0

2021-09-22 Thread Guozhang Wang
Kudos to Konstantine! Congrats to everyone.

On Tue, Sep 21, 2021 at 9:01 AM Konstantine Karantasis <
kkaranta...@apache.org> wrote:

> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 3.0.0
>
> It is a major release that includes many new features, including:
>
> * The deprecation of support for Java 8 and Scala 2.12.
> * Kafka Raft support for snapshots of the metadata topic and other
> improvements in the self-managed quorum.
> * Deprecation of message formats v0 and v1.
> * Stronger delivery guarantees for the Kafka producer enabled by default.
> * Optimizations in OffsetFetch and FindCoordinator requests.
> * More flexible MirrorMaker 2 configuration and deprecation of MirrorMaker
> 1.
> * Ability to restart a connector's tasks on a single call in Kafka Connect.
> * Connector log contexts and connector client overrides are now enabled by
> default.
> * Enhanced semantics for timestamp synchronization in Kafka Streams.
> * Revamped public API for Stream's TaskId.
> * Default serde becomes null in Kafka Streams and several other
> configuration changes.
>
> You may read a more detailed list of features in the 3.0.0 blog post:
> https://blogs.apache.org/kafka/
>
> All of the changes in this release can be found in the release notes:
> https://downloads.apache.org/kafka/3.0.0/RELEASE_NOTES.html
>
> You can download the source and binary release (Scala 2.12 and 2.13) from:
> https://kafka.apache.org/downloads#3.0.0
>
>
> ---
>
>
> Apache Kafka is a distributed streaming platform with four core APIs:
>
>
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
>
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
>
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an
> output stream to one or more output topics, effectively transforming the
> input streams to output streams.
>
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might
> capture every change to a table.
>
>
> With these APIs, Kafka can be used for two broad classes of application:
>
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
>
> ** Building real-time streaming applications that transform or react
> to the streams of data.
>
>
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
>
> A big thank you for the following 141 authors and reviewers to this
> release!
>
> A. Sophie Blee-Goldman, Adil Houmadi, Akhilesh Dubey, Alec Thomas,
> Alexander Iskuskov, Almog Gavra, Alok Nikhil, Alok Thatikunta, Andrew Lee,
> Bill Bejeck, Boyang Chen, Bruno Cadonna, CHUN-HAO TANG, Cao Manh Dat, Cheng
> Tan, Chia-Ping Tsai, Chris Egerton, Colin P. McCabe, Cong Ding, Daniel
> Urban, Daniyar Yeralin, David Arthur, David Christle, David Jacot, David
> Mao, David Osvath, Davor Poldrugo, Dejan Stojadinović, Dhruvil Shah, Diego
> Erdody, Dong Lin, Dongjoon Hyun, Dániel Urbán, Edoardo Comar, Edwin Hobor,
> Eric Beaudet, Ewen Cheslack-Postava, Gardner Vickers, Gasparina Damien,
> Geordie, Greg Harris, Gunnar Morling, Guozhang Wang, Gwen (Chen) Shapira,
> Ignacio Acuña Frías, Igor Soarez, Ismael Juma, Israel Ekpo, Ivan Ponomarev,
> Ivan Yurchenko, Jason Gustafson, Jeff Kim, Jim Galasyn, Jim Hurne, JoelWee,
> John Gray, John Roesler, Jorge Esteban Quilcate Otoya, Josep Prat, José
> Armando García Sancio, Juan Gonzalez-Zurita, Jun Rao, Justin Mclean,
> Justine Olshan, Kahn Cheny, Kalpesh Patel, Kamal Chandraprakash,
> Konstantine Karantasis, Kowshik Prakasam, Leah Thomas, Lee Dongjin, Lev
> Zemlyanov, Liu Qiang, Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco
> Aurelio Lotz, Matthew de Detrich, Matthias J. Sax, Michael G. Noll, Michael
> Noll, Mickael Maison, Nathan Lincoln, Niket Goel, Nikhil Bhatia, Omnia G H
> Ibrahim, Peng Lei, Phil Hardwick, Rajini Sivaram, Randall Hauch, Rohan
> Desai, Rohit Deshpande, Rohit Sachan, Ron Dagostino, Ryan Dielhenn, Ryanne
> Dolan, Sanjana Kaundinya, Sarwar Bhuiyan, Satish Duggana, Scott Hendricks,
> Sergio Peña, Shao Yang Hong, Shay Elkin, Stanislav Vodetskyi, Sven Erik
> Knop, Tom Bentley, UnityLung, Uwe Eisele, Vahid Has

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-09-20 Thread Guozhang Wang
Hi Sagar,

Thanks for the added metric. About its name: if it is proposed as a
task-level config, then we do not need to prefix its name with `task-`. But
on the other hand, it's better to give the full description of the metric,
like its type name / tag maps / recording levels etc.; an example is here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB

Guozhang

On Mon, Sep 20, 2021 at 10:04 AM Sagar  wrote:

> Hi All,
>
> Bumping this thread again.
>
> Thanks!
> Sagar.
>
> On Sat, Sep 11, 2021 at 2:04 PM Sagar  wrote:
>
> > Hi Mathias,
> >
> > I missed out on the metrics part.
> >
> > I have added the new metric in the proposed changes section along with
> the
> > small re-wording that you talked about.
> >
> > Let me know if that makes sense.
> >
> > Thanks!
> > Sagar.
> >
> > On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax 
> wrote:
> >
> >> Thanks for the KIP.
> >>
> >> There was some discussion about adding a metric on the thread, but the
> >> KIP does not contain anything about it. Did we drop this suggestion or
> >> was the KIP not updated accordingly?
> >>
> >>
> >> Nit:
> >>
> >> > This would be a global config applicable per processing topology
> >>
> >> Can we change this to `per Kafka Streams instance.`
> >>
> >> Atm, a Stream instance executes a single topology, so it does not make
> >> any effective difference right now. However, it seems better (more
> >> logical) to bind the config to the instance (not the topology the
> >> instance executes).
> >>
> >>
> >> -Matthias
> >>
> >> On 9/2/21 6:08 AM, Sagar wrote:
> >> > Thanks Guozhang and Luke.
> >> >
> >> > I have updated the KIP with all the suggested changes.
> >> >
> >> > Do you think we could start voting for this?
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen  wrote:
> >> >
> >> >> Thanks for the KIP. Overall LGTM.
> >> >>
> >> >> Just one thought, if we "rename" the config directly as mentioned in
> >> the
> >> >> KIP, would that break existing applications?
> >> >> Should we deprecate the old one first, and make the old/new names
> >> co-exist
> >> >> for some period of time?
> >> >>
> >> >> Public Interfaces
> >> >>
> >> >>- Adding a new config *input.buffer.max.bytes *applicable at a
> >> topology
> >> >>level. The importance of this config would be *Medium*.
> >> >>- Renaming *cache.max.bytes.buffering* to
> >> *statestore.cache.max.bytes*.
> >> >>
> >> >>
> >> >>
> >> >> Thank you.
> >> >> Luke
> >> >>
> >> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang 
> >> wrote:
> >> >>
> >> >>> Currently the state store cache size default value is 10MB today,
> >> which
> >> >>> arguably is rather small. So I'm thinking maybe for this config
> >> default
> >> >> to
> >> >>> 512MB.
> >> >>>
> >> >>> Other than that, LGTM.
> >> >>>
> >> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar 
> >> >> wrote:
> >> >>>
> >> >>>> Thanks Guozhang and Sophie.
> >> >>>>
> >> >>>> Yeah a small default value would lower the throughput. I didn't
> quite
> >> >>>> realise it earlier. It's slightly hard to predict this value so I
> >> would
> >> >>>> guess around 1/2 GB to 1 GB? WDYT?
> >> >>>>
> >> >>>> Regarding the renaming of the config and the new metric, sure would
> >> >>> include
> >> >>>> it in the KIP.
> >> >>>>
> >> >>>> Lastly, importance would also. be added. I guess Medium should be
> ok.
> >> >>>>
> >> >>>> Thanks!
> >> >>>> Sagar.
> >> >>>>
> >> >>>>
> >> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> >> >>>>  wrote:
> >> >>>>
&

[jira] [Resolved] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13301.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

> The relationship between request.timeout.ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Fix For: 3.1.0
>
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here's what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.
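
For reference, a consumer configuration consistent with the relationship described in the ticket (values are just the defaults, shown explicitly):

{code:java}
// Example only: max.poll.interval.ms (default 300000) is larger than request.timeout.ms (default 30000).
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}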



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded

2021-09-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13216.
---
Resolution: Fixed

> Streams left/outer joins cause new internal changelog topic to grow unbounded
> -
>
> Key: KAFKA-13216
> URL: https://issues.apache.org/jira/browse/KAFKA-13216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sergio Peña
>    Assignee: Guozhang Wang
>Priority: Critical
> Fix For: 3.1.0
>
>
> This bug is caused by the improvements made in 
> https://issues.apache.org/jira/browse/KAFKA-10847, which fixes an issue with 
> stream-stream left/outer joins. The issue is only caused when a stream-stream 
> left/outer join is used with the new `JoinWindows.ofTimeDifferenceAndGrace()` 
> API that specifies the window time + grace period. This new API was added in 
> AK 3.0. No previous users are affected.
> The issue causes that the internal changelog topic used by the new 
> OUTERSHARED window store keeps growing unbounded as new records come. The 
> topic is never cleaned up nor compacted even if tombstones are written to 
> delete the joined and/or expired records from the window store. The problem 
> is caused by a parameter required in the window store to retain duplicates. 
> This config causes that tombstones records have a new sequence ID as part of 
> the key ID in the changelog making those keys unique. Thus causing the 
> cleanup policy not working.
> In 3.0, we deprecated {{JoinWindows.of(size)}} in favor of 
> {{JoinWindows.ofTimeDifferenceAndGrace()}} -- the old API uses the old 
> semantics and is thus not affected while the new API enable the new 
> semantics; the problem is that we deprecated the old API and thus tell users 
> that they should switch to the new broken API.
> We have two ways forward:
>  * Fix the bug (non trivial)
>  * Un-deprecate the old {{JoinWindows.of(size)}} API (and tell users not to 
> use the new but broken API)
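
For reference, the two APIs in question (sketch; durations are examples):

{code:java}
// Old API, deprecated in 3.0: keeps the old semantics and is NOT affected by this bug.
final JoinWindows unaffected = JoinWindows.of(Duration.ofMinutes(5));

// New API added in 3.0: enables the new left/outer join semantics that hit this bug.
final JoinWindows affected =
    JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
{code}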



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS

2021-09-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13249.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

> Checkpoints do not contain latest offsets on shutdown when using EOS
> 
>
> Key: KAFKA-13249
> URL: https://issues.apache.org/jira/browse/KAFKA-13249
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.7.0, 2.8.0
>Reporter: Oliver Hutchison
>Assignee: Oliver Hutchison
>Priority: Major
> Fix For: 3.1.0
>
>
> When using EOS the {{.checkpoint}} file created when a stateful streams app 
> is shutdown does not always contain changelog offsets which represent the 
> latest state of the state store. The offsets can often be behind the end of 
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts 
> after shutting down cleanly as streams thinks (based on the incorrect offsets 
> in the checkpoint) that the state store is not up to date with the changelog. 
> This is increasing the time we see it takes to do a clean restart of a single 
> instance streams app from around 10 second to sometime over 2 minutes in our 
> case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}} 
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // commitNeeded indicates we may have processed some records since last 
> commit
>   // and hence we need to refresh checkpointable offsets regardless whether 
> we should checkpoint or not
>   if (commitNeeded) {
> stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In a steady state case for a simple single instance single thread stream app 
> where an app simply starts, runs and then shuts down the {{if 
> (commitNeeded)}} test always fails when running with EOS which results in the 
> latest checkpoint offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this 
> is the case as there's only 1 place in the code which calls 
> {{maybeWriteCheckpoint}} during this steady state. The {{postCommit(final 
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} 
> state.
> {code:java}
> case RUNNING:
>   if (enforceCheckpoint || !eosEnabled) {
> maybeWriteCheckpoint(enforceCheckpoint);
>   }
>   log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", 
> state(), eosEnabled, enforceCheckpoint);
>   break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever to 
> called if {{enforceCheckpoint=true}} because we know {{eosEnabled=true}} as 
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? 
> Again looking only at the steady state case we find that it's only called 
> from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from 
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it 
> happens *after* all active tasks have commited. Which means that 
> {{StreamTask.commitNeeded=false}} for all tasks so it follows that the test 
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the 
> latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to 
> be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always 
> update the changelog offsets before we write the checkpoint.
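
A sketch of that suggested change (illustrative, mirroring the snippet above):

{code:java}
// Suggested fix (sketch): also refresh checkpointable offsets when a checkpoint is
// enforced, e.g. during a clean shutdown under EOS where commitNeeded is already false.
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
    if (commitNeeded || enforceCheckpoint) {
        stateMgr.updateChangelogOffsets(checkpointableOffsets());
    }
    super.maybeWriteCheckpoint(enforceCheckpoint);
}
{code}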



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

