[jira] [Commented] (KAFKA-10219) KStream API support for multiple broker clusters
[ https://issues.apache.org/jira/browse/KAFKA-10219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17148818#comment-17148818 ] Boyang Chen commented on KAFKA-10219: - Thanks for the proposal; the use case you describe is reasonable. However, we need to better clarify the feature we are going to introduce and the challenges we are facing, such as: 1. What does "multiple clusters" suggest? Do we support all input topics in cluster A and all output topics in cluster B, or a mix of topics spread across clusters A, B, and C which needs to be automatically detected by Streams? 2. How do we allocate internal topics? Which cluster should the changelog/repartition topics go to: the input topic cluster, or the output one? 3. How do we support Exactly-once? Right now the entire framework assumes a single cluster context. When switching to multiple clusters, we could no longer guarantee exactly-once because we may span our transactions across multiple clusters, and we don't have a centralized coordinator to track the progress. > KStream API support for multiple broker clusters > --- > > Key: KAFKA-10219 > URL: https://issues.apache.org/jira/browse/KAFKA-10219 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sachin Kurle >Priority: Major > > We are trying to consume from a cluster A broker and produce to a cluster B broker with the > KStream API. We have bootstrap server settings in both the consumer and producer configuration, > but the KStream API picks the bootstrap server of cluster A or B at random. -- This message was sent by Atlassian Jira (v8.3.4#803005)
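For context on why the questions above matter, here is a minimal sketch (topic names and addresses are illustrative) of how a Kafka Streams application is configured today: a single bootstrap.servers setting is reused for the main consumer, the producer, the admin client, and all internal changelog/repartition topics, so splitting input and output topics across clusters is not currently expressible.

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class SingleClusterExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cross-cluster-question");
        // Kafka Streams assumes one cluster: this single setting is used for the
        // main consumer, the producer, the admin client, and all internal
        // changelog/repartition topics.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "clusterA:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Both topics are resolved against the same bootstrap servers; there is no
        // supported way to point "output-topic" at a different cluster B.
        builder.<String, String>stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
{code}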
[jira] [Updated] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
[ https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9009: --- Affects Version/s: 2.6.0 2.5.0 > Flaky Test > kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete > -- > > Key: KAFKA-9009 > URL: https://issues.apache.org/jira/browse/KAFKA-9009 > Project: Kafka > Issue Type: Test > Components: core >Affects Versions: 2.5.0, 2.6.0 >Reporter: Bill Bejeck >Priority: Major > Labels: flaky-test > > Failure seen in > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25644/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/] > > {noformat} > Error Messagejava.lang.AssertionError: assertion failed: > UnderReplicatedPartitionCount not 0: 1Stacktracejava.lang.AssertionError: > assertion failed: UnderReplicatedPartitionCount not 0: 1 > at scala.Predef$.assert(Predef.scala:170) > at > kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:123) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTest
[jira] [Commented] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
[ https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17149560#comment-17149560 ] Boyang Chen commented on KAFKA-9009: Failed again:[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3200/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/] java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1 at kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:122) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119) at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164) at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.
[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17150516#comment-17150516 ] Boyang Chen commented on KAFKA-9831: Failed again: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1377/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_/] h3. Error Message java.lang.AssertionError: Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105)]> but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28)]> h3. Stacktrace java.lang.AssertionError: Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105)]> but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28)]> > Failing test: > EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta] > -- > > Key: KAFKA-9831 > URL: https://issues.apache.org/jira/browse/KAFKA-9831 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: John Roesler >Assignee: Matthias J. Sax >Priority: Major > Attachments: one.stdout.txt, two.stdout.txt > > > I've seen this fail twice in a row on the same build, but with different > errors. Stacktraces follow; stdout is attached. 
> One: > {noformat} > java.lang.AssertionError: Did not receive all 40 records from topic > singlePartitionOutputTopic within 6 ms > Expected: is a value equal to or greater than <40> > but: <39> was less than <40> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491) > at > org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766) > at > org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRun
[jira] [Assigned] (KAFKA-6520) When a Kafka Stream can't communicate with the server, its Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-6520: -- Assignee: Milind Jain > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Milind Jain >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicting the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-6520) When a Kafka Stream can't communicate with the server, its Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-6520: -- Assignee: Vince Mu (was: Milind Jain) > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicting the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, its Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17150519#comment-17150519 ] Boyang Chen commented on KAFKA-6520: [~VinceMu] Gave you contributor permission and assign the ticket to you. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicting the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian Jira (v8.3.4#803005)
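As a rough illustration of the gap this ticket describes, here is a minimal sketch (assuming a KafkaStreams instance that has not been started yet) of what an application can observe today: a state listener, which typically keeps reporting RUNNING while all brokers are unreachable, and a scan over the client metrics where something along the lines of the proposed `failedConnection` metric could eventually surface. The metric-name filter below is a hypothetical stand-in; no such metric exists yet.

{code:java}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class DisconnectProbe {
    // Register before streams.start(): today this listener typically keeps
    // reporting RUNNING even when the brokers go away, which is the gap the
    // ticket wants a DISCONNECTED state (or a failed-connection metric) for.
    public static void registerStateListener(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) ->
            System.out.println("Streams state: " + oldState + " -> " + newState));
    }

    // Periodically scan the client metrics; the name filter is a hypothetical
    // stand-in for the proposed `failedConnection`-style metric (today it only
    // matches generic connection metrics such as connection-count).
    public static void dumpConnectionMetrics(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (entry.getKey().name().contains("connection")) {
                System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}
{code}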
[jira] [Created] (KAFKA-10237) Properly handle in-memory stores OOM
Boyang Chen created KAFKA-10237: --- Summary: Properly handle in-memory stores OOM Key: KAFKA-10237 URL: https://issues.apache.org/jira/browse/KAFKA-10237 Project: Kafka Issue Type: Improvement Components: streams Reporter: Boyang Chen We have seen the in-memory store buffered too much data and eventually get OOM. Generally speaking, OOM has no real indication of the underlying problem and increases the difficulty for user debugging, since the failed thread may not be the actual culprit which causes the explosion. If we could get better protection to avoid hitting memory limit, or at least giving out a clear guide, the end user debugging would be much simpler. To make it work, we need to enforce a certain memory limit below heap size and take actions when hitting it. The first question would be, whether we set a numeric limit, such as 100MB or 500MB, or a percentile limit, such as 60% or 80% of total memory. The second question is about the action itself. One approach would be crashing the store immediately and inform the user to increase their application capacity. The second approach would be opening up an on-disk store spontaneously and offload the data to it. Personally I'm in favor of approach #2 because it has minimum impact to the on-going application. However it is more complex and potentially requires significant works to define the proper behavior such as the default store configuration, how to manage its lifecycle, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
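Until something along those lines lands, the practical mitigation is to materialize large state into an on-disk RocksDB store rather than an in-memory store. A minimal sketch, assuming a StreamsBuilder is already in hand; the topic and store names are illustrative.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class PersistentStoreExample {
    public static void build(StreamsBuilder builder) {
        // Back the aggregation with an on-disk RocksDB store rather than an
        // in-memory store, so buffered state is bounded by disk, not heap.
        KTable<String, Long> counts = builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .count(Materialized.<String, Long>as(Stores.persistentKeyValueStore("counts-store"))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
        counts.toStream().to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
{code}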
[jira] [Assigned] (KAFKA-10239) The groupInstanceId field in DescribeGroup response should be ignorable
[ https://issues.apache.org/jira/browse/KAFKA-10239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-10239: --- Assignee: Boyang Chen > The groupInstanceId field in DescribeGroup response should be ignorable > --- > > Key: KAFKA-10239 > URL: https://issues.apache.org/jira/browse/KAFKA-10239 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Critical > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > We noticed the following error in the logs in the handling of a DescribeGroup > request: > ``` > org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to > write a non-default groupInstanceId at version 3 > ``` > The problem is that the field is not marked as ignorable. So if the user is > relying on static membership and uses an older AdminClient, they will see > this error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
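The fix boils down to marking the field ignorable in the RPC schema definition, so that serializing at older versions silently drops it instead of throwing UnsupportedVersionException. Roughly, the member entry in DescribeGroupsResponse.json would look like the following; the versions and attributes shown are illustrative, not copied from the actual schema file.

{code:json}
{ "name": "GroupInstanceId", "type": "string", "versions": "4+",
  "nullableVersions": "4+", "ignorable": true,
  "about": "The unique identifier of the consumer instance provided by the end user." }
{code}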
[jira] [Created] (KAFKA-10241) Pursue a better way to cover ignorable RPC fields
Boyang Chen created KAFKA-10241: --- Summary: Pursue a better way to cover ignorable RPC fields Key: KAFKA-10241 URL: https://issues.apache.org/jira/browse/KAFKA-10241 Project: Kafka Issue Type: Improvement Components: clients, core Reporter: Boyang Chen We have hit cases such as https://issues.apache.org/jira/browse/KAFKA-10239 where we accidentally included a non-ignorable field in the returned response and eventually crashed older clients that don't support this field. It would be good to add a generic test suite covering all existing and new RPC changes to ensure that we never expose a non-ignorable field to older client versions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10239) The groupInstanceId field in DescribeGroup response should be ignorable
[ https://issues.apache.org/jira/browse/KAFKA-10239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10239. - Resolution: Fixed > The groupInstanceId field in DescribeGroup response should be ignorable > --- > > Key: KAFKA-10239 > URL: https://issues.apache.org/jira/browse/KAFKA-10239 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Critical > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > We noticed the following error in the logs in the handling of a DescribeGroup > request: > ``` > org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to > write a non-default groupInstanceId at version 3 > ``` > The problem is that the field is not marked as ignorable. So if the user is > relying on static membership and uses an older AdminClient, they will see > this error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10242) Adding metrics to track the total count of idempotent producers that the broker needs to track
[ https://issues.apache.org/jira/browse/KAFKA-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153229#comment-17153229 ] Boyang Chen commented on KAFKA-10242: - Is this really critical? I think we could downgrade to major. Besides, we need to kick off a KIP discussion if we haven't done so yet, and link it to the JIRA. > Adding metrics to track the total count of idempotent producers that the broker > needs to track > - > > Key: KAFKA-10242 > URL: https://issues.apache.org/jira/browse/KAFKA-10242 > Project: Kafka > Issue Type: Improvement > Components: producer >Affects Versions: 2.5.0 >Reporter: Ming Liu >Priority: Critical > Labels: needs-kip > Fix For: 2.7.0 > > > We found it is very useful to track the total number of idempotent producers > that the broker is tracking. > In our production environment, we have many idempotent producers for a > cluster, and sometimes that number increases to a very high value which > requires some attention to mitigate. > This is especially true for clients (< 2.4), where client retries might > generate too many different idempotent producers, which can trigger broker GC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig
Boyang Chen created KAFKA-10270: --- Summary: Add a broker to controller channel manager to redirect AlterConfig Key: KAFKA-10270 URL: https://issues.apache.org/jira/browse/KAFKA-10270 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig
[ https://issues.apache.org/jira/browse/KAFKA-10270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10270: Description: Per the KIP-590 requirement, we need to have a dedicated communication channel from the broker to the controller. > Add a broker to controller channel manager to redirect AlterConfig > -- > > Key: KAFKA-10270 > URL: https://issues.apache.org/jira/browse/KAFKA-10270 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Per the KIP-590 requirement, we need to have a dedicated communication channel > from the broker to the controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
Boyang Chen created KAFKA-10284: --- Summary: Group membership update due to static member rejoin should be persisted Key: KAFKA-10284 URL: https://issues.apache.org/jira/browse/KAFKA-10284 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.5.0, 2.4.0, 2.3.0, 2.6.0 Reporter: Boyang Chen Assignee: Boyang Chen Fix For: 2.6.1 When a known static member rejoins, we update its corresponding member.id without triggering a new rebalance. This serves the purpose of avoiding unnecessary rebalances for static membership, as well as fencing in case something still uses the old member.id. The bug is that we don't actually persist the membership update, so if no upcoming rebalance gets triggered, the new member.id information will get lost during group coordinator immigration, thus reviving the zombie member identity. -- This message was sent by Atlassian Jira (v8.3.4#803005)
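A rough sketch of the shape of the fix, with hypothetical stand-in types (the real logic lives in the Scala GroupCoordinator): after swapping the new member.id in for the rejoining static member, the updated group metadata also has to be appended to the __consumer_offsets log so the mapping survives a coordinator failover.

{code:java}
// Hypothetical Java sketch only -- GroupMetadata and MetadataLog are illustrative
// stand-ins, not the broker's real (Scala) types.
class StaticRejoinSketch {
    interface GroupMetadata {
        void replaceStaticMember(String groupInstanceId, String newMemberId);
    }
    interface MetadataLog {
        void append(GroupMetadata group); // writes group metadata to __consumer_offsets
    }

    void onStaticMemberRejoin(GroupMetadata group, MetadataLog log,
                              String groupInstanceId, String newMemberId) {
        // 1. Update the in-memory mapping instance.id -> member.id (already happens today).
        group.replaceStaticMember(groupInstanceId, newMemberId);
        // 2. Persist the updated membership -- the missing step this ticket is about.
        //    Without it, a coordinator migration reloads the old member.id and
        //    revives the zombie identity.
        log.append(group);
    }
}
{code}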
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159616#comment-17159616 ] Boyang Chen commented on KAFKA-10284: - [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1042] > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10284: Description: For known static members rejoin, we would update its corresponding member.id without triggering a new rebalance. This serves the purpose for avoiding unnecessary rebalance for static membership, as well as fencing purpose if some still uses the old member.id. The bug is that we don't actually persist the membership update, so if no upcoming rebalance gets triggered, this new member.id information will get lost during group coordinator immigration, thus bringing up the zombie member identity. The bug find credit goes to [~hachikuji] was: For known static members rejoin, we would update its corresponding member.id without triggering a new rebalance. This serves the purpose for avoiding unnecessary rebalance for static membership, as well as fencing purpose if some still uses the old member.id. The bug is that we don't actually persist the membership update, so if no upcoming rebalance gets triggered, this new member.id information will get lost during group coordinator immigration, thus bringing up the zombie member identity. > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160216#comment-17160216 ] Boyang Chen commented on KAFKA-10284: - If we have a combined scenario like below: # A static member X joins the group and updates member.id to M1, then gets stuck # Another static member Y with the same instance.id joins and updates member.id to M2, while starts working and commit offsets # The group coordinator migrates, and the member.id for the same static member rewinds to M1 # The static member X goes back online, and validated. It would try to fetch from Y's committed offset In this flow, I don't think we are violating the offset committing policy here. The only downside I could think of is that there is only one member Y who will get fenced by itself after the immigration as stated in the KIP. [~guozhang] > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160216#comment-17160216 ] Boyang Chen edited comment on KAFKA-10284 at 7/17/20, 10:28 PM: If we have a combined scenario like below: # A static member X joins the group and updates member.id to M1, then gets stuck # Another static member Y with the same instance.id joins and updates member.id to M2, while starts working and commit offsets # The group coordinator migrates, and the member.id for the same static member rewinds to M1 # The static member X goes back online, and validated. It would try to fetch from Y's committed offset In this flow, I don't think we are violating the offset committing policy here. The only downside I could think of is that there is only one member Y who will get fenced by itself after the immigration as stated in the description. [~guozhang] was (Author: bchen225242): If we have a combined scenario like below: # A static member X joins the group and updates member.id to M1, then gets stuck # Another static member Y with the same instance.id joins and updates member.id to M2, while starts working and commit offsets # The group coordinator migrates, and the member.id for the same static member rewinds to M1 # The static member X goes back online, and validated. It would try to fetch from Y's committed offset In this flow, I don't think we are violating the offset committing policy here. The only downside I could think of is that there is only one member Y who will get fenced by itself after the immigration as stated in the KIP. [~guozhang] > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163871#comment-17163871 ] Boyang Chen commented on KAFKA-10284: - [~akshaysh] I didn't see any trace that the group coordinator gets migrated in the pasted ticket, so it might be a separate issue. [~ableegoldman] Well, the symptom matches, but I don't know for sure if this is the same cause :) > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
Boyang Chen created KAFKA-10307: --- Summary: Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable Key: KAFKA-10307 URL: https://issues.apache.org/jira/browse/KAFKA-10307 Project: Kafka Issue Type: Bug Reporter: Boyang Chen We have spotted a cycled topology for the foreign-key join test *shouldInnerJoinMultiPartitionQueryable*, not sure yet whether this is a bug in the algorithm or the test only. Used [https://zz85.github.io/kafka-streams-viz/] to visualize: {code:java} Sub-topology: 0 Source: KTABLE-SOURCE-19 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 Source: KTABLE-SOURCE-32 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 Source: KSTREAM-SOURCE-01 (topics: [table1]) --> KTABLE-SOURCE-02 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: [table1-STATE-STORE-00]) --> KTABLE-FK-JOIN-OUTPUT-21 <-- KTABLE-SOURCE-19 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: [INNER-store1]) --> KTABLE-FK-JOIN-OUTPUT-34 <-- KTABLE-SOURCE-32 Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1]) --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2]) --> KTABLE-TOSTREAM-35 <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 Processor: KTABLE-SOURCE-02 (stores: [table1-STATE-STORE-00]) --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 <-- KSTREAM-SOURCE-01 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: []) --> KTABLE-SINK-11 <-- KTABLE-SOURCE-02 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: []) --> KTABLE-SINK-24 <-- KTABLE-FK-JOIN-OUTPUT-21 Processor: KTABLE-TOSTREAM-35 (stores: []) --> KSTREAM-SINK-36 <-- KTABLE-FK-JOIN-OUTPUT-34 Sink: KSTREAM-SINK-36 (topic: output-) <-- KTABLE-TOSTREAM-35 Sink: KTABLE-SINK-11 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic) <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 Sink: KTABLE-SINK-24 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic) <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 Sub-topology: 1 Source: KSTREAM-SOURCE-04 (topics: [table2]) --> KTABLE-SOURCE-05 Source: KTABLE-SOURCE-12 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 <-- KTABLE-SOURCE-12 Processor: KTABLE-SOURCE-05 (stores: [table2-STATE-STORE-03]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 <-- KSTREAM-SOURCE-04 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: [table2-STATE-STORE-03]) --> KTABLE-SINK-18 <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13]) --> KTABLE-SINK-18 <-- KTABLE-SOURCE-05 Sink: KTABLE-SINK-18 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic) <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 Sub-topology: 2 Source: KSTREAM-SOURCE-07 (topics: [table3]) --> KTABLE-SOURCE-08 Source: KTABLE-SOURCE-25 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-27 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-27 (stores: 
[KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-26]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-28 <-- KTABLE-SOURCE-25 Processor: KTABLE-SOURCE-08 (stores: [table3-STATE-STORE-06]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-29 <-- KSTREAM-SOURCE-07 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-28 (stores: [table3-STATE-STORE-0
[jira] [Created] (KAFKA-10868) Avoid double wrapping KafkaException
Boyang Chen created KAFKA-10868: --- Summary: Avoid double wrapping KafkaException Key: KAFKA-10868 URL: https://issues.apache.org/jira/browse/KAFKA-10868 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10868) Avoid double wrapping KafkaException
[ https://issues.apache.org/jira/browse/KAFKA-10868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10868: Description: Today certain exceptions get double-wrapped in KafkaException. We should remove those cases. > Avoid double wrapping KafkaException > > > Key: KAFKA-10868 > URL: https://issues.apache.org/jira/browse/KAFKA-10868 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Priority: Major > > Today certain exceptions get double-wrapped in KafkaException. We should remove > those cases. -- This message was sent by Atlassian Jira (v8.3.4#803005)
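A minimal sketch of the pattern this ticket is after (the helper name is hypothetical, not an existing Kafka API): only wrap a cause that is not already a KafkaException, so callers never have to unwrap two layers.

{code:java}
import org.apache.kafka.common.KafkaException;

public final class ExceptionWrapping {
    private ExceptionWrapping() {}

    // Hypothetical helper illustrating the idea; not actual Kafka code.
    public static KafkaException wrapIfNeeded(Throwable cause) {
        if (cause instanceof KafkaException) {
            // Already a KafkaException -- returning it as-is avoids the double wrap.
            return (KafkaException) cause;
        }
        return new KafkaException(cause);
    }
}
{code}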
[jira] [Commented] (KAFKA-12161) Raft observers should not require an id to fetch
[ https://issues.apache.org/jira/browse/KAFKA-12161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17261072#comment-17261072 ] Boyang Chen commented on KAFKA-12161: - I was wondering whether we could just get a random UUID for observer when we do the tooling? > Raft observers should not require an id to fetch > > > Key: KAFKA-12161 > URL: https://issues.apache.org/jira/browse/KAFKA-12161 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > It is useful to allow observers to replay the metadata log without requiring > a replica id. For example, this can be used by tools in order to inspect the > current metadata state. In order to support this, we should modify > `KafkaRaftClient` so that the broker id is not required. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12215) Broker could cache its overlapping ApiVersions with active controller
Boyang Chen created KAFKA-12215: --- Summary: Broker could cache its overlapping ApiVersions with active controller Key: KAFKA-12215 URL: https://issues.apache.org/jira/browse/KAFKA-12215 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Right now, each ApiVersionsRequest needs to compute the overlapping API versions with the controller every time: https://issues.apache.org/jira/browse/KAFKA-10674 Unless the active controller changes, the computation result should always be the same. We may consider caching the result and only updating it when a controller change happens. -- This message was sent by Atlassian Jira (v8.3.4#803005)
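A minimal sketch of the caching idea (all names below are illustrative, not existing broker code): keep the last computed intersection keyed by the controller it was computed against, and recompute only when the active controller changes.

{code:java}
import java.util.function.IntFunction;

// Illustrative sketch only, not existing broker code. T stands in for whatever
// type the broker uses to hold the intersected ApiVersions result.
class ApiVersionsCache<T> {
    private final IntFunction<T> computeIntersectionWithController;
    private int cachedControllerId = -1;
    private T cachedIntersection;

    ApiVersionsCache(IntFunction<T> computeIntersectionWithController) {
        this.computeIntersectionWithController = computeIntersectionWithController;
    }

    // Recompute only when the active controller has changed since the last request;
    // otherwise every ApiVersionsRequest reuses the cached intersection.
    synchronized T overlappingVersions(int activeControllerId) {
        if (cachedIntersection == null || activeControllerId != cachedControllerId) {
            cachedIntersection = computeIntersectionWithController.apply(activeControllerId);
            cachedControllerId = activeControllerId;
        }
        return cachedIntersection;
    }
}
{code}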
[jira] [Commented] (KAFKA-12169) Consumer cannot detect partition changes when the client leader restarts with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273328#comment-17273328 ] Boyang Chen commented on KAFKA-12169: - In general, the leader should be able to detect a metadata discrepancy between its remembered topic metadata and the broker-side metadata. I don't think we have any test case that covers both a topic partition change and a leader rejoin at the same time, so it's possible and needs some verification. > Consumer cannot detect partition changes when the client leader restarts with static > membership protocol > > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > > Background: > Kafka consumer services run with static membership and the cooperative rebalance > protocol on Kubernetes, and the services often restart because of operations. When > we increased the topic's partitions from 1000 to 2000 and the client leader restarted > with an unknown member id at the same time, we found the consumers did not trigger a > rebalance and still consumed 1000 partitions > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9689) Automatic broker version detection to initialize stream client
[ https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273379#comment-17273379 ] Boyang Chen commented on KAFKA-9689: I agree with A) since it is internal struct. > Automatic broker version detection to initialize stream client > -- > > Key: KAFKA-9689 > URL: https://issues.apache.org/jira/browse/KAFKA-9689 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > Eventually we shall deprecate the flag to suppress EOS thread producer > feature, instead we take version detection approach on broker to decide which > semantic to use. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12260) PartitionsFor should not return null value
Boyang Chen created KAFKA-12260: --- Summary: PartitionsFor should not return null value Key: KAFKA-12260 URL: https://issues.apache.org/jira/browse/KAFKA-12260 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Boyang Chen consumer.partitionsFor() could return a null value when the topic was not found. This was not properly documented and is error-prone given that the return type is a list. We should fix the logic internally to prevent partitionsFor from returning a null result. -- This message was sent by Atlassian Jira (v8.3.4#803005)
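For reference, this is the defensive pattern callers need today, and what the ticket wants to make unnecessary (the topic name is illustrative):

{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;

public class PartitionsForGuard {
    // Today partitionsFor() can return null when the topic is not found, so callers
    // have to guard; the ticket proposes returning an empty list instead.
    public static List<PartitionInfo> safePartitionsFor(Consumer<?, ?> consumer, String topic) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        return partitions == null ? Collections.emptyList() : partitions;
    }
}
{code}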
[jira] [Assigned] (KAFKA-12260) PartitionsFor should not return null value
[ https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-12260: --- Assignee: Boyang Chen > PartitionsFor should not return null value > -- > > Key: KAFKA-12260 > URL: https://issues.apache.org/jira/browse/KAFKA-12260 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Minor > > consumer.partitionsFor() could return null value when topic was not found. > This was not properly documented and was error-prone when the return type was > a list. We should fix the logic internally to prevent partitionsFor returning > null result. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689916#comment-16689916 ] Boyang Chen commented on KAFKA-7610: Sounds good Jason! I think combining `group.max.size` and member id requirement in join group request should be sufficient for the solving the above scenario. I will open a separate Jira for group size, in the meanwhile I think the conclusion is that Jason's original proposal 2 should be sufficient to mitigate the problem. > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
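A rough sketch of option 2 from the quoted description, assuming hypothetical handler wiring (this is roughly the direction KIP-394 later took with the MEMBER_ID_REQUIRED error): rather than parking an unknown member's first JoinGroup in purgatory, immediately return a generated member id plus a retriable error, so the broker only tracks members that come back with a known id.

{code:java}
import java.util.UUID;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.protocol.Errors;

public class InitialJoinSketch {
    // Hypothetical handler fragment, not actual broker code.
    public static JoinGroupResponseData handleUnknownMemberJoin(String clientId) {
        String generatedMemberId = clientId + "-" + UUID.randomUUID();
        // The client is expected to retry its JoinGroup carrying this member id;
        // members that never retry are simply never added to the group metadata.
        return new JoinGroupResponseData()
            .setMemberId(generatedMemberId)
            .setErrorCode(Errors.MEMBER_ID_REQUIRED.code());
    }
}
{code}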
[jira] [Created] (KAFKA-7641) Add `group.max.size` to cap group metadata on broker
Boyang Chen created KAFKA-7641: -- Summary: Add `group.max.size` to cap group metadata on broker Key: KAFKA-7641 URL: https://issues.apache.org/jira/browse/KAFKA-7641 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7641: --- Summary: Add `consumer.group.max.size` to cap consumer metadata size on broker (was: Add `group.max.size` to cap group metadata on broker) > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7641: --- Description: In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason concluded an edge case of current consumer protocol which could cause memory burst on broker side: ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.``` Since many disorganized join group requests are spamming the group metadata, we should define a cap on broker side to avoid one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config since most times this value is only dealing with error scenarios. > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689925#comment-16689925 ] Boyang Chen commented on KAFKA-7641: [~hachikuji] [~enether] > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7641: --- Description: In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason concluded an edge case of current consumer protocol which could cause memory burst on broker side: ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.``` Since many disorganized join group requests are spamming the group metadata, we should define a cap on broker side to avoid one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config since most times this value is only dealing with error scenarios, client users shouldn't worry about this config. was: In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason concluded an edge case of current consumer protocol which could cause memory burst on broker side: ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.``` Since many disorganized join group requests are spamming the group metadata, we should define a cap on broker side to avoid one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config since most times this value is only dealing with error scenarios. > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-7641: -- Assignee: Boyang Chen > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690102#comment-16690102 ] Boyang Chen commented on KAFKA-7641: [~enether] Thanks! I think we need to propose two KIPs here: 1. require member id during new member join group request 2. enforce group.max.size. I could take this one as a practice for going through the consumer protocol change, do you want to work on 2. here? > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690102#comment-16690102 ] Boyang Chen edited comment on KAFKA-7641 at 11/16/18 10:50 PM: --- [~enether] Thanks! I think we need to propose two KIPs here: 1. require member id during new member join group request 2. enforce group.max.size. I could take this one as a practice for going through the consumer protocol change, do you want to work on #2 here? was (Author: bchen225242): [~enether] Thanks! I think we need to propose two KIPs here: 1. require member id during new member join group request 2. enforce group.max.size. I could take this one as a practice for going through the consumer protocol change, do you want to work on 2. here? > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692496#comment-16692496 ] Boyang Chen commented on KAFKA-7641: [Stanislav Kozlovski|x-note://3AF09C0A-0D9D-4EFC-8F4D-4C82DC12DCE6/jira/secure/ViewProfile.jspa?name=enether] So sorry Stanislav for the confusion on my wording! Stanislav and I synced offline that I would be working on this Jira while he will take the member id change. Thanks a lot for your understanding here! > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692496#comment-16692496 ] Boyang Chen edited comment on KAFKA-7641 at 11/20/18 12:52 AM: --- [~enether] So sorry Stanislav for the confusion on my wording! Stanislav and I synced offline that I would be working on this Jira while he will take the member id change. Thanks a lot for your understanding here! was (Author: bchen225242): [Stanislav Kozlovski|x-note://3AF09C0A-0D9D-4EFC-8F4D-4C82DC12DCE6/jira/secure/ViewProfile.jspa?name=enether] So sorry Stanislav for the confusion on my wording! Stanislav and I synced offline that I would be working on this Jira while he will take the member id change. Thanks a lot for your understanding here! > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only
[ https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692501#comment-16692501 ] Boyang Chen commented on KAFKA-7566: [~guozhang] Yes. For example, our KStream application publishes data snapshots to S3 every 10 minutes. Periodically we want to garbage-collect the outdated ones, which could be done on a separate thread. The caveat is that the GC job is scheduled on all instances and sometimes causes race conditions. We could use ZooKeeper to mitigate the issue, but I want to check whether there is a similar need on the Streams side, i.e. a leader-only job that does not block the main processing. > Add sidecar job to leader (or a random single follower) only > > > Key: KAFKA-7566 > URL: https://issues.apache.org/jira/browse/KAFKA-7566 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Minor > > Hey there, > recently we need to add an archive job to a streaming application. The caveat > is that we need to make sure only one instance is doing this task to avoid > potential race condition, and we also don't want to schedule it as a regular > stream task so that we will be blocking normal streaming operation. > Although we could do so by doing a zk lease, I'm raising the case here since > this could be some potential use case for streaming job also. For example, > there are some `leader specific` operation we could schedule in DSL instead > of adhoc manner. > Let me know if you think this makes sense to you, thank you! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
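The ZooKeeper-lease workaround mentioned above can be sketched with Apache Curator's LeaderLatch recipe: every instance schedules the cleanup, but only the current leader actually runs it. The connect string, latch path, and interval are illustrative, and the S3 cleanup is a placeholder:
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderOnlyGcJob {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "zk-host:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // All instances join the same latch; exactly one holds leadership at a time.
        LeaderLatch latch = new LeaderLatch(zk, "/my-stream-app/gc-leader");
        latch.start();

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            // Followers wake up but do nothing, so the cleanup runs once per interval.
            if (latch.hasLeadership()) {
                gcOutdatedSnapshots();
            }
        }, 0, 10, TimeUnit.MINUTES);
    }

    private static void gcOutdatedSnapshots() {
        // Placeholder for the actual S3 cleanup logic.
        System.out.println("Deleting outdated snapshots...");
    }
}
{code}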
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695627#comment-16695627 ] Boyang Chen commented on KAFKA-7610: [~guozhang] [~hachikuji] Do you have any concern on 2) (requiring member id when allowing new member to join the group)? If not, I could go ahead and start a KIP. > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
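A toy model of option 2 from the description above: the first JoinGroup with an empty member id is answered immediately with a generated id and a retriable error, and only the retry registers the member so it becomes subject to the session timeout. All types below are invented for the sketch; they are not Kafka's request classes:
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class EarlyMemberIdSketch {
    enum Error { NONE, REBALANCE_IN_PROGRESS }

    static class JoinResponse {
        final Error error;
        final String memberId;
        JoinResponse(Error error, String memberId) {
            this.error = error;
            this.memberId = memberId;
        }
    }

    private final Map<String, Long> registeredMembers = new HashMap<>();

    JoinResponse handleJoin(String memberId) {
        if (memberId.isEmpty()) {
            // Do not register anything yet; hand out an id and ask the client to retry.
            return new JoinResponse(Error.REBALANCE_IN_PROGRESS, UUID.randomUUID().toString());
        }
        // The retry carries a known id, so the member is tracked and can be
        // expired through the normal session timeout if it disconnects.
        registeredMembers.put(memberId, System.currentTimeMillis());
        return new JoinResponse(Error.NONE, memberId);
    }

    public static void main(String[] args) {
        EarlyMemberIdSketch coordinator = new EarlyMemberIdSketch();
        JoinResponse first = coordinator.handleJoin("");
        JoinResponse second = coordinator.handleJoin(first.memberId);
        System.out.println(second.error + " " + second.memberId);
    }
}
{code}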
[jira] [Updated] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7610: --- Component/s: consumer > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-7610: -- Assignee: Boyang Chen > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss
[ https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698803#comment-16698803 ] Boyang Chen commented on KAFKA-7672: Thanks Linyue for reporting and reproducing the bug! In fact, we have also detected similar issue in our streaming use cases, fixing this would be of great value. For implementation detail, let's discuss some strong consistency guarantee instead of thread sleep, because I believe data loss is a non-tolerable issue for many use cases. cc [~ishiihara] [~shnguyen] > The local state not fully restored after KafkaStream rebalanced, resulting in > data loss > --- > > Key: KAFKA-7672 > URL: https://issues.apache.org/jira/browse/KAFKA-7672 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0 >Reporter: linyue li >Priority: Major > Fix For: 2.1.0 > > > Normally, when a task is mitigated to a new thread and no checkpoint file was > found under its task folder, Kafka Stream needs to restore the local state > for remote changelog topic completely and then resume running. However, in > some scenarios, we found that Kafka Stream *NOT* restore this state even no > checkpoint was found, but just clean the state folder and transition to > running state directly, resulting the historic data loss. > To be specific, I will give the detailed logs for Kafka Stream in our project > to show this scenario: > {quote}2018-10-23 08:27:07,684 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Revoking previously assigned partitions [AuditTrailBatch-0-5] > 2018-10-23 08:27:07,684 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to > PARTITIONS_REVOKED > 2018-10-23 08:27:10,856 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > (Re-)joining group > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Successfully joined group with generation 323 > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1] > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to > PARTITIONS_ASSIGNED > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1* > 2018-10-23 08:27:53,622 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms. 
> 2018-10-23 08:27:54,357 INFO > org.apache.kafka.streams.processor.internals.StoreChangelogReader - > stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task > 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* > *Reinitializing the task and restore its state from the beginning.* > 2018-10-23 08:27:54,357 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer > clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting > offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.* > 2018-10-23 08:27:54,653 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to > RUNNING* > {quote} > From the logs above, we can get the procedure for thread > AuditTrailBatch-StreamThread-1: > # the previous running task assigned to thread 1 is task 0_5 (the > corresponding partition is AuditTrailBatch-0-5) > # group begins to rebalance, the new task 1_1 is assigned to thread 1. > # no checkpoint was found under 1_1 state folder, so reset the offset to 0 > and clean the local state folder. > # thread 1 transitions to RUNNING state directly without the restoration for > task 1_1, so the historic data for state 1_1 is lost for thread 1. > *ThoubleShoot* > To investigate the cause for this issue, we analysis the source code in > KafkaStream and found the key is the variable named "completedRestor
[jira] [Commented] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only
[ https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702931#comment-16702931 ] Boyang Chen commented on KAFKA-7566: I see your point. Feel free to close this Jira because I feel currently there is no strong need for this work. > Add sidecar job to leader (or a random single follower) only > > > Key: KAFKA-7566 > URL: https://issues.apache.org/jira/browse/KAFKA-7566 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Minor > > Hey there, > recently we need to add an archive job to a streaming application. The caveat > is that we need to make sure only one instance is doing this task to avoid > potential race condition, and we also don't want to schedule it as a regular > stream task so that we will be blocking normal streaming operation. > Although we could do so by doing a zk lease, I'm raising the case here since > this could be some potential use case for streaming job also. For example, > there are some `leader specific` operation we could schedule in DSL instead > of adhoc manner. > Let me know if you think this makes sense to you, thank you! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
Boyang Chen created KAFKA-7728: -- Summary: Add JoinReason to the join group request for better rebalance handling Key: KAFKA-7728 URL: https://issues.apache.org/jira/browse/KAFKA-7728 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Recently [~mgharat] and I discussed about the current rebalance logic on leader join group request handling. So far we blindly trigger rebalance when the leader rejoins. The caveat is that KIP-345 is not covering this effort and if a consumer group is not using sticky assignment but using other strategy like round robin, the redundant rebalance could still shuffle the topic partitions around consumers. (for example mirror maker application) I checked on broker side and here is what we currently do: {code:java} if (group.isLeader(memberId) || !member.matches(protocols)) // force a rebalance if a member has changed metadata or if the leader sends JoinGroup. // The latter allows the leader to trigger rebalances for changes affecting assignment // which do not affect the member metadata (such as topic metadata changes for the consumer) {code} Based on the broker logic, we only need to trigger rebalance for leader rejoin when the topic metadata change has happened. I also looked up the ConsumerCoordinator code on client side, and found out the metadata monitoring logic here: {code:java} public boolean rejoinNeededOrPending() { ... // we need to rejoin if we performed the assignment and metadata has changed if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) return true; }{code} I guess instead of just returning true, we could introduce a new enum field called JoinReason which could indicate the purpose of the rejoin. Thus we don't need to do a full rebalance when the leader is just in rolling bounce. We could utilize this information I guess. Just add another enum field into the join group request called JoinReason so that we know whether leader is rejoining due to topic metadata change. If yes, we trigger rebalance obviously; if no, we shouldn't trigger rebalance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
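To make the proposal concrete, here is a hypothetical sketch of the decision the coordinator could make once a join reason is carried in the request; the enum values and the helper are assumptions drawn from the description, not existing Kafka code:
{code:java}
public class JoinReasonSketch {
    enum JoinReason { BLIND, SELF_META_CHANGE, TOPIC_META_CHANGE }

    // Mirrors the broker check quoted above, but only forces a rebalance on a
    // leader rejoin when the leader actually reports a topic metadata change.
    static boolean shouldRebalance(boolean isLeader, boolean memberMetadataChanged, JoinReason reason) {
        if (memberMetadataChanged) {
            return true;
        }
        return isLeader && reason == JoinReason.TOPIC_META_CHANGE;
    }

    public static void main(String[] args) {
        // Leader bounced during a rolling restart: no rebalance needed.
        System.out.println(shouldRebalance(true, false, JoinReason.BLIND));             // false
        // Leader rejoins because topic metadata changed: rebalance.
        System.out.println(shouldRebalance(true, false, JoinReason.TOPIC_META_CHANGE)); // true
    }
}
{code}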
[jira] [Assigned] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
[ https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-7728: -- Assignee: Mayuresh Gharat > Add JoinReason to the join group request for better rebalance handling > -- > > Key: KAFKA-7728 > URL: https://issues.apache.org/jira/browse/KAFKA-7728 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Mayuresh Gharat >Priority: Major > Labels: consumer, mirror-maker, needs-kip > > Recently [~mgharat] and I discussed about the current rebalance logic on > leader join group request handling. So far we blindly trigger rebalance when > the leader rejoins. The caveat is that KIP-345 is not covering this effort > and if a consumer group is not using sticky assignment but using other > strategy like round robin, the redundant rebalance could still shuffle the > topic partitions around consumers. (for example mirror maker application) > I checked on broker side and here is what we currently do: > > {code:java} > if (group.isLeader(memberId) || !member.matches(protocols)) > // force a rebalance if a member has changed metadata or if the leader sends > JoinGroup. > // The latter allows the leader to trigger rebalances for changes affecting > assignment > // which do not affect the member metadata (such as topic metadata changes > for the consumer) {code} > Based on the broker logic, we only need to trigger rebalance for leader > rejoin when the topic metadata change has happened. I also looked up the > ConsumerCoordinator code on client side, and found out the metadata > monitoring logic here: > {code:java} > public boolean rejoinNeededOrPending() { > ... > // we need to rejoin if we performed the assignment and metadata has changed > if (assignmentSnapshot != null && > !assignmentSnapshot.equals(metadataSnapshot)) > return true; > }{code} > I guess instead of just returning true, we could introduce a new enum field > called JoinReason which could indicate the purpose of the rejoin. Thus we > don't need to do a full rebalance when the leader is just in rolling bounce. > We could utilize this information I guess. Just add another enum field into > the join group request called JoinReason so that we know whether leader is > rejoining due to topic metadata change. If yes, we trigger rebalance > obviously; if no, we shouldn't trigger rebalance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
[ https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722715#comment-16722715 ] Boyang Chen commented on KAFKA-7728: [~guozhang] [~mjsax] [~hachikuji] [~enether] Do you have time to add some discussion to this thread? Thank you! > Add JoinReason to the join group request for better rebalance handling > -- > > Key: KAFKA-7728 > URL: https://issues.apache.org/jira/browse/KAFKA-7728 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Mayuresh Gharat >Priority: Major > Labels: consumer, mirror-maker, needs-kip > > Recently [~mgharat] and I discussed about the current rebalance logic on > leader join group request handling. So far we blindly trigger rebalance when > the leader rejoins. The caveat is that KIP-345 is not covering this effort > and if a consumer group is not using sticky assignment but using other > strategy like round robin, the redundant rebalance could still shuffle the > topic partitions around consumers. (for example mirror maker application) > I checked on broker side and here is what we currently do: > > {code:java} > if (group.isLeader(memberId) || !member.matches(protocols)) > // force a rebalance if a member has changed metadata or if the leader sends > JoinGroup. > // The latter allows the leader to trigger rebalances for changes affecting > assignment > // which do not affect the member metadata (such as topic metadata changes > for the consumer) {code} > Based on the broker logic, we only need to trigger rebalance for leader > rejoin when the topic metadata change has happened. I also looked up the > ConsumerCoordinator code on client side, and found out the metadata > monitoring logic here: > {code:java} > public boolean rejoinNeededOrPending() { > ... > // we need to rejoin if we performed the assignment and metadata has changed > if (assignmentSnapshot != null && > !assignmentSnapshot.equals(metadataSnapshot)) > return true; > }{code} > I guess instead of just returning true, we could introduce a new enum field > called JoinReason which could indicate the purpose of the rejoin. Thus we > don't need to do a full rebalance when the leader is just in rolling bounce. > We could utilize this information I guess. Just add another enum field into > the join group request called JoinReason so that we know whether leader is > rejoining due to topic metadata change. If yes, we trigger rebalance > obviously; if no, we shouldn't trigger rebalance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
[ https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16729949#comment-16729949 ] Boyang Chen commented on KAFKA-7728: Thanks Mayuresh for the great summary! Just pinging on this thread again to get more discussions going on [~guozhang] [~mjsax] [~hachikuji] [~enether] > Add JoinReason to the join group request for better rebalance handling > -- > > Key: KAFKA-7728 > URL: https://issues.apache.org/jira/browse/KAFKA-7728 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Mayuresh Gharat >Priority: Major > Labels: consumer, mirror-maker, needs-kip > > Recently [~mgharat] and I discussed about the current rebalance logic on > leader join group request handling. So far we blindly trigger rebalance when > the leader rejoins. The caveat is that KIP-345 is not covering this effort > and if a consumer group is not using sticky assignment but using other > strategy like round robin, the redundant rebalance could still shuffle the > topic partitions around consumers. (for example mirror maker application) > I checked on broker side and here is what we currently do: > > {code:java} > if (group.isLeader(memberId) || !member.matches(protocols)) > // force a rebalance if a member has changed metadata or if the leader sends > JoinGroup. > // The latter allows the leader to trigger rebalances for changes affecting > assignment > // which do not affect the member metadata (such as topic metadata changes > for the consumer) {code} > Based on the broker logic, we only need to trigger rebalance for leader > rejoin when the topic metadata change has happened. I also looked up the > ConsumerCoordinator code on client side, and found out the metadata > monitoring logic here: > {code:java} > public boolean rejoinNeededOrPending() { > ... > // we need to rejoin if we performed the assignment and metadata has changed > if (assignmentSnapshot != null && > !assignmentSnapshot.equals(metadataSnapshot)) > return true; > }{code} > I guess instead of just returning true, we could introduce a new enum field > called JoinReason which could indicate the purpose of the rejoin. Thus we > don't need to do a full rebalance when the leader is just in rolling bounce. > We could utilize this information I guess. Just add another enum field into > the join group request called JoinReason so that we know whether leader is > rejoining due to topic metadata change. If yes, we trigger rebalance > obviously; if no, we shouldn't trigger rebalance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
[ https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730594#comment-16730594 ] Boyang Chen commented on KAFKA-7728: [~enether] Thanks for the thoughts! I think the compatibility should be considered as the JoinGroupRequest version will be bumped. I already come up some common join reasons for a potential enum type: {code:java} public enum JoinGroupReason { BLIND("blind"), // Join request from a start-up consumer SELF_META_CHANGE("self_meta_change"), // The consumer metadata has changed TOPIC_METAD_CHANGE("topic_meta_change"); // The topic metadata changed (must be from the leader) } {code} the self metadata change might be trivial to realize now, but I think it would be better to discuss more scenarios before we finalize anything. Let's brainstorm on more join reasons that will be helpful for the broker to make decision. > Add JoinReason to the join group request for better rebalance handling > -- > > Key: KAFKA-7728 > URL: https://issues.apache.org/jira/browse/KAFKA-7728 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Mayuresh Gharat >Priority: Major > Labels: consumer, mirror-maker, needs-kip > > Recently [~mgharat] and I discussed about the current rebalance logic on > leader join group request handling. So far we blindly trigger rebalance when > the leader rejoins. The caveat is that KIP-345 is not covering this effort > and if a consumer group is not using sticky assignment but using other > strategy like round robin, the redundant rebalance could still shuffle the > topic partitions around consumers. (for example mirror maker application) > I checked on broker side and here is what we currently do: > > {code:java} > if (group.isLeader(memberId) || !member.matches(protocols)) > // force a rebalance if a member has changed metadata or if the leader sends > JoinGroup. > // The latter allows the leader to trigger rebalances for changes affecting > assignment > // which do not affect the member metadata (such as topic metadata changes > for the consumer) {code} > Based on the broker logic, we only need to trigger rebalance for leader > rejoin when the topic metadata change has happened. I also looked up the > ConsumerCoordinator code on client side, and found out the metadata > monitoring logic here: > {code:java} > public boolean rejoinNeededOrPending() { > ... > // we need to rejoin if we performed the assignment and metadata has changed > if (assignmentSnapshot != null && > !assignmentSnapshot.equals(metadataSnapshot)) > return true; > }{code} > I guess instead of just returning true, we could introduce a new enum field > called JoinReason which could indicate the purpose of the rejoin. Thus we > don't need to do a full rebalance when the leader is just in rolling bounce. > We could utilize this information I guess. Just add another enum field into > the join group request called JoinReason so that we know whether leader is > rejoining due to topic metadata change. If yes, we trigger rebalance > obviously; if no, we shouldn't trigger rebalance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
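For reference, a compilable version of the enum sketched in the comment above; the original snippet omits the field and constructor needed for the string labels, and the TOPIC_METAD_CHANGE spelling is corrected:
{code:java}
public enum JoinGroupReason {
    BLIND("blind"),                         // Join request from a start-up consumer
    SELF_META_CHANGE("self_meta_change"),   // The consumer metadata has changed
    TOPIC_META_CHANGE("topic_meta_change"); // The topic metadata changed (must come from the leader)

    private final String reason;

    JoinGroupReason(String reason) {
        this.reason = reason;
    }

    public String reason() {
        return reason;
    }
}
{code}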
[jira] [Commented] (KAFKA-7725) Add a delay for further CG rebalances, beyond KIP-134 group.initial.rebalance.delay.ms
[ https://issues.apache.org/jira/browse/KAFKA-7725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730800#comment-16730800 ] Boyang Chen commented on KAFKA-7725: Thanks [~astubbs] for sharing this issue. Will add this to the KIP-345 Jira list. > Add a delay for further CG rebalances, beyond KIP-134 > group.initial.rebalance.delay.ms > -- > > Key: KAFKA-7725 > URL: https://issues.apache.org/jira/browse/KAFKA-7725 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer, core >Affects Versions: 2.1.0 >Reporter: Antony Stubbs >Priority: Major > > KIP-134 group.initial.rebalance.delay.ms was a good start, but there are much > bigger problems where after a system is up and running, consumers can leave > and join in large amounts, causing rebalance storms. One example is > Mesosphere deploying new versions of an app - say there are 10 instances, > then 10 more instances are deployed with the new version, then the old 10 are > scaled down. Ideally this would be 1 or 2 rebalances, instead of 20. > The trade off is that if the delay is 5 seconds, every consumer joining > within that window would extend it by another 5 seconds, potentially causing > partitions to never be processed. To mitigate this, either a max rebalance > delay could also be added, or multiple consumers joining won't extend the > rebalance delay, so that it's always a max of 5 seconds. > Related: [KIP-345: Introduce static membership protocol to reduce consumer > rebalances|https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances] > KAFKA-7018: persist memberId for consumer restart -- This message was sent by Atlassian JIRA (v7.6.3#76005)
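The mitigation discussed in the description (each new join extends the delay, but never past a hard cap) can be modelled as below. This is purely an illustration built on a scheduled executor; the real coordinator implements delayed rebalances through purgatory:
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Each join pushes the pending rebalance out by delayMs, but never beyond
// maxDelayMs after the first join of the round, so a steady stream of joins
// cannot postpone the rebalance forever.
public class CappedRebalanceDelaySketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMs;
    private final long maxDelayMs;
    private long firstJoinAtMs = -1;
    private ScheduledFuture<?> pending;

    CappedRebalanceDelaySketch(long delayMs, long maxDelayMs) {
        this.delayMs = delayMs;
        this.maxDelayMs = maxDelayMs;
    }

    synchronized void onMemberJoin() {
        long now = System.currentTimeMillis();
        if (firstJoinAtMs < 0) {
            firstJoinAtMs = now;
        }
        if (pending != null) {
            pending.cancel(false);
        }
        long deadline = Math.min(now + delayMs, firstJoinAtMs + maxDelayMs);
        pending = scheduler.schedule(this::triggerRebalance,
                Math.max(0, deadline - now), TimeUnit.MILLISECONDS);
    }

    private void triggerRebalance() {
        System.out.println("Triggering rebalance");
        synchronized (this) {
            firstJoinAtMs = -1;
            pending = null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CappedRebalanceDelaySketch group = new CappedRebalanceDelaySketch(5_000, 15_000);
        // Joins every 2 seconds keep extending the 5-second delay, but the
        // first rebalance still fires once the 15-second cap is reached.
        for (int i = 0; i < 10; i++) {
            group.onMemberJoin();
            Thread.sleep(2_000);
        }
        Thread.sleep(10_000);
        group.scheduler.shutdown();
    }
}
{code}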
[jira] [Updated] (KAFKA-7018) persist memberId for consumer restart
[ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7018: --- Pull request: https://github.com/apache/kafka/pull/5176 > persist memberId for consumer restart > - > > Key: KAFKA-7018 > URL: https://issues.apache.org/jira/browse/KAFKA-7018 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In group coordinator, there is a logic to neglect join group request from > existing follower consumers: > {code:java} > case Empty | Stable => > if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { > // if the member id is unknown, register the member to the group > addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, > clientHost, protocolType, protocols, group, responseCallback) > } else { > val member = group.get(memberId) > if (group.isLeader(memberId) || !member.matches(protocols)) { > // force a rebalance if a member has changed metadata or if the leader > sends JoinGroup. > // The latter allows the leader to trigger rebalances for changes > affecting assignment > // which do not affect the member metadata (such as topic metadata > changes for the consumer) > updateMemberAndRebalance(group, member, protocols, responseCallback) > } else { > // for followers with no actual change to their metadata, just return > group information > // for the current generation which will allow them to issue SyncGroup > responseCallback(JoinGroupResult( > members = Map.empty, > memberId = memberId, > generationId = group.generationId, > subProtocol = group.protocolOrNull, > leaderId = group.leaderOrNull, > error = Errors.NONE)) > } > {code} > While looking at the AbstractCoordinator, I found that the generation was > hard-coded as > NO_GENERATION on restart, which means we will send UNKNOWN_MEMBER_ID in the > first join group request. This means we will treat the restarted consumer as > a new member, so the rebalance will be triggered until session timeout. > I'm trying to clarify the following things before we extend the discussion: > # Whether my understanding of the above logic is right (Hope [~mjsax] could > help me double check) > # Whether it makes sense to persist last round of memberId for consumers? We > currently only need this feature in stream application, but will do no harm > if we also use it for consumer in general. This would be a nice-to-have > feature on consumer restart when we configured the loading-previous-memberId > to true. If we failed, simply use the UNKNOWN_MEMBER_ID > # The behavior could also be changed on the broker side, but I suspect it is > very risky. So far client side change should be the least effort. The end > goal is to avoid excessive rebalance from the same consumer restart, so if > you feel server side change could also help, we could further discuss. > Thank you for helping out! [~mjsax] [~guozhang] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
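A bare-bones illustration of point 2 above: persist the last member id locally and reuse it on restart instead of sending UNKNOWN_MEMBER_ID. The file location and class are arbitrary; this is not how the consumer behaves today:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MemberIdStore {
    private final Path path;

    MemberIdStore(Path path) {
        this.path = path;
    }

    // Returns the previously persisted member id, or the fallback (the
    // protocol's UNKNOWN_MEMBER_ID, i.e. an empty string) if none exists.
    String loadOrDefault(String fallback) {
        try {
            return Files.exists(path) ? Files.readString(path).trim() : fallback;
        } catch (IOException e) {
            return fallback;
        }
    }

    // Best effort: if persisting fails, the next start simply joins as a new member.
    void save(String memberId) {
        try {
            Files.writeString(path, memberId);
        } catch (IOException e) {
            // ignore and fall back to UNKNOWN_MEMBER_ID on the next restart
        }
    }

    public static void main(String[] args) {
        MemberIdStore store = new MemberIdStore(Paths.get("/tmp/consumer.memberid"));
        String memberId = store.loadOrDefault("");
        System.out.println("Joining with member id: '" + memberId + "'");
        store.save("member-id-assigned-by-coordinator");
    }
}
{code}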
[jira] [Comment Edited] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
[ https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730594#comment-16730594 ] Boyang Chen edited comment on KAFKA-7728 at 12/31/18 10:50 PM: --- [~enether] Thanks for the thoughts! I think the compatibility should be considered as the JoinGroupRequest version will be bumped. I already come up some common join reasons for a potential enum type: {code:java} public enum JoinGroupReason { BLIND("blind"), // Join request from a start-up consumer SELF_META_CHANGE("self_meta_change"), // The consumer metadata has changed TOPIC_METAD_CHANGE("topic_meta_change"); // The topic metadata changed (must be from the leader) } {code} the self metadata change might be used for user to indicate some status updates (for example Stream task replay ready) for broker to judge whether rebalance should be triggered. It would be better to discuss more scenarios before we finalize anything. Let's brainstorm on more join reasons that will be helpful for the broker to make decision. was (Author: bchen225242): [~enether] Thanks for the thoughts! I think the compatibility should be considered as the JoinGroupRequest version will be bumped. I already come up some common join reasons for a potential enum type: {code:java} public enum JoinGroupReason { BLIND("blind"), // Join request from a start-up consumer SELF_META_CHANGE("self_meta_change"), // The consumer metadata has changed TOPIC_METAD_CHANGE("topic_meta_change"); // The topic metadata changed (must be from the leader) } {code} the self metadata change might be trivial to realize now, but I think it would be better to discuss more scenarios before we finalize anything. Let's brainstorm on more join reasons that will be helpful for the broker to make decision. > Add JoinReason to the join group request for better rebalance handling > -- > > Key: KAFKA-7728 > URL: https://issues.apache.org/jira/browse/KAFKA-7728 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Mayuresh Gharat >Priority: Major > Labels: consumer, mirror-maker, needs-kip > > Recently [~mgharat] and I discussed about the current rebalance logic on > leader join group request handling. So far we blindly trigger rebalance when > the leader rejoins. The caveat is that KIP-345 is not covering this effort > and if a consumer group is not using sticky assignment but using other > strategy like round robin, the redundant rebalance could still shuffle the > topic partitions around consumers. (for example mirror maker application) > I checked on broker side and here is what we currently do: > > {code:java} > if (group.isLeader(memberId) || !member.matches(protocols)) > // force a rebalance if a member has changed metadata or if the leader sends > JoinGroup. > // The latter allows the leader to trigger rebalances for changes affecting > assignment > // which do not affect the member metadata (such as topic metadata changes > for the consumer) {code} > Based on the broker logic, we only need to trigger rebalance for leader > rejoin when the topic metadata change has happened. I also looked up the > ConsumerCoordinator code on client side, and found out the metadata > monitoring logic here: > {code:java} > public boolean rejoinNeededOrPending() { > ... 
> // we need to rejoin if we performed the assignment and metadata has changed > if (assignmentSnapshot != null && > !assignmentSnapshot.equals(metadataSnapshot)) > return true; > }{code} > I guess instead of just returning true, we could introduce a new enum field > called JoinReason which could indicate the purpose of the rejoin. Thus we > don't need to do a full rebalance when the leader is just in rolling bounce. > We could utilize this information I guess. Just add another enum field into > the join group request called JoinReason so that we know whether leader is > rejoining due to topic metadata change. If yes, we trigger rebalance > obviously; if no, we shouldn't trigger rebalance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
[ https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16733633#comment-16733633 ] Boyang Chen commented on KAFKA-7728: [~guozhang]Thanks for the confirmation! [~mgharat] Do you want to start a KIP to reach wider discussion? I think this is a very valuable change that could greatly improve global resource rebalancing. > Add JoinReason to the join group request for better rebalance handling > -- > > Key: KAFKA-7728 > URL: https://issues.apache.org/jira/browse/KAFKA-7728 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Mayuresh Gharat >Priority: Major > Labels: consumer, mirror-maker, needs-kip > > Recently [~mgharat] and I discussed about the current rebalance logic on > leader join group request handling. So far we blindly trigger rebalance when > the leader rejoins. The caveat is that KIP-345 is not covering this effort > and if a consumer group is not using sticky assignment but using other > strategy like round robin, the redundant rebalance could still shuffle the > topic partitions around consumers. (for example mirror maker application) > I checked on broker side and here is what we currently do: > > {code:java} > if (group.isLeader(memberId) || !member.matches(protocols)) > // force a rebalance if a member has changed metadata or if the leader sends > JoinGroup. > // The latter allows the leader to trigger rebalances for changes affecting > assignment > // which do not affect the member metadata (such as topic metadata changes > for the consumer) {code} > Based on the broker logic, we only need to trigger rebalance for leader > rejoin when the topic metadata change has happened. I also looked up the > ConsumerCoordinator code on client side, and found out the metadata > monitoring logic here: > {code:java} > public boolean rejoinNeededOrPending() { > ... > // we need to rejoin if we performed the assignment and metadata has changed > if (assignmentSnapshot != null && > !assignmentSnapshot.equals(metadataSnapshot)) > return true; > }{code} > I guess instead of just returning true, we could introduce a new enum field > called JoinReason which could indicate the purpose of the rejoin. Thus we > don't need to do a full rebalance when the leader is just in rolling bounce. > We could utilize this information I guess. Just add another enum field into > the join group request called JoinReason so that we know whether leader is > rejoining due to topic metadata change. If yes, we trigger rebalance > obviously; if no, we shouldn't trigger rebalance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss
[ https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734415#comment-16734415 ] Boyang Chen commented on KAFKA-7672: [~guozhang] Are you suggesting sth similar to this? https://github.com/apache/kafka/compare/trunk...abbccdda:bug_fix > The local state not fully restored after KafkaStream rebalanced, resulting in > data loss > --- > > Key: KAFKA-7672 > URL: https://issues.apache.org/jira/browse/KAFKA-7672 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0 >Reporter: linyue li >Assignee: linyue li >Priority: Major > Labels: bug > Fix For: 2.2.0, 2.1.1 > > > Normally, when a task is mitigated to a new thread and no checkpoint file was > found under its task folder, Kafka Stream needs to restore the local state > for remote changelog topic completely and then resume running. However, in > some scenarios, we found that Kafka Stream *NOT* restore this state even no > checkpoint was found, but just clean the state folder and transition to > running state directly, resulting the historic data loss. > To be specific, I will give the detailed logs for Kafka Stream in our project > to show this scenario: > {quote}2018-10-23 08:27:07,684 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Revoking previously assigned partitions [AuditTrailBatch-0-5] > 2018-10-23 08:27:07,684 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to > PARTITIONS_REVOKED > 2018-10-23 08:27:10,856 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > (Re-)joining group > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Successfully joined group with generation 323 > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1] > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to > PARTITIONS_ASSIGNED > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1* > 2018-10-23 08:27:53,622 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms. 
> 2018-10-23 08:27:54,357 INFO > org.apache.kafka.streams.processor.internals.StoreChangelogReader - > stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task > 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* > *Reinitializing the task and restore its state from the beginning.* > 2018-10-23 08:27:54,357 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer > clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting > offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.* > 2018-10-23 08:27:54,653 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to > RUNNING* > {quote} > From the logs above, we can get the procedure for thread > AuditTrailBatch-StreamThread-1: > # the previous running task assigned to thread 1 is task 0_5 (the > corresponding partition is AuditTrailBatch-0-5) > # group begins to rebalance, the new task 1_1 is assigned to thread 1. > # no checkpoint was found under 1_1 state folder, so reset the offset to 0 > and clean the local state folder. > # thread 1 transitions to RUNNING state directly without the restoration for > task 1_1, so the historic data for state 1_1 is lost for thread 1. > *ThoubleShoot* > To investigate the cause for this issue, we analysis the source code in > KafkaStream and found the key is the variable named "completedRestorers". > This is the definition of the variable: > {code:java} > private final Set completedRestorers = new HashSet<>();{code} > Each thread object has its own completedRestorers, which
[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss
[ https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734504#comment-16734504 ] Boyang Chen commented on KAFKA-7672: [~guozhang] Yes the second issue. So since `atomicMoveWithFallback` is using NIO, will the operation introduce race condition for next task to read the checkpoint file if it doesn't exist already? > The local state not fully restored after KafkaStream rebalanced, resulting in > data loss > --- > > Key: KAFKA-7672 > URL: https://issues.apache.org/jira/browse/KAFKA-7672 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0 >Reporter: linyue li >Assignee: linyue li >Priority: Major > Labels: bug > Fix For: 2.2.0, 2.1.1 > > > Normally, when a task is mitigated to a new thread and no checkpoint file was > found under its task folder, Kafka Stream needs to restore the local state > for remote changelog topic completely and then resume running. However, in > some scenarios, we found that Kafka Stream *NOT* restore this state even no > checkpoint was found, but just clean the state folder and transition to > running state directly, resulting the historic data loss. > To be specific, I will give the detailed logs for Kafka Stream in our project > to show this scenario: > {quote}2018-10-23 08:27:07,684 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Revoking previously assigned partitions [AuditTrailBatch-0-5] > 2018-10-23 08:27:07,684 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to > PARTITIONS_REVOKED > 2018-10-23 08:27:10,856 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > (Re-)joining group > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Successfully joined group with generation 323 > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1] > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to > PARTITIONS_ASSIGNED > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1* > 2018-10-23 08:27:53,622 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms. 
> 2018-10-23 08:27:54,357 INFO > org.apache.kafka.streams.processor.internals.StoreChangelogReader - > stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task > 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* > *Reinitializing the task and restore its state from the beginning.* > 2018-10-23 08:27:54,357 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer > clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting > offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.* > 2018-10-23 08:27:54,653 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to > RUNNING* > {quote} > From the logs above, we can get the procedure for thread > AuditTrailBatch-StreamThread-1: > # the previous running task assigned to thread 1 is task 0_5 (the > corresponding partition is AuditTrailBatch-0-5) > # group begins to rebalance, the new task 1_1 is assigned to thread 1. > # no checkpoint was found under 1_1 state folder, so reset the offset to 0 > and clean the local state folder. > # thread 1 transitions to RUNNING state directly without the restoration for > task 1_1, so the historic data for state 1_1 is lost for thread 1. > *ThoubleShoot* > To investigate the cause for this issue, we analysis the source code in > KafkaStream and found the key is the variable named "completedRestorers". > This is the definition of the variable: > {code:java} > private final Set completedRestorers =
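The `atomicMoveWithFallback` helper mentioned above lives in Kafka's common utilities; the snippet below is only a standalone approximation of the move-with-fallback pattern (atomic rename where the file system supports it, plain replace otherwise), not the actual Kafka source. It also shows where the race the comment asks about could appear: only the fallback path can leave a brief window in which no checkpoint file is visible.
{code:java}
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Standalone approximation of the "atomic move with fallback" pattern; not Kafka code.
final class CheckpointMoveSketch {
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            // Preferred path: one atomic rename, so a concurrent reader either sees the
            // old checkpoint file or the complete new one, never a partial write.
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fallback for file systems without atomic rename: the target is replaced
            // non-atomically, so a reader may briefly find no checkpoint at all.
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
{code}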
[jira] [Resolved] (KAFKA-4835) Allow users control over repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-4835. Resolution: Fixed > Allow users control over repartitioning > --- > > Key: KAFKA-4835 > URL: https://issues.apache.org/jira/browse/KAFKA-4835 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki >Priority: Major > Labels: needs-kip > > From > https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 > ...it would be good to provide users more control over the repartitioning. > My use case is as follows (unrelated bits omitted for brevity): > {code} > KTable loggedInCustomers = builder > .stream("customerLogins") > .groupBy((key, activity) -> > activity.getCustomerRef()) > .reduce((first,second) -> second, loginStore()); > > builder > .stream("balanceUpdates") > .map((key, activity) -> new KeyValue<>( > activity.getCustomerRef(), > activity)) > .join(loggedInCustomers, (activity, session) -> ... > .to("sessions"); > {code} > Both "groupBy" and "map" in the underlying implementation set the > repartitionRequired flag (since the key changes), and the aggregation/join > that follows will create the repartitioned topic. > However, in our case I know that both input streams are already partitioned > by the customerRef value, which I'm mapping into the key (because it's > required by the join operation). > So there are 2 unnecessary intermediate topics created with their associated > overhead, while the ultimate goal is simply to do a join on a value that we > already use to partition the original streams anyway. > (Note, we don't have the option to re-implement the original input streams to > make customerRef the message key.) > I think it would be better to allow the user to decide (from their knowledge > of the incoming streams) whether a repartition is mandatory on aggregation > and join operations (overloaded version of the methods with the > repartitionRequired flag exposed maybe?) > An alternative would be to allow users to perform a join on a value other > than the key (a keyValueMapper parameter to join, like the one used for joins > with global tables), but I expect that to be more involved and error-prone to > use for people who don't understand the partitioning requirements well > (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
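To make the proposal concrete, a hypothetical shape of the overload the ticket asks for is sketched below. These signatures do not exist in the Kafka Streams API; they only illustrate exposing the repartition decision to a caller who knows the input is already partitioned by the new key (customerRef in the example above).
{code:java}
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;

// Hypothetical API sketch; not part of Kafka Streams.
public interface RepartitionControlSketch<K, V> {

    // Today: any key-changing groupBy marks the stream as requiring a repartition
    // before the downstream aggregation or join.
    <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector);

    // Proposed: the caller asserts the input is already partitioned by the new key,
    // so Streams would skip creating the intermediate repartition topic.
    <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector,
                                       boolean repartitionRequired);
}
{code}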
[jira] [Reopened] (KAFKA-4835) Allow users control over repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reopened KAFKA-4835: > Allow users control over repartitioning > --- > > Key: KAFKA-4835 > URL: https://issues.apache.org/jira/browse/KAFKA-4835 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki >Priority: Major > Labels: needs-kip > > From > https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 > ...it would be good to provide users more control over the repartitioning. > My use case is as follows (unrelated bits omitted for brevity): > {code} > KTable loggedInCustomers = builder > .stream("customerLogins") > .groupBy((key, activity) -> > activity.getCustomerRef()) > .reduce((first,second) -> second, loginStore()); > > builder > .stream("balanceUpdates") > .map((key, activity) -> new KeyValue<>( > activity.getCustomerRef(), > activity)) > .join(loggedInCustomers, (activity, session) -> ... > .to("sessions"); > {code} > Both "groupBy" and "map" in the underlying implementation set the > repartitionRequired flag (since the key changes), and the aggregation/join > that follows will create the repartitioned topic. > However, in our case I know that both input streams are already partitioned > by the customerRef value, which I'm mapping into the key (because it's > required by the join operation). > So there are 2 unnecessary intermediate topics created with their associated > overhead, while the ultimate goal is simply to do a join on a value that we > already use to partition the original streams anyway. > (Note, we don't have the option to re-implement the original input streams to > make customerRef the message key.) > I think it would be better to allow the user to decide (from their knowledge > of the incoming streams) whether a repartition is mandatory on aggregation > and join operations (overloaded version of the methods with the > repartitionRequired flag exposed maybe?) > An alternative would be to allow users to perform a join on a value other > than the key (a keyValueMapper parameter to join, like the one used for joins > with global tables), but I expect that to be more involved and error-prone to > use for people who don't understand the partitioning requirements well > (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16735067#comment-16735067 ] Boyang Chen commented on KAFKA-4835: [~mjsax] Oh I thought I accidentally closed it before. Reopening! > Allow users control over repartitioning > --- > > Key: KAFKA-4835 > URL: https://issues.apache.org/jira/browse/KAFKA-4835 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki >Priority: Major > Labels: needs-kip > > From > https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 > ...it would be good to provide users more control over the repartitioning. > My use case is as follows (unrelated bits omitted for brevity): > {code} > KTable loggedInCustomers = builder > .stream("customerLogins") > .groupBy((key, activity) -> > activity.getCustomerRef()) > .reduce((first,second) -> second, loginStore()); > > builder > .stream("balanceUpdates") > .map((key, activity) -> new KeyValue<>( > activity.getCustomerRef(), > activity)) > .join(loggedInCustomers, (activity, session) -> ... > .to("sessions"); > {code} > Both "groupBy" and "map" in the underlying implementation set the > repartitionRequired flag (since the key changes), and the aggregation/join > that follows will create the repartitioned topic. > However, in our case I know that both input streams are already partitioned > by the customerRef value, which I'm mapping into the key (because it's > required by the join operation). > So there are 2 unnecessary intermediate topics created with their associated > overhead, while the ultimate goal is simply to do a join on a value that we > already use to partition the original streams anyway. > (Note, we don't have the option to re-implement the original input streams to > make customerRef the message key.) > I think it would be better to allow the user to decide (from their knowledge > of the incoming streams) whether a repartition is mandatory on aggregation > and join operations (overloaded version of the methods with the > repartitionRequired flag exposed maybe?) > An alternative would be to allow users to perform a join on a value other > than the key (a keyValueMapper parameter to join, like the one used for joins > with global tables), but I expect that to be more involved and error-prone to > use for people who don't understand the partitioning requirements well > (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss
[ https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16735422#comment-16735422 ] Boyang Chen commented on KAFKA-7672: [~guozhang] Thanks for the explanation! It's a little bit confusing why we don't write checkpoint file when EOS is on... anyway is this the fix you are talking about: https://github.com/apache/kafka/compare/trunk...abbccdda:bug_fix > The local state not fully restored after KafkaStream rebalanced, resulting in > data loss > --- > > Key: KAFKA-7672 > URL: https://issues.apache.org/jira/browse/KAFKA-7672 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0 >Reporter: linyue li >Assignee: linyue li >Priority: Major > Labels: bug > Fix For: 2.2.0, 2.1.1 > > > Normally, when a task is mitigated to a new thread and no checkpoint file was > found under its task folder, Kafka Stream needs to restore the local state > for remote changelog topic completely and then resume running. However, in > some scenarios, we found that Kafka Stream *NOT* restore this state even no > checkpoint was found, but just clean the state folder and transition to > running state directly, resulting the historic data loss. > To be specific, I will give the detailed logs for Kafka Stream in our project > to show this scenario: > {quote}2018-10-23 08:27:07,684 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Revoking previously assigned partitions [AuditTrailBatch-0-5] > 2018-10-23 08:27:07,684 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to > PARTITIONS_REVOKED > 2018-10-23 08:27:10,856 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > (Re-)joining group > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Successfully joined group with generation 323 > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1] > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to > PARTITIONS_ASSIGNED > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1* > 2018-10-23 08:27:53,622 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms. 
> 2018-10-23 08:27:54,357 INFO > org.apache.kafka.streams.processor.internals.StoreChangelogReader - > stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task > 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* > *Reinitializing the task and restore its state from the beginning.* > 2018-10-23 08:27:54,357 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer > clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting > offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.* > 2018-10-23 08:27:54,653 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to > RUNNING* > {quote} > From the logs above, we can get the procedure for thread > AuditTrailBatch-StreamThread-1: > # the previous running task assigned to thread 1 is task 0_5 (the > corresponding partition is AuditTrailBatch-0-5) > # group begins to rebalance, the new task 1_1 is assigned to thread 1. > # no checkpoint was found under 1_1 state folder, so reset the offset to 0 > and clean the local state folder. > # thread 1 transitions to RUNNING state directly without the restoration for > task 1_1, so the historic data for state 1_1 is lost for thread 1. > *ThoubleShoot* > To investigate the cause for this issue, we analysis the source code in > KafkaStream and found the key is the variable named "completedRestorers". > This is the definition of the variable: > {code:java} > priv
[jira] [Commented] (KAFKA-7798) Expose embedded client context from KafkaStreams threadMetadata
[ https://issues.apache.org/jira/browse/KAFKA-7798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16737595#comment-16737595 ] Boyang Chen commented on KAFKA-7798: Great speed! > Expose embedded client context from KafkaStreams threadMetadata > --- > > Key: KAFKA-7798 > URL: https://issues.apache.org/jira/browse/KAFKA-7798 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > A KafkaStreams client today contains multiple embedded clients: producer, > consumer and admin client. Currently these client's context like client id > are not exposed via KafkaStreams. This ticket proposes to expose those > context information at the per-thread basis (since each thread has its own > embedded clients) via ThreadMetadata. > This also has an interplay with KIP-345: as we add group.instance.id in that > KIP, this information should also be exposed as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7806) Windowed Aggregations should wrap default key serde if none is specified
[ https://issues.apache.org/jira/browse/KAFKA-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738562#comment-16738562 ] Boyang Chen commented on KAFKA-7806: Thanks John for bringing this up! FYI, [~shnguyen] has contributed a [KIP-393|https://cwiki.apache.org/confluence/display/KAFKA/KIP-393%3A+Time+windowed+serde+to+properly+deserialize+changelog+input+topic] which should be helpful here to wrap the default serde. > Windowed Aggregations should wrap default key serde if none is specified > > > Key: KAFKA-7806 > URL: https://issues.apache.org/jira/browse/KAFKA-7806 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > In Streams, windowing a stream by either time or session windows causes the > stream's keys to be transformed from `K` to `Windowed`. > Since this is a well defined transition, it's not necessary for developers to > explicitly provide a `Serde>`. For convenience, Streams, which > already knows the key serde (`Serde`) automatically wraps it in case it's > needed by downstream operators. > However, this automatic wrapping only takes place if the key serde has been > explicitly provided in the topology. If the topology relies on the > `default.key.serde` configuration, no wrapping takes place, and downstream > operators will encounter a ClassCastException trying to cast a `Windowed` > (the windowed key) to whatever type the default serde handles (which is the > key wrapped inside the windowed key). > Specifically, they key serde forwarding logic is: > in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`: > `materializedInternal.keySerde() != null ? new > FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : > null` > and in > `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`: > `materializedInternal.keySerde() != null ? new > WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null` > > This pattern of not "solidifying" the default key serde is common in Streams. > Not all operators need a serde, and the default serde may not be applicable > to all operators. So, it would be a mistake to arbitrary operators to grab > the default serde and pass it downstream as if it had been explicitly set. > > However, in this case specifically, all windowed aggregations are stateful, > so if we don't have an explicit key serde at this point, we know that we have > used the default serde in the window store. If the default serde were > incorrect, an exception would be thrown by the windowed aggregation itself. > So it actually is safe to wrap the default serde in a windowed serde and pass > it downstream, which would result in a better development experience. > > Unfortunately, the default serde is set via config, but the windowed serde > wrapping happens during DSL building, when the config is not generally > available. Therefore, we would need a special windowed serde wrapper that > signals that it wraps the default serde, which would be fully resolved during > operators' init call. > For example, something of this nature: > `materializedInternal.keySerde() != null ? new > FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : > FullTimeWindowedSerde.wrapDefault(windows.size())` > etc. > > Complicating the situation slightly, all the windowed serializers and > deserializers will resolve a runtime inner class using > `default.windowed.key.serde.inner` if given a null inner serde to wrap. 
> However, at this point in the topology build, we do know that the windowed > aggregation has specifically used the `default.key.serde`, not the > `default.windowed.key.serde.inner` to persist its state to the window store, > therefore, it should be correct to wrap the default key serde specifically > and not use the `default.windowed.key.serde.inner`. > > In addition to fixing this for TimeWindowed and SessionWindowed streams, we > need to have good test coverage of the new code. There is clearly a blind > spot in the tests, or we would have noticed this sooner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
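Until such default-serde wrapping lands, the ClassCastException can be avoided by providing the key serde explicitly, in which case the existing wrapping logic quoted above kicks in. The sketch below assumes String keys/values, made-up topic names, and a 5-minute window; it is a workaround illustration, not the fix proposed in the ticket.
{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

public class ExplicitWindowedSerdeExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        Duration windowSize = Duration.ofMinutes(5);

        KTable<Windowed<String>, Long> counts = builder
            .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(windowSize))
            // Explicit key serde: Streams can wrap it into a windowed serde for
            // downstream operators instead of falling back to default.key.serde.
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("click-counts")
                       .withKeySerde(Serdes.String()));

        // Writing the windowed result out with an explicitly built windowed serde
        // (serialization itself does not need the window size).
        Serde<Windowed<String>> windowedKeySerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
        counts.toStream().to("click-counts-by-window", Produced.with(windowedKeySerde, Serdes.Long()));
    }
}
{code}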
[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss
[ https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738973#comment-16738973 ] Boyang Chen commented on KAFKA-7672: Thanks for the thorough investigation and summary here [~guozhang]! I updated the fix based on my understanding: [https://github.com/apache/kafka/pull/6115/|https://github.com/apache/kafka/pull/6115/files] take another look when you got time, thank you! > The local state not fully restored after KafkaStream rebalanced, resulting in > data loss > --- > > Key: KAFKA-7672 > URL: https://issues.apache.org/jira/browse/KAFKA-7672 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0 >Reporter: linyue li >Assignee: linyue li >Priority: Major > Labels: bug > Fix For: 2.2.0, 2.1.1 > > > Normally, when a task is mitigated to a new thread and no checkpoint file was > found under its task folder, Kafka Stream needs to restore the local state > for remote changelog topic completely and then resume running. However, in > some scenarios, we found that Kafka Stream *NOT* restore this state even no > checkpoint was found, but just clean the state folder and transition to > running state directly, resulting the historic data loss. > To be specific, I will give the detailed logs for Kafka Stream in our project > to show this scenario: > {quote}2018-10-23 08:27:07,684 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Revoking previously assigned partitions [AuditTrailBatch-0-5] > 2018-10-23 08:27:07,684 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to > PARTITIONS_REVOKED > 2018-10-23 08:27:10,856 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > (Re-)joining group > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Successfully joined group with generation 323 > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] > Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1] > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to > PARTITIONS_ASSIGNED > 2018-10-23 08:27:53,153 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1* > 2018-10-23 08:27:53,622 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms. 
> 2018-10-23 08:27:54,357 INFO > org.apache.kafka.streams.processor.internals.StoreChangelogReader - > stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task > 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* > *Reinitializing the task and restore its state from the beginning.* > 2018-10-23 08:27:54,357 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer > clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting > offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.* > 2018-10-23 08:27:54,653 INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to > RUNNING* > {quote} > From the logs above, we can get the procedure for thread > AuditTrailBatch-StreamThread-1: > # the previous running task assigned to thread 1 is task 0_5 (the > corresponding partition is AuditTrailBatch-0-5) > # group begins to rebalance, the new task 1_1 is assigned to thread 1. > # no checkpoint was found under 1_1 state folder, so reset the offset to 0 > and clean the local state folder. > # thread 1 transitions to RUNNING state directly without the restoration for > task 1_1, so the historic data for state 1_1 is lost for thread 1. > *ThoubleShoot* > To investigate the cause for this issue, we analysis the source code in > KafkaStream and found the key is the variable named "completedRestorers". > This is the definition of the variabl
[jira] [Created] (KAFKA-7816) Windowed topic should have window size as part of the metadata
Boyang Chen created KAFKA-7816: -- Summary: Windowed topic should have window size as part of the metadata Key: KAFKA-7816 URL: https://issues.apache.org/jira/browse/KAFKA-7816 Project: Kafka Issue Type: Improvement Components: consumer, streams Reporter: Boyang Chen Assignee: Boyang Chen Currently the Kafka window store topics require a windowed serde to properly deserialize the records. One of the required config is `window.size.ms`, which indicates the diff between (window.end - window.start). For space efficiency, KStream only stores the windowed record with window start time, because as long as the restore consumer knows size of the window, it would properly derive the window end time by adding window.size.ms to window start time. However, this makes the reuse of window topic very hard because another user has to config the correct window size in order to deserialize the data. When we extract the customized consumer as a template, every time new user has to define their own window size. If we do wild-card matching consumer, things could be even worse to work because different topics may have different window size and user has to read through the application code to find that info. To make the decoding of window topic easier, we are proposing to add a new config to TopicMetadata called `windowSize` which could be used for applications to properly deserialize the data without requirement to config a window size. This could also make client side serde API easier. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
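As things stand, the reader has to supply the window size out of band; the sketch below shows that status quo with a plain consumer reading a windowed changelog-style topic. The topic name, group id, and the 60-second window size are assumptions for illustration.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedTopicReader {
    public static void main(String[] args) {
        // The reader must already know the window size; a wrong value silently yields
        // wrong window end times, which is the pain point this ticket describes.
        long assumedWindowSizeMs = Duration.ofSeconds(60).toMillis();
        TimeWindowedDeserializer<String> keyDeserializer =
            new TimeWindowedDeserializer<>(new StringDeserializer(), assumedWindowSizeMs);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "windowed-topic-reader");

        try (KafkaConsumer<Windowed<String>, String> consumer =
                 new KafkaConsumer<>(props, keyDeserializer, new StringDeserializer())) {
            consumer.subscribe(Collections.singletonList("some-app-store1-changelog"));
            ConsumerRecords<Windowed<String>, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Windowed<String>, String> record : records) {
                System.out.printf("window=%s key=%s value=%s%n",
                    record.key().window(), record.key().key(), record.value());
            }
        }
    }
}
{code}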
[jira] [Commented] (KAFKA-7820) distinct count kafka streams api
[ https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742549#comment-16742549 ] Boyang Chen commented on KAFKA-7820: Hey Vinoth, thanks for proposing this! Based on your use case, I'm wondering whether we could repartition the input with all the fields of interest as a compound key, and aggregate based on that key? That should fulfill your requirement. > distinct count kafka streams api > > > Key: KAFKA-7820 > URL: https://issues.apache.org/jira/browse/KAFKA-7820 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Vinoth Rajasekar >Priority: Minor > > we are using Kafka streams for our real-time analytic use cases. most of our > use cases involved with doing distinct count on certain fields. > currently we do distinct count by storing the hash map value of the data in a > set and do a count as event flows in. There are lot of challenges doing this > using application memory, because storing the hashmap value and counting them > is limited by the allotted memory size. When we get high volume or spike in > traffic hash map of the distinct count fields grows beyond allotted memory > size leading to issues. > other issue is when we scale the app, we need to use global ktables so we > get all the values for doing distinct count and this adds back pressure in > the cluster or we have to re-partition the topic and do count on the key. > Can we have feature, where the distinct count is supported by through streams > api at the framework level, rather than dealing it with application level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
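A minimal sketch of that suggestion, with made-up topic names and a simplified comma-separated value format: build a compound key from the fields you care about, let the groupBy repartition on it, and count per key. Each entry in the resulting table is one distinct field combination, so the number of distinct combinations equals the number of keys.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class DistinctCountSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, Long> countsPerCombination = builder
            .<String, String>stream("events")            // value assumed to look like "userId,country,..."
            .groupBy((key, csv) -> {
                String[] fields = csv.split(",");
                return fields[0] + "|" + fields[1];       // compound key over the fields of interest
            }, Grouped.with(Serdes.String(), Serdes.String()))
            .count();                                     // one entry per distinct combination

        countsPerCombination.toStream()
            .to("field-combination-counts", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
{code}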
[jira] [Created] (KAFKA-7824) Require member.id for initial join group request
Boyang Chen created KAFKA-7824: -- Summary: Require member.id for initial join group request Key: KAFKA-7824 URL: https://issues.apache.org/jira/browse/KAFKA-7824 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Boyang Chen Assignee: Boyang Chen For request with unknown member id, broker will blindly accept the new join group request, store the member metadata and return a UUID to consumer. The edge case is that if initial join group request keeps failing due to connection timeout, or the consumer keeps restarting, or the max.poll.interval.ms configured on client is set to infinite (no rebalance timeout kicking in to clean up the member metadata map), there will be accumulated MemberMetadata info within group metadata cache which will eventually burst broker memory. The detection and fencing of invalid join group request is crucial for broker stability. The proposed solution is to require one more bounce for the consumer to use a valid member.id to join the group. Details in this [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
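To make the proposed flow concrete, here is an illustrative sketch of the two-round join described in the KIP: the first JoinGroup with an unknown member.id is rejected with a broker-assigned id, and only the retry carrying that id is admitted and stored. Class, field, and error names below are made up and do not mirror the actual GroupCoordinator code.
{code:java}
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class JoinGroupGateSketch {
    static final String UNKNOWN_MEMBER_ID = "";

    static final class Response {
        final String error;     // e.g. "MEMBER_ID_REQUIRED" or "NONE"
        final String memberId;  // broker-assigned id echoed back to the client
        Response(String error, String memberId) { this.error = error; this.memberId = memberId; }
    }

    private final Map<String, Boolean> pendingMemberIds = new ConcurrentHashMap<>();

    Response handleJoin(String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Round one: hand out an id but do not store full member metadata yet, so
            // repeatedly failing joins cannot grow the group metadata cache unboundedly.
            String newId = "member-" + UUID.randomUUID();
            pendingMemberIds.put(newId, Boolean.TRUE);
            return new Response("MEMBER_ID_REQUIRED", newId);
        }
        if (pendingMemberIds.remove(memberId) != null) {
            // Round two: the client proved it received our id; admit it to the group.
            return new Response("NONE", memberId);
        }
        return new Response("UNKNOWN_MEMBER_ID", memberId);
    }
}
{code}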
[jira] [Updated] (KAFKA-7824) Require member.id for initial join group request
[ https://issues.apache.org/jira/browse/KAFKA-7824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7824: --- Issue Type: Sub-task (was: Improvement) Parent: KAFKA-7610 > Require member.id for initial join group request > > > Key: KAFKA-7824 > URL: https://issues.apache.org/jira/browse/KAFKA-7824 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > For request with unknown member id, broker will blindly accept the new join > group request, store the member metadata and return a UUID to consumer. The > edge case is that if initial join group request keeps failing due to > connection timeout, or the consumer keeps restarting, or the > max.poll.interval.ms configured on client is set to infinite (no rebalance > timeout kicking in to clean up the member metadata map), there will be > accumulated MemberMetadata info within group metadata cache which will > eventually burst broker memory. The detection and fencing of invalid join > group request is crucial for broker stability. > > The proposed solution is to require one more bounce for the consumer to use a > valid member.id to join the group. Details in this > [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7820) distinct count kafka streams api
[ https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743479#comment-16743479 ] Boyang Chen commented on KAFKA-7820: Thanks [~vinubarro] for more details. I think we need to further understand the use case before we decide whether we need to add a new API. # About repartition, since KStream does aggregation on partition level, so it is required to have the same key hashing to the same partition. My question is that how many unique keys we are having here (the combo out of all 15-20 fields)? If the total number of the keys are not that big, it should be ok # We don't need to have multiple KTables to solve the problem. We could just get a common aggregation key and do the counts, if you are referring to one single streaming instance. At Pinterest, we are building a generic API on top of #aggregate() by extracting necessary fields to generate a superset join key through our thrift structure. In conclusion, with proper repartition applied, count() should be suffice for our use case. If we want the field extraction on framework level, we need to have the API support multiple data types (Json, Avro, Thrift). Let me know if this answers your question. > distinct count kafka streams api > > > Key: KAFKA-7820 > URL: https://issues.apache.org/jira/browse/KAFKA-7820 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Vinoth Rajasekar >Priority: Minor > Labels: needs-kip > > we are using Kafka streams for our real-time analytic use cases. most of our > use cases involved with doing distinct count on certain fields. > currently we do distinct count by storing the hash map value of the data in a > set and do a count as event flows in. There are lot of challenges doing this > using application memory, because storing the hashmap value and counting them > is limited by the allotted memory size. When we get high volume or spike in > traffic hash map of the distinct count fields grows beyond allotted memory > size leading to issues. > other issue is when we scale the app, we need to use global ktables so we > get all the values for doing distinct count and this adds back pressure in > the cluster or we have to re-partition the topic and do count on the key. > Can we have feature, where the distinct count is supported by through streams > api at the framework level, rather than dealing it with application level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7820) distinct count kafka streams api
[ https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743479#comment-16743479 ] Boyang Chen edited comment on KAFKA-7820 at 1/16/19 12:09 AM: -- Thanks [~vinubarro] for more details. I think we need to further understand the use case before we decide whether we need to add a new API. # About repartition, since KStream does aggregation on partition level, so it is required to have the same key hashing to the same partition. My question is that how many unique keys we are having here (the combo out of all 15-20 fields)? If the total number of the keys are not that big, it should be ok # We don't need to have multiple KTables to solve the problem. We could just get a common aggregation key and do the counts, if you are referring to one single streaming instance. At Pinterest, we are building a generic API on top of #aggregate() by extracting necessary fields to generate a superset join key through our thrift structure. In conclusion, with proper repartition applied, count() should be suffice for most use cases. To add support for field extraction on framework level, we need to have the API support multiple data types (Json, Avro, Thrift). Let me know if this answers your question. was (Author: bchen225242): Thanks [~vinubarro] for more details. I think we need to further understand the use case before we decide whether we need to add a new API. # About repartition, since KStream does aggregation on partition level, so it is required to have the same key hashing to the same partition. My question is that how many unique keys we are having here (the combo out of all 15-20 fields)? If the total number of the keys are not that big, it should be ok # We don't need to have multiple KTables to solve the problem. We could just get a common aggregation key and do the counts, if you are referring to one single streaming instance. At Pinterest, we are building a generic API on top of #aggregate() by extracting necessary fields to generate a superset join key through our thrift structure. In conclusion, with proper repartition applied, count() should be suffice for our use case. If we want the field extraction on framework level, we need to have the API support multiple data types (Json, Avro, Thrift). Let me know if this answers your question. > distinct count kafka streams api > > > Key: KAFKA-7820 > URL: https://issues.apache.org/jira/browse/KAFKA-7820 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Vinoth Rajasekar >Priority: Minor > Labels: needs-kip > > we are using Kafka streams for our real-time analytic use cases. most of our > use cases involved with doing distinct count on certain fields. > currently we do distinct count by storing the hash map value of the data in a > set and do a count as event flows in. There are lot of challenges doing this > using application memory, because storing the hashmap value and counting them > is limited by the allotted memory size. When we get high volume or spike in > traffic hash map of the distinct count fields grows beyond allotted memory > size leading to issues. > other issue is when we scale the app, we need to use global ktables so we > get all the values for doing distinct count and this adds back pressure in > the cluster or we have to re-partition the topic and do count on the key. > Can we have feature, where the distinct count is supported by through streams > api at the framework level, rather than dealing it with application level. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7816) Windowed topic should have window size as part of the metadata
[ https://issues.apache.org/jira/browse/KAFKA-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747680#comment-16747680 ] Boyang Chen commented on KAFKA-7816: [~guozhang] [~mjsax] WDYT? > Windowed topic should have window size as part of the metadata > -- > > Key: KAFKA-7816 > URL: https://issues.apache.org/jira/browse/KAFKA-7816 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently the Kafka window store topics require a windowed serde to properly > deserialize the records. One of the required config is `window.size.ms`, > which indicates the diff between (window.end - window.start). For space > efficiency, KStream only stores the windowed record with window start time, > because as long as the restore consumer knows size of the window, it would > properly derive the window end time by adding window.size.ms to window start > time. > However, this makes the reuse of window topic very hard because another user > has to config the correct window size in order to deserialize the data. When > we extract the customized consumer as a template, every time new user has to > define their own window size. If we do wild-card matching consumer, things > could be even worse to work because different topics may have different > window size and user has to read through the application code to find that > info. > To make the decoding of window topic easier, we are proposing to add a new > config to TopicMetadata called `windowSize` which could be used for > applications to properly deserialize the data without requirement to config a > window size. This could also make client side serde API easier. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7816) Windowed topic should have window size as part of the metadata
[ https://issues.apache.org/jira/browse/KAFKA-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16748373#comment-16748373 ] Boyang Chen commented on KAFKA-7816: Yeah, good point [~guozhang]: allowing users to interpret data from the same windowed topic in their own customized way also makes sense. Let me close this Jira in a couple of days if no other opinions are raised. > Windowed topic should have window size as part of the metadata > -- > > Key: KAFKA-7816 > URL: https://issues.apache.org/jira/browse/KAFKA-7816 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently the Kafka window store topics require a windowed serde to properly > deserialize the records. One of the required config is `window.size.ms`, > which indicates the diff between (window.end - window.start). For space > efficiency, KStream only stores the windowed record with window start time, > because as long as the restore consumer knows size of the window, it would > properly derive the window end time by adding window.size.ms to window start > time. > However, this makes the reuse of window topic very hard because another user > has to config the correct window size in order to deserialize the data. When > we extract the customized consumer as a template, every time new user has to > define their own window size. If we do wild-card matching consumer, things > could be even worse to work because different topics may have different > window size and user has to read through the application code to find that > info. > To make the decoding of window topic easier, we are proposing to add a new > config to TopicMetadata called `windowSize` which could be used for > applications to properly deserialize the data without requirement to config a > window size. This could also make client side serde API easier. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7853) Refactor ConsumerCoordinator/AbstractCoordinator to reduce constructor parameter list
Boyang Chen created KAFKA-7853: -- Summary: Refactor ConsumerCoordinator/AbstractCoordinator to reduce constructor parameter list Key: KAFKA-7853 URL: https://issues.apache.org/jira/browse/KAFKA-7853 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen The parameter lists for class ConsumerCoordinator/AbstractCoordinator are growing over time. We should think of reducing the parameter size by introducing some intermediate data structs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
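One hedged way to picture the "intermediate data structs" idea: gather the rebalance-related scalars into a single immutable config object and pass that to the coordinator constructors. The names below are illustrative only and not the actual refactoring.
{code:java}
// Illustrative only; names and field selection are made up.
final class CoordinatorRebalanceConfigSketch {
    final String groupId;
    final int sessionTimeoutMs;
    final int rebalanceTimeoutMs;
    final int heartbeatIntervalMs;
    final long retryBackoffMs;
    final boolean leaveGroupOnClose;

    CoordinatorRebalanceConfigSketch(String groupId,
                                     int sessionTimeoutMs,
                                     int rebalanceTimeoutMs,
                                     int heartbeatIntervalMs,
                                     long retryBackoffMs,
                                     boolean leaveGroupOnClose) {
        this.groupId = groupId;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.leaveGroupOnClose = leaveGroupOnClose;
    }
}

// A coordinator constructor would then take the one struct instead of six-plus scalars:
//   AbstractCoordinator(CoordinatorRebalanceConfigSketch config, /* network client, time, ... */)
{code}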
[jira] [Assigned] (KAFKA-7853) Refactor ConsumerCoordinator/AbstractCoordinator to reduce constructor parameter list
[ https://issues.apache.org/jira/browse/KAFKA-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-7853: -- Assignee: Boyang Chen > Refactor ConsumerCoordinator/AbstractCoordinator to reduce constructor > parameter list > - > > Key: KAFKA-7853 > URL: https://issues.apache.org/jira/browse/KAFKA-7853 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > The parameter lists for class ConsumerCoordinator/AbstractCoordinator are > growing over time. We should think of reducing the parameter size by > introducing some intermediate data structs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7857) Add AbstractCoordinatorConfig class to consolidate consumer coordinator configs between Consumer and Connect
Boyang Chen created KAFKA-7857: -- Summary: Add AbstractCoordinatorConfig class to consolidate consumer coordinator configs between Consumer and Connect Key: KAFKA-7857 URL: https://issues.apache.org/jira/browse/KAFKA-7857 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen Right now there are a lot of duplicate configuration concerning client coordinator shared across ConsumerConfig and DistributedConfig (connect config). It makes sense to extract all coordinator related configs into a separate config class to reduce code redundancy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7859) Replace LeaveGroup request/response with automated protocol
Boyang Chen created KAFKA-7859: -- Summary: Replace LeaveGroup request/response with automated protocol Key: KAFKA-7859 URL: https://issues.apache.org/jira/browse/KAFKA-7859 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7858) Replace JoinGroup request/response with automated protocol
Boyang Chen created KAFKA-7858: -- Summary: Replace JoinGroup request/response with automated protocol Key: KAFKA-7858 URL: https://issues.apache.org/jira/browse/KAFKA-7858 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7862) Modify JoinGroup logic to incorporate group.instance.id change
Boyang Chen created KAFKA-7862: -- Summary: Modify JoinGroup logic to incorporate group.instance.id change Key: KAFKA-7862 URL: https://issues.apache.org/jira/browse/KAFKA-7862 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen The step one for KIP-345 join group logic change to corporate with static membership. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6995) Make config "internal.leave.group.on.close" public
[ https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-6995. Resolution: Won't Fix We are planning to deprecate this config with the introduction of KIP-345. > Make config "internal.leave.group.on.close" public > -- > > Key: KAFKA-6995 > URL: https://issues.apache.org/jira/browse/KAFKA-6995 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Labels: needs-kip > > We are proposing to make the config "internal.leave.group.on.close" public. > The reason is that for heavy state application the sticky assignment won't > work because each stream worker will leave group during rolling restart, and > there is a possibility that some members are left and rejoined while others > are still awaiting restart. This would then cause multiple rebalance because > after the ongoing rebalance is done, we are expecting late members to rejoin > and move state from `stable` to `prepareBalance`. To solve this problem, > heavy state application needs to use this config to avoid member list update, > so that at most one rebalance will be triggered at a proper time when all the > members are rejoined during rolling restart. This should just be one line > change. > Code here: > * internal.leave.group.on.close > * Whether or not the consumer should leave the group on close. If set to > false then a rebalance > * won't occur until session.timeout.ms expires. > * > * > * Note: this is an internal configuration and could be changed in the future > in a backward incompatible way > * > */ > static final String LEAVE_GROUP_ON_CLOSE_CONFIG = > "internal.leave.group.on.close"; -- This message was sent by Atlassian JIRA (v7.6.3#76005)
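For reference, this is how an application sets the internal flag today. It is a minimal sketch, and since the config is internal (and, per the resolution above, expected to be superseded by KIP-345 static membership), its behavior may change without notice.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class LeaveGroupConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "heavy-state-app");
        // Internal, unsupported config: do not send LeaveGroup on close, so the member
        // only drops out after session.timeout.ms, avoiding extra rebalances during a
        // rolling restart.
        props.put("internal.leave.group.on.close", "false");
        return props;
    }
}
{code}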
[jira] [Resolved] (KAFKA-7859) Replace LeaveGroup request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-7859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-7859. Resolution: Fixed > Replace LeaveGroup request/response with automated protocol > --- > > Key: KAFKA-7859 > URL: https://issues.apache.org/jira/browse/KAFKA-7859 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7816) Windowed topic should have window size as part of the metadata
[ https://issues.apache.org/jira/browse/KAFKA-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-7816. Resolution: Won't Fix Not a real use case > Windowed topic should have window size as part of the metadata > -- > > Key: KAFKA-7816 > URL: https://issues.apache.org/jira/browse/KAFKA-7816 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently the Kafka window store topics require a windowed serde to properly > deserialize the records. One of the required config is `window.size.ms`, > which indicates the diff between (window.end - window.start). For space > efficiency, KStream only stores the windowed record with window start time, > because as long as the restore consumer knows size of the window, it would > properly derive the window end time by adding window.size.ms to window start > time. > However, this makes the reuse of window topic very hard because another user > has to config the correct window size in order to deserialize the data. When > we extract the customized consumer as a template, every time new user has to > define their own window size. If we do wild-card matching consumer, things > could be even worse to work because different topics may have different > window size and user has to read through the application code to find that > info. > To make the decoding of window topic easier, we are proposing to add a new > config to TopicMetadata called `windowSize` which could be used for > applications to properly deserialize the data without requirement to config a > window size. This could also make client side serde API easier. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7899) Command line tool to invalidate group metadata for clean assignment
Boyang Chen created KAFKA-7899: -- Summary: Command line tool to invalidate group metadata for clean assignment Key: KAFKA-7899 URL: https://issues.apache.org/jira/browse/KAFKA-7899 Project: Kafka Issue Type: New Feature Components: consumer, streams Affects Versions: 1.1.0 Reporter: Boyang Chen Assignee: Boyang Chen Right now the group metadata will affect consumers under sticky assignment, since it persists previous topic partition assignment which affects the judgement of consumer leader. Specifically for KStream applications (under 1.1), if we are scaling up the cluster, it is hard to balance the traffic since most tasks would still go to "previous round active" assignments, even though we hope them to move towards other hosts. It would be preferable to have a tool to invalidate the group metadata stored on broker, so that for sticky assignment we could have a clean start. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7899) Command line tool to invalidate group metadata for clean assignment
[ https://issues.apache.org/jira/browse/KAFKA-7899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-7899. Resolution: Invalid It appears that the metadata should be generated by the client instead of the broker, so this is basically an invalid issue. > Command line tool to invalidate group metadata for clean assignment > --- > > Key: KAFKA-7899 > URL: https://issues.apache.org/jira/browse/KAFKA-7899 > Project: Kafka > Issue Type: New Feature > Components: consumer, streams >Affects Versions: 1.1.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Right now the group metadata will affect consumers under sticky assignment, > since it persists previous topic partition assignment which affects the > judgement of consumer leader. Specifically for KStream applications (under > 1.1), if we are scaling up the cluster, it is hard to balance the traffic > since most tasks would still go to "previous round active" assignments, even > though we hope them to move towards other hosts. > It would be preferable to have a tool to invalidate the group metadata stored > on broker, so that for sticky assignment we could have a clean start. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7903) Replace OffsetCommit request/response with automated protocol
Boyang Chen created KAFKA-7903: -- Summary: Replace OffsetCommit request/response with automated protocol Key: KAFKA-7903 URL: https://issues.apache.org/jira/browse/KAFKA-7903 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer
Boyang Chen created KAFKA-7995: -- Summary: Augment singleton protocol type to list for Kafka Consumer Key: KAFKA-7995 URL: https://issues.apache.org/jira/browse/KAFKA-7995 Project: Kafka Issue Type: Improvement Components: consumer, core Reporter: Boyang Chen Right now Kafka consumer protocol uses a singleton marker to distinguish Kafka Connect worker and normal consumer. This is not upgrade-friendly approach since the protocol type could potential change over time. A better approach is to support multiple candidacies so that the no downtime protocol type switch could achieve. For example, if we are trying to upgrade a Kafka Streams application towards a protocol type called "stream", right now there is no way to do this without downtime since broker will reject changing protocol type to a different one unless the group is back to empty. If we allow new member to provide a list of protocol type instead ("consumer", "stream"), there would be no compatibility issue. Alternative approach is to invent an admin API to change group's protocol type on runtime. However, the burden introduced on administrator is not trivial, since we need to guarantee the operation series to be correct, otherwise we will see limp-upgrade experience in the midpoint, for example while we are changing protocol type there was unexpected rebalance that causes old members join failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-7995: -- Assignee: (was: Boyang Chen) > Augment singleton protocol type to list for Kafka Consumer > > > Key: KAFKA-7995 > URL: https://issues.apache.org/jira/browse/KAFKA-7995 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > Right now Kafka consumer protocol uses a singleton marker to distinguish > Kafka Connect worker and normal consumer. This is not upgrade-friendly > approach since the protocol type could potential change over time. A better > approach is to support multiple candidacies so that the no downtime protocol > type switch could achieve. > For example, if we are trying to upgrade a Kafka Streams application towards > a protocol type called "stream", right now there is no way to do this without > downtime since broker will reject changing protocol type to a different one > unless the group is back to empty. If we allow new member to provide a list > of protocol type instead ("consumer", "stream"), there would be no > compatibility issue. > Alternative approach is to invent an admin API to change group's protocol > type on runtime. However, the burden introduced on administrator is not > trivial, since we need to guarantee the operation series to be correct, > otherwise we will see limp-upgrade experience in the midpoint, for example > while we are changing protocol type there was unexpected rebalance that > causes old members join failure. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-7995: -- Assignee: Boyang Chen > Augment singleton protocol type to list for Kafka Consumer > > > Key: KAFKA-7995 > URL: https://issues.apache.org/jira/browse/KAFKA-7995 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Labels: newbie > > Right now Kafka consumer protocol uses a singleton marker to distinguish > Kafka Connect worker and normal consumer. This is not upgrade-friendly > approach since the protocol type could potential change over time. A better > approach is to support multiple candidacies so that the no downtime protocol > type switch could achieve. > For example, if we are trying to upgrade a Kafka Streams application towards > a protocol type called "stream", right now there is no way to do this without > downtime since broker will reject changing protocol type to a different one > unless the group is back to empty. If we allow new member to provide a list > of protocol type instead ("consumer", "stream"), there would be no > compatibility issue. > Alternative approach is to invent an admin API to change group's protocol > type on runtime. However, the burden introduced on administrator is not > trivial, since we need to guarantee the operation series to be correct, > otherwise we will see limp-upgrade experience in the midpoint, for example > while we are changing protocol type there was unexpected rebalance that > causes old members join failure. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8019) Better Scaling Experience for KStream
Boyang Chen created KAFKA-8019: -- Summary: Better Scaling Experience for KStream Key: KAFKA-8019 URL: https://issues.apache.org/jira/browse/KAFKA-8019 Project: Kafka Issue Type: New Feature Reporter: Boyang Chen Assignee: Boyang Chen In our day-to-day work, we found it really hard to scale up a stateful stream application when its state stores are very heavy. The problem is that when newly spun-up hosts take ownership of some active tasks, they need a non-trivial amount of time to restore the state stores from the changelog topics, so the reassigned tasks remain unavailable for an unpredictably long time, which is not favorable. Secondly, the current global rebalance stops the entire application, which in a rolling host-swap scenario amounts to endless resource shuffling without actual progress. Following the community's [cooperative rebalancing|https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing%3A+Support+and+Policies] proposal, we need to build something similar for KStream to better handle the auto-scaling experience. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
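As a partial mitigation that already exists, standby replicas keep warm copies of state stores on other instances. They do not remove the problem described above (a brand-new host still has to restore from scratch), but they shorten restoration when tasks shuffle between existing instances or after failures. A minimal sketch of the relevant Streams configuration, with a placeholder application id and bootstrap address:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ScalingFriendlyConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        // Keep one warm replica of each state store on another instance so that a
        // reassigned active task does not have to replay the entire changelog.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}
{code}
The ticket itself is about going further than this, by rebalancing incrementally so that ownership only moves once the new host has caught up.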
[jira] [Created] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only
Boyang Chen created KAFKA-7566: -- Summary: Add sidecar job to leader (or a random single follower) only Key: KAFKA-7566 URL: https://issues.apache.org/jira/browse/KAFKA-7566 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Hey there, recently we needed to add an archive job to a streaming application. The caveat is that we have to make sure only one instance runs this task, to avoid potential race conditions, and we also don't want to schedule it as a regular stream task, because that would block normal streaming operation. Although we could solve this with a ZooKeeper lease, I'm raising the case here since it could be a useful capability for streaming jobs in general; see the sketch below. For example, there are some `leader specific` operations we could schedule through the DSL instead of in an ad-hoc manner. Let me know if this makes sense to you, thank you! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
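A sketch of the ZooKeeper-lease workaround mentioned above, using Apache Curator's LeaderLatch recipe so that only one application instance runs the archive job. The ZooKeeper address, latch path, and runArchiveJob() are placeholders, not anything from the ticket:
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ArchiveJobLeader {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181",                       // placeholder ZooKeeper address
                new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // All application instances race for the same latch path; exactly one wins leadership.
        try (LeaderLatch latch = new LeaderLatch(zk, "/my-streams-app/archive-job-leader")) {
            latch.start();
            latch.await();       // block until this instance becomes the leader
            runArchiveJob();     // placeholder for the side-car work
        } finally {
            zk.close();
        }
    }

    private static void runArchiveJob() { /* archive logic would go here */ }
}
{code}
The appeal of the ticket is to get this "run on exactly one instance" behavior from Streams itself instead of pulling in an external coordination dependency.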
[jira] [Updated] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only
[ https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7566: --- Priority: Minor (was: Major) Component/s: streams > Add sidecar job to leader (or a random single follower) only > > > Key: KAFKA-7566 > URL: https://issues.apache.org/jira/browse/KAFKA-7566 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Minor > > Hey there, > recently we need to add an archive job to a streaming application. The caveat > is that we need to make sure only one instance is doing this task to avoid > potential race condition, and we also don't want to schedule it as a regular > stream task so that we will be blocking normal streaming operation. > Although we could do so by doing a zk lease, I'm raising the case here since > this could be some potential use case for streaming job also. For example, > there are some `leader specific` operation we could schedule in DSL instead > of adhoc manner. > Let me know if you think this makes sense to you, thank you! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682509#comment-16682509 ] Boyang Chen commented on KAFKA-7610: Thanks Jason for proposing this issue. I think in static membership ([KIP-345|https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances]) we should be able to address this issue by remembering the newly joined member's name, even after multiple disconnects we will still have the exact same member. This by far I feel is the easiest solution since no matter what approach we eventually take, it requires authentication of client identity. I'm in favor of the approach 2 so far on dynamic membership, and the key point I'm trying to understand is what if the response message failed on the way, which will lead to another "unknown member" join for a new consumer as I suppose? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682515#comment-16682515 ] Boyang Chen commented on KAFKA-7610: By saying failed on the way I mean a network timeout or some transit issue > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682509#comment-16682509 ] Boyang Chen edited comment on KAFKA-7610 at 11/10/18 5:18 PM: -- Thanks Jason for proposing this issue. I think in static membership ([KIP-345|https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances]) we should be able to address this issue by remembering the newly joined member's name, even after multiple disconnects we will still have the exact same member. No matter what approach we eventually take, it requires authentication of client identity. I'm in favor of the approach 2 so far on dynamic membership, and the key point I'm trying to understand is what if the response message failed on the way, which will lead to another "unknown member" join for a new consumer as I suppose? Or we believe one or two failed responses shouldn't matter because our goal here is to avoid cache burst. Thanks! was (Author: bchen225242): Thanks Jason for proposing this issue. I think in static membership ([KIP-345|https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances]) we should be able to address this issue by remembering the newly joined member's name, even after multiple disconnects we will still have the exact same member. This by far I feel is the easiest solution since no matter what approach we eventually take, it requires authentication of client identity. I'm in favor of the approach 2 so far on dynamic membership, and the key point I'm trying to understand is what if the response message failed on the way, which will lead to another "unknown member" join for a new consumer as I suppose? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. 
The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7610: --- Comment: was deleted (was: By saying failed on the way I mean a network timeout or some transit issue) > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685810#comment-16685810 ] Boyang Chen commented on KAFKA-7610: [~hachikuji] So we would still accept unknown members joining the group? If we do that, one edge case I can think of is a bad consumer that keeps restarting itself and generates a lot of unknown join requests. What if we hold a separate map called *newMemberIds* to contain the member ids we have already responded with? This way we fence zombie registrations while keeping a memory of join attempts from new members, so that the next time those members join the group we recognize them, perform a "real join", and expand the original member list. Saving a single id should be much more memory efficient than saving full member metadata. Each time we finish a rebalance, we just erase the *newMemberIds* map. We could define a new error code like UNASSIGNED_MEMBER to trigger an immediate rejoin of new members. Does this extra protection make sense? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
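A rough sketch of the bookkeeping proposed in the comment above. This is purely illustrative, not actual GroupCoordinator code; the class, method names, and id format are made up, and UNASSIGNED_MEMBER is the error code the comment proposes rather than an existing one:
{code:java}
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the proposed *newMemberIds* tracking: remember only the generated
// ids handed out to first-time joiners, so a returning joiner that echoes its id is
// recognized and promoted to a full member, while the set is wiped after every completed
// rebalance to bound memory usage.
public final class PendingMemberTracker {
    private final Set<String> newMemberIds = ConcurrentHashMap.newKeySet();

    /** First JoinGroup with an empty member id: hand out a generated id and remember it. */
    public String registerNewJoiner(String clientId) {
        String memberId = clientId + "-" + UUID.randomUUID();
        newMemberIds.add(memberId);
        return memberId; // returned alongside a retriable error (the proposed UNASSIGNED_MEMBER)
    }

    /** Second JoinGroup echoing the generated id: only now create full MemberMetadata. */
    public boolean isKnownNewJoiner(String memberId) {
        return newMemberIds.remove(memberId);
    }

    /** Called once a rebalance completes, per the proposal, to drop stale pending ids. */
    public void clearAfterRebalance() {
        newMemberIds.clear();
    }
}
{code}
Storing a bare id per pending joiner is the memory-saving point of the proposal, compared with keeping a full MemberMetadata entry for every retried JoinGroup.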
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685961#comment-16685961 ] Boyang Chen commented on KAFKA-7610: [~hachikuji] I see your point. > a simple way to limit the memory from unknown group members is to not store > the subscription until the first JoinGroup arrives using the generated > memberId The issue is that right now we are fencing real "unknown member id" when the given member id in jg request is not within the member list. So the question becomes "how do we know this consumer has visited and we already allocate a new member id for it". Any idea other than storing this allocated id information else where? > If we want to protect the overall size of the group, perhaps a configuration >would be more effective? For example, `group.max.size` or something like that. group.max.size is a good approach to limit the memory usage, however I'm just wondering whether this would create inconvenience to the user in case they need to scale up larger than group.max.size. What would be the expected behavior when we reach the member size limit, are we just refusing any new member join request then? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685961#comment-16685961 ] Boyang Chen edited comment on KAFKA-7610 at 11/14/18 1:04 AM: -- [~hachikuji] I see your point. > a simple way to limit the memory from unknown group members is to not store > the subscription until the first JoinGroup arrives using the generated > memberId In current coordinator logic, we are fencing real "unknown member id" when the given member id in jg request is not within the member list. So the question becomes "how do we know this consumer has visited and we already allocate a new member id for it". Any idea other than storing this allocated id information else where? > If we want to protect the overall size of the group, perhaps a configuration >would be more effective? For example, `group.max.size` or something like that. group.max.size is a good approach to limit the memory usage, however I'm just wondering whether this would create inconvenience to the user in case they need to scale up larger than group.max.size. What would be the expected behavior when we reach the member size limit, are we just refusing any new member join request then? was (Author: bchen225242): [~hachikuji] I see your point. > a simple way to limit the memory from unknown group members is to not store > the subscription until the first JoinGroup arrives using the generated > memberId The issue is that right now we are fencing real "unknown member id" when the given member id in jg request is not within the member list. So the question becomes "how do we know this consumer has visited and we already allocate a new member id for it". Any idea other than storing this allocated id information else where? > If we want to protect the overall size of the group, perhaps a configuration >would be more effective? For example, `group.max.size` or something like that. group.max.size is a good approach to limit the memory usage, however I'm just wondering whether this would create inconvenience to the user in case they need to scale up larger than group.max.size. What would be the expected behavior when we reach the member size limit, are we just refusing any new member join request then? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. 
This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687475#comment-16687475 ] Boyang Chen commented on KAFKA-7610: Thanks [~guozhang] for jumping into the discussion! Just to clarify with [~hachikuji]: so far, is the only goal of "detecting consumer failures in initial JoinGroup" to prevent the broker from a memory blow-up caused by many invalid member metadata entries? Do we have other considerations in this design? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688332#comment-16688332 ] Boyang Chen commented on KAFKA-7610: Hey [~enether] great summary! I'm also in favor of the idea for setting an upper limit by `group.max.size`. This is so far the most intuitive way for end and has minimum changes needed. From my understanding, it is better to be set on the client side and advised to be set 10X ~ 100X of current membership size for end user. The default value could be further discussed when we talk about the memory limit we want to hold, because the member metadata size should be stable. [~hachikuji] [~guozhang] Thoughts? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688332#comment-16688332 ] Boyang Chen edited comment on KAFKA-7610 at 11/15/18 4:44 PM: -- Hey [~enether] great summary! I'm also in favor of the idea for setting an upper limit by `group.max.size`. This is so far the most intuitive way for end user and has minimum changes needed. From my understanding, it is better to be set on the client side and advised to be set 10X ~ 100X of current membership size for end user. The default value could be further discussed when we talk about the memory limit we want to hold, because the member metadata size should be stable. [~hachikuji] [~guozhang] Thoughts? was (Author: bchen225242): Hey [~enether] great summary! I'm also in favor of the idea for setting an upper limit by `group.max.size`. This is so far the most intuitive way for end and has minimum changes needed. From my understanding, it is better to be set on the client side and advised to be set 10X ~ 100X of current membership size for end user. The default value could be further discussed when we talk about the memory limit we want to hold, because the member metadata size should be stable. [~hachikuji] [~guozhang] Thoughts? > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
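To make the `group.max.size` idea from the discussion above concrete, here is a hedged sketch of what a broker-side admission check could look like. The config name matches the one discussed, but the class and method below are simplified stand-ins rather than real coordinator code, and the question of exactly which error the rejected member receives is left open here as it was in the thread:
{code:java}
import java.util.Map;

// Simplified stand-in for a coordinator-side group size cap; not actual Kafka code.
public final class GroupSizeGuard {
    private final int groupMaxSize; // value of the discussed "group.max.size" broker config

    public GroupSizeGuard(int groupMaxSize) {
        this.groupMaxSize = groupMaxSize;
    }

    /**
     * Admission check for a JoinGroup from a member id not yet in the group.
     * Returns false when the group is already at the cap, meaning the join should be
     * rejected with a dedicated error rather than creating another dangling member.
     */
    public boolean mayAdmitNewMember(Map<String, ?> currentMembers) {
        return currentMembers.size() < groupMaxSize;
    }
}
{code}
The trade-off raised in the thread remains: a hard cap bounds coordinator memory, but users who genuinely need to scale past the limit must first raise the config.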