[jira] [Updated] (KAFKA-7632) Add producer option to adjust compression level
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-7632: --- Description: The compression level for ZSTD is currently set to use the default level (3), which is a conservative setting that in some use cases eliminates the value that ZSTD provides with improved compression. Each use case will vary, so exposing the level as a broker configuration setting will allow the user to adjust the level. Since it applies to the other compression codecs, we should add the same functionalities to them. was: The compression level for ZSTD is currently set to use the default level (3), which is a conservative setting that in some use cases eliminates the value that ZSTD provides with improved compression. Each use case will vary, so exposing the level as a broker configuration setting will allow the user to adjust the level. Since it applies to the other compresssion codecs, we should add same functionalities to them/ > Add producer option to adjust compression level > --- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. > Since it applies to the other compression codecs, we should add the same > functionalities to them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
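A sketch of how the requested knob could surface on the producer side. Only `compression.type` is an existing producer config; the `compression.level` key and the broker address below are assumptions for illustration, since the actual config name would be settled in the KIP:

```java
import java.util.Properties;

public class CompressionConfigSketch {

    // Existing producer settings. "compression.type" is a real producer
    // config today; it selects the codec but offers no control of the level.
    static Properties baseConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("compression.type", "zstd");
        return props;
    }

    // Hypothetical knob this ticket asks for. The key name
    // "compression.level" is an assumption; the eventual KIP may pick a
    // different name or per-codec keys.
    static Properties withLevel(Properties props, int level) {
        props.put("compression.level", Integer.toString(level));
        return props;
    }
}
```

For zstd the library accepts levels roughly from 1 to 22, with 3 as the default the description refers to; higher levels trade CPU for a better compression ratio.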
[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-7632: --- Description: The compression level for ZSTD is currently set to use the default level (3), which is a conservative setting that in some use cases eliminates the value that ZSTD provides with improved compression. Each use case will vary, so exposing the level as a broker configuration setting will allow the user to adjust the level. Since it applies to the other compresssion codecs, we should add same functionalities to them/ was: The compression level for ZSTD is currently set to use the default level (3), which is a conservative setting that in some use cases eliminates the value that ZSTD provides with improved compression. Each use case will vary, so exposing the level as a broker configuration setting will allow the user to adjust the level. > Add option to kafka broker config to adjust compression level for zstd > -- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. > Since it applies to the other compresssion codecs, we should add same > functionalities to them/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7632) Add producer option to adjust compression level
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-7632: --- Summary: Add producer option to adjust compression level (was: Add option to kafka broker config to adjust compression level for zstd) > Add producer option to adjust compression level > --- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. > Since it applies to the other compresssion codecs, we should add same > functionalities to them/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-7632: --- Description: The compression level for ZSTD is currently set to use the default level (3), which is a conservative setting that in some use cases eliminates the value that ZSTD provides with improved compression. Each use case will vary, so exposing the level as a broker configuration setting will allow the user to adjust the level. was:The compression level for ZSTD is currently set to use the default level (3), which is a conservative setting that in some use cases eliminates the value that ZSTD provides with improved compression. Each use case will vary, so exposing the level as a broker configuration setting will allow the user to adjust the level. > Add option to kafka broker config to adjust compression level for zstd > -- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-7632: --- Fix Version/s: (was: 2.1.0) 2.2.0 > Add option to kafka broker config to adjust compression level for zstd > -- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-7632: --- Component/s: (was: core) clients > Add option to kafka broker config to adjust compression level for zstd > -- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-7632: --- Priority: Major (was: Trivial) > Add option to kafka broker config to adjust compression level for zstd > -- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > Fix For: 2.1.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin reassigned KAFKA-7632: -- Assignee: Lee Dongjin > Add option to kafka broker config to adjust compression level for zstd > -- > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Lee Dongjin >Priority: Trivial > Labels: needs-kip > Fix For: 2.1.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
[ https://issues.apache.org/jira/browse/KAFKA-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7647: Description: {code} kafka.log.LogCleanerParameterizedIntegrationTest > testCleansCombinedCompactAndDeleteTopic[3] FAILED java.lang.AssertionError: Contents of the map shouldn't change expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 -> (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:118) at kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129) {code}
was: kafka.log.LogCleanerParameterizedIntegrationTest > testCleansCombinedCompactAndDeleteTopic[3] FAILED java.lang.AssertionError: Contents of the map shouldn't change expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 -> (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:118) at kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)
> Flaky test > LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic > - > > Key: KAFKA-7647 > URL: https://issues.apache.org/jira/browse/KAFKA-7647 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > {code} > kafka.log.LogCleanerParameterizedIntegrationTest > > testCleansCombinedCompactAndDeleteTopic[3] FAILED > java.lang.AssertionError: Contents of the map shouldn't change > expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> > (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), > 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> > (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 -> > (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but > was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), > 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> > (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> > (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 -> > (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> > (355,355))> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:118) > at > kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
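The assertion above fails because the actual map carries one extra entry, 99 -> (299,299), i.e. a record the log cleaner should have removed but did not. A minimal stdlib sketch of the key diff that pinpoints such a leftover entry (class and method names are illustrative, not from the test):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MapDiff {

    // Returns the keys present in the actual map but absent from the
    // expected one; in the failing run above this set would be {99}.
    static Set<Object> extraKeys(Map<?, ?> expected, Map<?, ?> actual) {
        Set<Object> extra = new HashSet<>(actual.keySet());
        extra.removeAll(expected.keySet());
        return extra;
    }
}
```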
[jira] [Updated] (KAFKA-7578) Kafka streams: add possibility to choose multiple output topics
[ https://issues.apache.org/jira/browse/KAFKA-7578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-7578: - Labels: needs-kip user-experience (was: needs-kip) > Kafka streams: add possibility to choose multiple output topics > > > Key: KAFKA-7578 > URL: https://issues.apache.org/jira/browse/KAFKA-7578 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Taras Danylchuk >Priority: Minor > Labels: needs-kip, user-experience > > There is an awesome feature which was added in Kafka Streams 2.0 - the possibility > to choose the output topic for a topology dynamically, but in some cases it > could be useful to choose several topics within the same cluster. > Personally, I met such a case: I needed to route a message to several topics based on its > content and a routes configuration. > I've made a 'proposal' PR for this; unfortunately I couldn't find a better way > to implement it: > [https://github.com/apache/kafka/pull/5801] > If this approach is OK and the improvement could be done in future versions, > please let me know and I'll finish the PR code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
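The requested behaviour can be sketched in plain Java without the Kafka Streams dependency (topic names and routing rules below are illustrative only): a record may match several routes and therefore map to several output topics, which the existing dynamic-routing API, `TopicNameExtractor` (since 2.0), cannot express because it returns exactly one topic per record:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MultiTopicRouter {

    // Routes configuration: substring to look for -> destination topic.
    // Both sides are made-up examples.
    static final Map<String, String> ROUTES = Map.of(
            "order", "orders-topic",
            "payment", "payments-topic");

    // Returns every topic whose route matches the record value; the
    // result may be empty, a single topic, or several topics.
    static List<String> route(String value) {
        List<String> topics = new ArrayList<>();
        for (Map.Entry<String, String> e : ROUTES.entrySet()) {
            if (value.contains(e.getKey())) {
                topics.add(e.getValue());
            }
        }
        return topics;
    }
}
```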
[jira] [Commented] (KAFKA-7646) Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-7646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690174#comment-16690174 ] Dong Lin commented on KAFKA-7646: - This issue is not captured by the automatic test runs in https://builds.apache.org/job/kafka-2.0-jdk8. It is hard to debug this without a stacktrace. Given that the test is related to Sasl and it passes most of the time, for the same reason as explained in https://issues.apache.org/jira/browse/KAFKA-7651, this does not appear to be a blocking issue for the 2.1.0 release. > Flaky test > SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe > --- > > Key: KAFKA-7646 > URL: https://issues.apache.org/jira/browse/KAFKA-7646 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > This test is reported to fail by [~enothereska] as part of 2.1.0 RC0 release > certification. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690175#comment-16690175 ] Guozhang Wang commented on KAFKA-7577: -- Hi [~dthomas-trimble], any luck trying to reproduce the issue as a unit test with TopologyTestDriver so that we can further investigate? > Semantics of Table-Table Join with Null Message Are Incorrect > - > > Key: KAFKA-7577 > URL: https://issues.apache.org/jira/browse/KAFKA-7577 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Daren Thomas >Priority: Major > > Observed behavior of Table-Table join with a Null Message does not match the > semantics described in the documentation > ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]). > The expectation is: > * Message A results in [A, null] from the Left Join > * Message null (tombstone) results in null (tombstone) from the Left Join > The observed behavior was that the null (tombstone) message did not pass > through the Left Join to the output topic as expected. This behavior was > observed with and without caching enabled, against a test harness, and > against a local Confluent 5.0.0 platform. It was also observed that the > KTableKTableLeftJoinProcessor.process() function was not called for the null > (tombstone) message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
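The expectation stated in the report can be modelled with a small stdlib sketch. This is a simplification of the documented KTable-KTable left-join contract, not Streams' actual implementation: a non-null left-side update emits [left, rightOrNull], and a null left-side update (tombstone) must emit a tombstone downstream:

```java
import java.util.HashMap;
import java.util.Map;

public class LeftJoinTombstoneSketch {

    final Map<String, String> left = new HashMap<>();
    final Map<String, String> right = new HashMap<>();
    // Output "topic": a null value models an emitted tombstone.
    final Map<String, String> output = new HashMap<>();

    void updateLeft(String key, String value) {
        if (value == null) {
            // Tombstone on the left side: per the documented semantics it
            // must pass through the left join as a tombstone.
            left.remove(key);
            output.put(key, null);
        } else {
            // Regular update: join against the right side, which may be
            // absent, producing e.g. "[A, null]".
            left.put(key, value);
            output.put(key, "[" + value + ", " + right.get(key) + "]");
        }
    }
}
```

The reported bug is precisely that the tombstone branch's output never reached the output topic in the real topology.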
[jira] [Updated] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
[ https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7651: Issue Type: Sub-task (was: Task) Parent: KAFKA-7645 > Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts > --- > > Key: KAFKA-7651 > URL: https://issues.apache.org/jira/browse/KAFKA-7651 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > Here is stacktrace from > https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/ > {code} > Error Message > java.lang.AssertionError: Expected an exception of type > org.apache.kafka.common.errors.TimeoutException; got type > org.apache.kafka.common.errors.SslAuthenticationException > Stacktrace > java.lang.AssertionError: Expected an exception of type > org.apache.kafka.common.errors.TimeoutException; got type > org.apache.kafka.common.errors.SslAuthenticationException > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at > kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404) > at > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
[ https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690173#comment-16690173 ] Dong Lin commented on KAFKA-7651: - The test failure is related to the SSL handshake. In general the SSL handshake is a stateful operation without a timeout. Thus it is understandable that the SSL logic in the test may be flaky if a GC pause is long or there is an ephemeral network issue in the test. The same test failure exists on the 2.0 branch in https://builds.apache.org/job/kafka-2.0-jdk8/183/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts. Since users have been running Kafka 2.0.0 without major issues, the test failure here should not be a blocking issue for the 2.1.0 release. > Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts > --- > > Key: KAFKA-7651 > URL: https://issues.apache.org/jira/browse/KAFKA-7651 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > Here is stacktrace from > https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/ > {code} > Error Message > java.lang.AssertionError: Expected an exception of type > org.apache.kafka.common.errors.TimeoutException; got type > org.apache.kafka.common.errors.SslAuthenticationException > Stacktrace > java.lang.AssertionError: Expected an exception of type > org.apache.kafka.common.errors.TimeoutException; got type > org.apache.kafka.common.errors.SslAuthenticationException > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at > kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404) > at > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
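The failing assertion boils down to inspecting which exception type a future completed with. A stdlib sketch of that check, mirroring the idea behind `kafka.utils.TestUtils.assertFutureExceptionTypeEquals` rather than its actual code (the flaky run above sees SslAuthenticationException where TimeoutException was expected):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureExceptionCheck {

    // Returns the type of the exception a future failed with, or null if
    // it completed normally. A test like the one above asserts this type
    // equals the expected exception class and fails otherwise.
    static Class<?> causeType(CompletableFuture<?> future) {
        try {
            future.get();
            return null; // completed normally
        } catch (ExecutionException e) {
            return e.getCause().getClass();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```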
[jira] [Issue Comment Deleted] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
[ https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7312: Comment: was deleted (was: Here is another stacktrace from https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/ {code} Error Message java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException Stacktrace java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404) at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} ) > Transient failure in > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts > > > Key: KAFKA-7312 > URL: https://issues.apache.org/jira/browse/KAFKA-7312 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Error Message > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > Stacktrace > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345) > at > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
[ https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7312: Issue Type: Bug (was: Sub-task) Parent: (was: KAFKA-7645) > Transient failure in > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts > > > Key: KAFKA-7312 > URL: https://issues.apache.org/jira/browse/KAFKA-7312 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Error Message > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > Stacktrace > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345) > at > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
Dong Lin created KAFKA-7651: --- Summary: Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts Key: KAFKA-7651 URL: https://issues.apache.org/jira/browse/KAFKA-7651 Project: Kafka Issue Type: Task Reporter: Dong Lin Here is stacktrace from https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/ {code} Error Message java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException Stacktrace java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404) at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
[ https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690150#comment-16690150 ] Dong Lin edited comment on KAFKA-7312 at 11/17/18 12:36 AM: Here is another stacktrace from https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/ {code} Error Message java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException Stacktrace java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404) at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} was (Author: lindong): Here is another stacktrace: {code} Error Message java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException Stacktrace java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404) at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.lang.Thread.run(Thread.java:748) {code} > Transient failure in > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts > > > Key: KAFKA-7312 > URL: https://issues.apache.org/jira/browse/KAFKA-7312 > Project: Kafka > Issue Type: Sub-task > Components: unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Error Message > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > Stacktrace > org.junit.runners.model.TestTimedOutException: test timed out after 12 >
[jira] [Commented] (KAFKA-7649) Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690168#comment-16690168 ] Dong Lin commented on KAFKA-7649: - The there is error in the log that says "No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka7346315539242944484.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1011)". The source code confirms that broker will fail to start with "java.lang.SecurityException: zookeeper.set.acl is true, but the verification of the JAAS login file failed." if broker can not find JAAS configuration file. So the question is why broker fails to find the JAAS configuration file even though "startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))" in SaslEndToEndAuthorizationTest.setUp() should have created the JAAS configuration file. I could not find the root cause yet. Since this happens rarely in the integration test and this issue is related to the existing of a configuration file during broker initialization. My guess is that the bug is related to the test setup, or maybe the temporary file `'/tmp/kafka7346315539242944484.tmp` is somehow cleaned up by the test machine. Though I am not 100% sure, my opinion is that this is not a blocking issue for 2.1.0 release. > Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl > > > Key: KAFKA-7649 > URL: https://issues.apache.org/jira/browse/KAFKA-7649 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > Observed in > https://builds.apache.org/job/kafka-2.1-jdk8/49/testReport/junit/kafka.api/SslEndToEndAuthorizationTest/testNoProduceWithDescribeAcl/ > {code} > Error Message > java.lang.SecurityException: zookeeper.set.acl is true, but the verification > of the JAAS login file failed. 
> Stacktrace > java.lang.SecurityException: zookeeper.set.acl is true, but the verification > of the JAAS login file failed. > at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:361) > at kafka.server.KafkaServer.startup(KafkaServer.scala:202) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:135) > at > kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:101) > at > kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:100) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:100) > at > kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:81) > at > kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73) > at > kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:180) > at > kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:72) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at >
[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()
[ https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690159#comment-16690159 ] Eugen Feller commented on KAFKA-7634: - Sure, let me try what I can do. Code looks like this: {code:java} val table = bldr .table[Key, Value1]( keySerde, valueSerde1, topicA, stateStoreName ) val stream1 = bldr .stream[Key, Value2]( keySerde, valueSerde2, topicB ) .filterNot((k: Key, s: Value2) => s == null) val enrichedStream = stream1 .leftJoin[Value1, Value3]( table, joiner, keySerde, valueSerde2 ) val explodedStream = bldr .stream[Mac, Value4]( keySerde, valueSerde4, topicC ) .flatMapValues[Value3]() val mergedStream = bldr.merge[Key, Value3](enrichedStream, explodedStream) mergedStream.transform[Key, Value3](transformer).to(keySerde, valueSerde3, outputTopic){code} > Punctuate not being called with merge() and/or outerJoin() > -- > > Key: KAFKA-7634 > URL: https://issues.apache.org/jira/browse/KAFKA-7634 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.3 >Reporter: Eugen Feller >Priority: Major > > Hi all, > I am using the Processor API and having trouble to get Kafka streams > v0.11.0.3 call the punctuate() function after a merge() and/or outerJoin(). > Specifically, I am having a topology where I am doing flatMapValues() -> > merge() and/or outerJoin -> transform(). If I dont call merge() and/or > outerJoin() before transform(), punctuate is being called as expected. > Thank you very much in advance for your help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7402) Kafka Streams should implement AutoCloseable where appropriate
[ https://issues.apache.org/jira/browse/KAFKA-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690153#comment-16690153 ] ASF GitHub Bot commented on KAFKA-7402: --- cmccabe closed pull request #5839: KAFKA-7402: Kafka Streams should implement AutoCloseable where approp… URL: https://github.com/apache/kafka/pull/5839 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java index 763fe512217..6af47058e4e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java @@ -40,7 +40,7 @@ * * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. 
*/ -public interface ConsumerInterceptor extends Configurable { +public interface ConsumerInterceptor extends Configurable, AutoCloseable { /** * This is called just before the records are returned by diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index d9830877ba7..fa8bab53e72 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -932,7 +932,7 @@ public double measure(MetricConfig config, long now) { } } -private class HeartbeatThread extends KafkaThread { +private class HeartbeatThread extends KafkaThread implements AutoCloseable { private boolean enabled = false; private boolean closed = false; private AtomicReference failed = new AtomicReference<>(null); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java index 995bdaa6dce..55d6b25a411 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java @@ -25,7 +25,7 @@ * * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. 
*/ -public interface MetricsReporter extends Configurable { +public interface MetricsReporter extends Configurable, AutoCloseable { /** * This is called when the reporter is first registered to initially register all existing metrics diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 47b137512ce..3bca276bc73 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -63,7 +63,7 @@ * to memory pressure or other reasons * */ -public class KafkaChannel { +public class KafkaChannel implements AutoCloseable { private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 1000 * 1000; /** diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index b960fcbd942..843d46dc736 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -983,7 +983,7 @@ public int numStagedReceives(KafkaChannel channel) { return deque == null ? 0 : deque.size(); } -private class SelectorMetrics { +private class SelectorMetrics implements AutoCloseable { private final Metrics metrics; private final String metricGrpPrefix; private final Map metricTags; diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 0cc2cec31f9..7512c8273da 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -37,7 +37,7 @@ * and the builder is closed (e.g. the Producer), it's important to call `closeForRecordAppends` when the
[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction
[ https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690155#comment-16690155 ] Guozhang Wang commented on KAFKA-7531: -- [~spuzon] I cannot recall there is a config named `session.timeout.ms` on the broker side, there are only `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, and the other is `zookeeper.session.timeout.ms` which should be irrelevant. I looked at the source code carefully around the place you pointed out (TransactionCoordinator.scala:393 - 398) but cannot find any obvious link, if you can edit the code (seem you can since you are running out of source code) could you add some log entry around this piece and see which object actually throws the NPE? > NPE NullPointerException at TransactionCoordinator handleEndTransaction > --- > > Key: KAFKA-7531 > URL: https://issues.apache.org/jira/browse/KAFKA-7531 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.0.0 >Reporter: Sebastian Puzoń >Priority: Critical > Fix For: 2.1.1, 2.0.2 > > > Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper. > Streams Application 4 instances, each has 5 Streams threads, total 20 stream > threads. 
> I observe NPE NullPointerException at coordinator broker which causes all > application stream threads shutdown, here's stack from broker: > {code:java} > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member > elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe > in group elo > g_agg has failed, removing it from the group > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance > group elog_agg with old generation 49 (__consumer_offsets-21) > (kafka.coordinator.gro > up.GroupCoordinator) > [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group > elog_agg generation 50 (__consumer_offsets-21) > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from > leader for group elog_agg for generation 50 > (kafka.coordinator.group.GroupCoordina > tor) > [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized > transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on > partition _ > _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator) > [ > [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request > {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr > ue} (kafka.server.KafkaApis) > java.lang.NullPointerException > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383) > at scala.util.Either$RightProjection.flatMap(Either.scala:702) > at > 
kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437) > at > kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619) > at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129) > at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70) > at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110) > at > kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) > at > kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590) >
[jira] [Commented] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
[ https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690150#comment-16690150 ] Dong Lin commented on KAFKA-7312: - Here is another stacktrace: {code} Error Message java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException Stacktrace java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404) at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} > Transient failure in > 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts > > > Key: KAFKA-7312 > URL: https://issues.apache.org/jira/browse/KAFKA-7312 > Project: Kafka > Issue Type: Sub-task > Components: unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Error Message > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > Stacktrace > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345) > at > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7650) make "auto.create.topics.enable" dynamically configurable.
xiongqi wu created KAFKA-7650: - Summary: make "auto.create.topics.enable" dynamically configurable. Key: KAFKA-7650 URL: https://issues.apache.org/jira/browse/KAFKA-7650 Project: Kafka Issue Type: Improvement Reporter: xiongqi wu Assignee: xiongqi wu There are several use cases that we want to make "auto.create.topics.enable" can be dynamically configured. For example: 1) wild card consumer can recreate deleted topics 2) We also see misconfigured consumer that consumes from wrong clusters ends up with creating a lot of zombie topics in target cluster. In such cases, we may want to temporarily disable "auto.create.topics.enable", and re-enable topic creation later after problem is solved without restarting brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
[ https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690140#comment-16690140 ] Dong Lin commented on KAFKA-7648: - Currently TestUtils.createTopic(...) will re-send znode creation request to zookeeper service if the previous response shows Code.CONNECTIONLOSS. See KafkaZkClient.retryRequestsUntilConnected() for related logic. This means that the test will fail if the zookeeper has created znode upon the first request, the response to the first request is lost or timed-out, the second request is sent, and the response of the second request shows Code.NODEEXISTS. In order to fix this flaky test, we probably should implement some logic similar to KafkaZkClient.CheckedEphemeral() to check whether the znode has been created in the with the same session id after receiving Code.NODEEXISTS. > Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests > --- > > Key: KAFKA-7648 > URL: https://issues.apache.org/jira/browse/KAFKA-7648 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > Observed in > [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/] > > {code} > Error Message > org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already > exists. > h3. Stacktrace > org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already > exists. > h3. Standard Output > [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition topic-3-3 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
[2018-11-07 17:53:10,812] ERROR > [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition > topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client > session timed out, have not heard from server in 4000ms for sessionid > 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 > 17:53:14,806] WARN Unable to read additional data from client sessionid > 0x10051eebf480003, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:14,807] WARN Unable to read additional data from client > sessionid 0x10051eebf480002, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:14,824] WARN Unable to read additional data from client > sessionid 0x10051eebf480001, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:15,423] WARN Unable to read additional data from client > sessionid 0x10051eebf48, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] > WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will > adversely effect operation latency. 
See the ZooKeeper troubleshooting guide > (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 > 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] > Error for partition topic-4-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:23,087] ERROR > [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition > invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR > [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition > invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR > [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition > invalid-timeout-0 at offset 0
[jira] [Comment Edited] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
[ https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690140#comment-16690140 ] Dong Lin edited comment on KAFKA-7648 at 11/16/18 11:47 PM: Currently TestUtils.createTopic(...) will re-send znode creation request to zookeeper service if the previous response shows Code.CONNECTIONLOSS. See KafkaZkClient.retryRequestsUntilConnected() for related logic. This means that the test will fail if the zookeeper has created znode upon the first request, the response to the first request is lost or timed-out, the second request is sent, and the response of the second request shows Code.NODEEXISTS. In order to fix this flaky test, we probably should implement some logic similar to KafkaZkClient.CheckedEphemeral() to check whether the znode has been created in the with the same session id after receiving Code.NODEEXISTS. Given the above understanding and the fact that the test passes with high probability, this flaky test does not indicate bug and should not be a blocking issue for 2.1.0 release. was (Author: lindong): Currently TestUtils.createTopic(...) will re-send znode creation request to zookeeper service if the previous response shows Code.CONNECTIONLOSS. See KafkaZkClient.retryRequestsUntilConnected() for related logic. This means that the test will fail if the zookeeper has created znode upon the first request, the response to the first request is lost or timed-out, the second request is sent, and the response of the second request shows Code.NODEEXISTS. In order to fix this flaky test, we probably should implement some logic similar to KafkaZkClient.CheckedEphemeral() to check whether the znode has been created in the with the same session id after receiving Code.NODEEXISTS. 
> Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests > --- > > Key: KAFKA-7648 > URL: https://issues.apache.org/jira/browse/KAFKA-7648 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > Observed in > [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/] > > {code} > Error Message > org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already > exists. > h3. Stacktrace > org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already > exists. > h3. Standard Output > [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition topic-3-3 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR > [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition > topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
[2018-11-07 17:53:14,805] WARN Client > session timed out, have not heard from server in 4000ms for sessionid > 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 > 17:53:14,806] WARN Unable to read additional data from client sessionid > 0x10051eebf480003, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:14,807] WARN Unable to read additional data from client > sessionid 0x10051eebf480002, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:14,824] WARN Unable to read additional data from client > sessionid 0x10051eebf480001, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:15,423] WARN Unable to read additional data from client > sessionid 0x10051eebf48, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] > WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will > adversely effect operation latency. See the ZooKeeper troubleshooting guide > (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 > 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0]
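The failure mode described in the comment above can be sketched as a small, self-contained simulation. This is purely illustrative — the enum, map, and method names below are hypothetical stand-ins, not ZooKeeper's or KafkaZkClient's actual API; the point is that after a CONNECTIONLOSS retry, a NODEEXISTS response for a node created by the same session should be treated as success:

```java
import java.util.HashMap;
import java.util.Map;

class ZnodeRetrySketch {
    // Illustrative response codes, mirroring the ZooKeeper codes named above.
    enum Code { OK, CONNECTIONLOSS, NODEEXISTS }

    // Simulated server-side state: znode path -> session id of its creator.
    static final Map<String, Long> znodes = new HashMap<>();

    // One create attempt; 'dropResponse' simulates the lost/timed-out reply
    // after a server-side success, which forces the client to retry.
    static Code create(String path, long sessionId, boolean dropResponse) {
        if (znodes.containsKey(path)) {
            return Code.NODEEXISTS;
        }
        znodes.put(path, sessionId);
        return dropResponse ? Code.CONNECTIONLOSS : Code.OK;
    }

    // Retry loop with the proposed NODEEXISTS-after-CONNECTIONLOSS check.
    static Code createWithRetry(String path, long sessionId, boolean dropFirstResponse) {
        boolean sawConnectionLoss = false;
        boolean drop = dropFirstResponse;
        while (true) {
            Code code = create(path, sessionId, drop);
            drop = false;
            if (code == Code.CONNECTIONLOSS) {
                sawConnectionLoss = true;   // retry, as retryRequestsUntilConnected does
            } else if (code == Code.NODEEXISTS && sawConnectionLoss
                       && znodes.get(path) == sessionId) {
                return Code.OK;             // our own earlier create won; not a failure
            } else {
                return code;
            }
        }
    }
}
```

KafkaZkClient.CheckedEphemeral performs an analogous ownership check against the existing znode's session id, which is why the comment points to it as a model.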
[jira] [Updated] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore
[ https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-7622: - Labels: needs-kip user-experience (was: needs-kip) > Add findSessions functionality to ReadOnlySessionStore > -- > > Key: KAFKA-7622 > URL: https://issues.apache.org/jira/browse/KAFKA-7622 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Di Campo >Priority: Major > Labels: needs-kip, user-experience > > When creating a session store from the DSL, and you get a > {{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in > a {{SessionStore}}, even if the key type is a {{Windowed}}. So you would > have to iterate through it to find the time-related entries, which should be > less efficient than querying by time. > So the purpose of this ticket is to be able to query the store with (key, > time). > Proposal is to add {{SessionStore's findSessions}}-like methods (i.e. > time-bound access) to {{ReadOnlySessionStore.}} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690138#comment-16690138 ] Guozhang Wang commented on KAFKA-7628: -- Hmm.. in this case the state should not transition to `NOT_RUNNING`, as the stream threads have not fully joined. In the latest version we've slightly improved the logic with a shutdown thread whose logic is basically: {code} for (final StreamThread thread : threads) { try { if (!thread.isRunning()) { thread.join(); } } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } } if (globalStreamThread != null) { globalStreamThread.setStateListener(null); globalStreamThread.shutdown(); } if (globalStreamThread != null && !globalStreamThread.stillRunning()) { try { globalStreamThread.join(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } globalStreamThread = null; } adminClient.close(); metrics.close(); setState(State.NOT_RUNNING); {code} If the stream threads have not all joined, it should not proceed to `setState(State.NOT_RUNNING)`. 
> KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when needed, based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although the stream was closed successfully: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor<String, byte[]> > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > 
@Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0
[jira] [Comment Edited] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
[ https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690115#comment-16690115 ] Dong Lin edited comment on KAFKA-7486 at 11/16/18 11:13 PM: The test calls `AdminUtils.addPartitions` to add partition to the topic after the async topic deletion is triggered. It assumes that the topic deletion is not completed when `AdminUtils.addPartitions` is executed but this is not guaranteed. This is the root cause of the test failure. So it does not indicate any bug in the code and thus this is not a blocking issue for 2.1.0 release. was (Author: lindong): The test calls `AdminUtils.addPartitions` to add partition to the topic after the async topic deletion is triggered. It assumes that the topic deletion is not completed when `AdminUtils.addPartitions` is executed but this is not guaranteed. This is the root cause of the test failure. So it does not indicate any bug in the code and thus this is not a blocking issue for 2.1 release. > Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic` > -- > > Key: KAFKA-7486 > URL: https://issues.apache.org/jira/browse/KAFKA-7486 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Priority: Major > > Starting to see more of this recently: > {code} > 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic > FAILED > 10:06:28 kafka.admin.AdminOperationException: > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /brokers/topics/test > 10:06:28 at > kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162) > 10:06:28 at > kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102) > 10:06:28 at > kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229) > 10:06:28 at > kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7446) Better error message to explain the upper limit of TimeWindow
[ https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690134#comment-16690134 ] Guozhang Wang commented on KAFKA-7446: -- [~mrsrinivas] Please feel free to submit the PR. > Better error message to explain the upper limit of TimeWindow > - > > Key: KAFKA-7446 > URL: https://issues.apache.org/jira/browse/KAFKA-7446 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: Jacek Laskowski >Assignee: Srinivas Reddy >Priority: Trivial > Labels: newbie++ > > The following code throws an {{IllegalArgumentException}}. > {code:java} > import org.apache.kafka.streams.kstream.TimeWindows > import scala.concurrent.duration._ > val timeWindow = TimeWindows > .of(1.minute.toMillis) > .advanceBy(2.minutes.toMillis) > {code} > The exception is as follows and it's not clear why {{60000}} is the upper > limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} also > confused me). > {code:java} > java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, > 60000]. > at > org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100) > ... 44 elided{code} > I think that the message should be more developer-friendly and explain the > boundaries, perhaps with an example (and a link to docs)? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
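For context on the bound in the report above: {{of(1.minute.toMillis)}} sets the window size to 60000 ms, and {{advanceBy}} requires the advance to lie in (0, sizeMs], so {{2.minutes.toMillis}} = 120000 ms exceeds it. A minimal sketch of that bound check follows — {{validateAdvance}} and the message wording are illustrative, not the actual {{TimeWindows}} implementation:

```java
class AdvanceValidation {
    // The advance (hop) must be positive and no larger than the window size;
    // a larger advance would leave gaps of time covered by no window.
    static long validateAdvance(long advanceMs, long sizeMs) {
        if (advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException(
                "advanceMs must lie within interval (0, " + sizeMs + "] ms, i.e. be "
                + "positive and at most the window size, but was " + advanceMs);
        }
        return advanceMs;
    }
}
```

With a 60000 ms window, any advance up to 60000 ms is accepted; the upper bound equals the window size because a bigger hop would skip records entirely.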
[jira] [Commented] (KAFKA-7541) Transient Failure: kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable
[ https://issues.apache.org/jira/browse/KAFKA-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690130#comment-16690130 ] Dong Lin commented on KAFKA-7541: - According to the source code of DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(), the test will fail with the exception above if leader election is not completed within 15 seconds. Thus the test may fail if there is a long GC. We can reduce the chance of the test failing by increasing the wait time. Given the above understanding and the fact that the test passes with high probability, this flaky test does not indicate a bug and should not be a blocking issue for the 2.1.0 release. > Transient Failure: > kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable > > > Key: KAFKA-7541 > URL: https://issues.apache.org/jira/browse/KAFKA-7541 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > Observed on Java 11: > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/] > > Stacktrace: > {noformat} > java.lang.AssertionError: Unclean leader not elected > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at > kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:487) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at >
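The wait-and-retry pattern the comment above refers to can be sketched as follows; {{waitUntil}} here is a hypothetical helper, not the actual signature in Kafka's test utilities. The assertion only fails if the condition is still false when the deadline expires, so increasing the timeout absorbs pauses such as long GCs:

```java
import java.util.function.BooleanSupplier;

class WaitUntil {
    // Poll 'condition' until it becomes true or 'timeoutMs' elapses.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;               // e.g. "unclean leader elected"
            }
            try {
                Thread.sleep(pollMs);      // back off before the next poll
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;                     // treat interruption like a deadline hit
            }
        }
        return condition.getAsBoolean();   // one final check at the deadline
    }
}
```

With a 15-second timeout, a GC pause near that length can push the election past the deadline even though nothing is wrong; a longer timeout only delays failure in the genuinely broken case.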
[jira] [Commented] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate
[ https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690118#comment-16690118 ] Guozhang Wang commented on KAFKA-6567: -- [~Samuel Hawker] Sure, go ahead and submit the PR :) > KStreamWindowReduce can be replaced by KStreamWindowAggregate > - > > Key: KAFKA-6567 > URL: https://issues.apache.org/jira/browse/KAFKA-6567 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Yaswanth Kumar >Priority: Major > Labels: newbie > > This is a tech debt worth cleaning up: KStreamWindowReduce should be able to > be replaced by KStreamWindowAggregate. In fact, we have already done this for > session windows, where in {{SessionWindowedKStreamImpl}} we use > {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} > to replace reducer with aggregator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
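The replacement the ticket above describes can be sketched generically: a reducer {{(V, V) -> V}} becomes an aggregator whose empty aggregate is {{null}} and whose first input seeds the aggregate. The helper names mirror {{reduceInitializer}} and {{aggregatorForReducer}} from the ticket, but this is an illustrative sketch, not the Kafka Streams internals:

```java
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

class ReduceAsAggregate {
    // Initializer for a reduce: the "empty" aggregate is null.
    static <V> Supplier<V> reduceInitializer() {
        return () -> null;
    }

    // Wrap a reducer as an aggregator: the first value seeds the aggregate,
    // and subsequent values are combined with the reducer.
    static <V> BinaryOperator<V> aggregatorForReducer(BinaryOperator<V> reducer) {
        return (aggregate, value) ->
            aggregate == null ? value : reducer.apply(aggregate, value);
    }
}
```

For session windows a merger that combines two non-empty aggregates is additionally needed (the ticket's {{mergerForAggregator}}); for a reduce that merger is simply the reducer itself.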
[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()
[ https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690116#comment-16690116 ] Guozhang Wang commented on KAFKA-7634: -- Could you share your code snippet around the punctuation calls? > Punctuate not being called with merge() and/or outerJoin() > -- > > Key: KAFKA-7634 > URL: https://issues.apache.org/jira/browse/KAFKA-7634 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.3 >Reporter: Eugen Feller >Priority: Major > > Hi all, > I am using the Processor API and having trouble getting Kafka Streams > v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). > Specifically, I have a topology where I am doing flatMapValues() -> > merge() and/or outerJoin -> transform(). If I don't call merge() and/or > outerJoin() before transform(), punctuate is called as expected. > Thank you very much in advance for your help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
[ https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690115#comment-16690115 ] Dong Lin commented on KAFKA-7486: - The test calls `AdminUtils.addPartitions` to add partition to the topic after the async topic deletion is triggered. It assumes that the topic deletion is not completed when `AdminUtils.addPartitions` is executed but this is not guaranteed. This is the root cause of the test failure. So it does not indicate any bug in the code and thus this is not a blocking issue for 2.1 release. > Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic` > -- > > Key: KAFKA-7486 > URL: https://issues.apache.org/jira/browse/KAFKA-7486 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Priority: Major > > Starting to see more of this recently: > {code} > 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic > FAILED > 10:06:28 kafka.admin.AdminOperationException: > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /brokers/topics/test > 10:06:28 at > kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162) > 10:06:28 at > kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102) > 10:06:28 at > kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229) > 10:06:28 at > kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7541) Transient Failure: kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable
[ https://issues.apache.org/jira/browse/KAFKA-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7541: Issue Type: Sub-task (was: Bug) Parent: KAFKA-7645 > Transient Failure: > kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable > > > Key: KAFKA-7541 > URL: https://issues.apache.org/jira/browse/KAFKA-7541 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > Observed on Java 11: > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/] > > Stacktrace: > {noformat} > java.lang.AssertionError: Unclean leader not elected > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at > kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:487) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
> at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at >
[jira] [Created] (KAFKA-7646) Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
Dong Lin created KAFKA-7646: --- Summary: Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe Key: KAFKA-7646 URL: https://issues.apache.org/jira/browse/KAFKA-7646 Project: Kafka Issue Type: Task Reporter: Dong Lin This test is reported to fail by [~enothereska] as part of 2.1.0 RC0 release certification. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7649) Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7649: Issue Type: Sub-task (was: Task) Parent: KAFKA-7645 > Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl > > > Key: KAFKA-7649 > URL: https://issues.apache.org/jira/browse/KAFKA-7649 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > Observed in > https://builds.apache.org/job/kafka-2.1-jdk8/49/testReport/junit/kafka.api/SslEndToEndAuthorizationTest/testNoProduceWithDescribeAcl/ > {code} > Error Message > java.lang.SecurityException: zookeeper.set.acl is true, but the verification > of the JAAS login file failed. > Stacktrace > java.lang.SecurityException: zookeeper.set.acl is true, but the verification > of the JAAS login file failed. > at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:361) > at kafka.server.KafkaServer.startup(KafkaServer.scala:202) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:135) > at > kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:101) > at > kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:100) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:100) > at > kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:81) > at > kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73) > at > kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:180) > at > kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:72) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) > at > 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) >
[jira] [Created] (KAFKA-7649) Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl
Dong Lin created KAFKA-7649: --- Summary: Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl Key: KAFKA-7649 URL: https://issues.apache.org/jira/browse/KAFKA-7649 Project: Kafka Issue Type: Task Reporter: Dong Lin Observed in https://builds.apache.org/job/kafka-2.1-jdk8/49/testReport/junit/kafka.api/SslEndToEndAuthorizationTest/testNoProduceWithDescribeAcl/ {code} Error Message java.lang.SecurityException: zookeeper.set.acl is true, but the verification of the JAAS login file failed. Stacktrace java.lang.SecurityException: zookeeper.set.acl is true, but the verification of the JAAS login file failed. at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:361) at kafka.server.KafkaServer.startup(KafkaServer.scala:202) at kafka.utils.TestUtils$.createServer(TestUtils.scala:135) at kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:101) at kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:100) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:100) at kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:81) at kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73) at kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:180) at kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:72) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at 
java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at
[jira] [Updated] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
[ https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7648: Description: Observed in [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/] {code} Error Message org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already exists. h3. Stacktrace org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already exists. h3. Standard Output [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition topic-3-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 
[2018-11-07 17:53:14,805] WARN Client session timed out, have not heard from server in 4000ms for sessionid 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:14,806] WARN Unable to read additional data from client sessionid 0x10051eebf480003, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:14,807] WARN Unable to read additional data from client sessionid 0x10051eebf480002, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:14,824] WARN Unable to read additional data from client sessionid 0x10051eebf480001, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:15,423] WARN Unable to read additional data from client sessionid 0x10051eebf48, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition topic-4-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 
[2018-11-07 17:53:23,087] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition invalid-timeout-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. {code} was: Observed in [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/] h3. Error Message org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already exists. h3. Stacktrace org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already exists. h3. Standard Output [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition topic-3-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client session timed out, have not heard
[jira] [Updated] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
[ https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7312: Issue Type: Sub-task (was: Improvement) Parent: KAFKA-7645 > Transient failure in > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts > > > Key: KAFKA-7312 > URL: https://issues.apache.org/jira/browse/KAFKA-7312 > Project: Kafka > Issue Type: Sub-task > Components: unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Error Message > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > Stacktrace > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345) > at > kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
Dong Lin created KAFKA-7648: --- Summary: Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests Key: KAFKA-7648 URL: https://issues.apache.org/jira/browse/KAFKA-7648 Project: Kafka Issue Type: Task Reporter: Dong Lin Observed in [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/] h3. Error Message org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already exists. h3. Stacktrace org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already exists. h3. Standard Output [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition topic-3-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 
[2018-11-07 17:53:14,805] WARN Client session timed out, have not heard from server in 4000ms for sessionid 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:14,806] WARN Unable to read additional data from client sessionid 0x10051eebf480003, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:14,807] WARN Unable to read additional data from client sessionid 0x10051eebf480002, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:14,824] WARN Unable to read additional data from client sessionid 0x10051eebf480001, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 17:53:15,423] WARN Unable to read additional data from client sessionid 0x10051eebf48, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition topic-4-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 
[2018-11-07 17:53:23,087] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition invalid-timeout-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
[ https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7648: Issue Type: Sub-task (was: Task) Parent: KAFKA-7645 > Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests > --- > > Key: KAFKA-7648 > URL: https://issues.apache.org/jira/browse/KAFKA-7648 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > Observed in > [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/] > > h3. Error Message > org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already > exists. > h3. Stacktrace > org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already > exists. > h3. Standard Output > [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition topic-3-3 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR > [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition > topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
[2018-11-07 17:53:14,805] WARN Client > session timed out, have not heard from server in 4000ms for sessionid > 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 > 17:53:14,806] WARN Unable to read additional data from client sessionid > 0x10051eebf480003, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:14,807] WARN Unable to read additional data from client > sessionid 0x10051eebf480002, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:14,824] WARN Unable to read additional data from client > sessionid 0x10051eebf480001, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] > WARN Client session timed out, have not heard from server in 4002ms for > sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) > [2018-11-07 17:53:15,423] WARN Unable to read additional data from client > sessionid 0x10051eebf48, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] > WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will > adversely effect operation latency. See the ZooKeeper troubleshooting guide > (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 > 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] > Error for partition topic-4-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
[2018-11-07 17:53:23,087] ERROR > [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition > invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR > [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition > invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR > [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition > invalid-timeout-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
[ https://issues.apache.org/jira/browse/KAFKA-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7647: Issue Type: Sub-task (was: Task) Parent: KAFKA-7645 > Flaky test > LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic > - > > Key: KAFKA-7647 > URL: https://issues.apache.org/jira/browse/KAFKA-7647 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > kafka.log.LogCleanerParameterizedIntegrationTest > > testCleansCombinedCompactAndDeleteTopic[3] FAILED > java.lang.AssertionError: Contents of the map shouldn't change > expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> > (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), > 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> > (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 -> > (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but > was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), > 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> > (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> > (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 -> > (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> > (355,355))> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:118) > at > kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
Dong Lin created KAFKA-7647: --- Summary: Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic Key: KAFKA-7647 URL: https://issues.apache.org/jira/browse/KAFKA-7647 Project: Kafka Issue Type: Task Reporter: Dong Lin kafka.log.LogCleanerParameterizedIntegrationTest > testCleansCombinedCompactAndDeleteTopic[3] FAILED java.lang.AssertionError: Contents of the map shouldn't change expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 -> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 -> (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:118) at kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
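The assertion failure above shows a stale record (99 -> (299,299)) surviving cleaning on a topic that combines the compact and delete cleanup policies. As a loose illustration only (this is a toy model, not the actual kafka.log.LogCleaner, and the assumption that key 99's record predates the retention boundary is hypothetical), the invariant the test checks can be sketched as: drop records before the retention boundary, then keep only the latest value per remaining key.

```python
# Toy model of combined compact+delete cleaning. Illustrative only; not
# the kafka.log.LogCleaner implementation.

def clean(log, retention_start):
    """log is a list of (key, value) appends; offsets are list indices.
    Records before retention_start are deleted; the rest are compacted
    down to the latest value per key."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        if offset < retention_start:
            continue  # dropped by the delete (retention) policy
        latest[key] = value  # later appends win: compaction
    return latest

# Key 99 written at offset 0, then keys 0..19 with values (340+k, 340+k):
log = [(99, (299, 299))] + [(k, (340 + k, 340 + k)) for k in range(20)]
cleaned = clean(log, retention_start=1)
# A correct cleaner yields exactly keys 0..19; in the flaky run above,
# 99 -> (299,299) leaked through instead.
```

A run of this sketch produces the "expected" map from the assertion message, with no key 99 present.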
[jira] [Updated] (KAFKA-7646) Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-7646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7646: Issue Type: Sub-task (was: Task) Parent: KAFKA-7645 > Flaky test > SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe > --- > > Key: KAFKA-7646 > URL: https://issues.apache.org/jira/browse/KAFKA-7646 > Project: Kafka > Issue Type: Sub-task >Reporter: Dong Lin >Priority: Major > > This test is reported to fail by [~enothereska] as part of 2.1.0 RC0 release > certification. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
[ https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7486: Issue Type: Sub-task (was: Bug) Parent: KAFKA-7645 > Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic` > -- > > Key: KAFKA-7486 > URL: https://issues.apache.org/jira/browse/KAFKA-7486 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Priority: Major > > Starting to see more of this recently: > {code} > 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic > FAILED > 10:06:28 kafka.admin.AdminOperationException: > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /brokers/topics/test > 10:06:28 at > kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162) > 10:06:28 at > kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102) > 10:06:28 at > kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229) > 10:06:28 at > kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7645) Fix flaky unit test for 2.1 branch
Dong Lin created KAFKA-7645: --- Summary: Fix flaky unit test for 2.1 branch Key: KAFKA-7645 URL: https://issues.apache.org/jira/browse/KAFKA-7645 Project: Kafka Issue Type: Task Reporter: Dong Lin Assignee: Dong Lin -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690102#comment-16690102 ] Boyang Chen edited comment on KAFKA-7641 at 11/16/18 10:50 PM: --- [~enether] Thanks! I think we need to propose two KIPs here: 1. require member id during new member join group request 2. enforce group.max.size. I could take this one as a practice for going through the consumer protocol change, do you want to work on #2 here? was (Author: bchen225242): [~enether] Thanks! I think we need to propose two KIPs here: 1. require member id during new member join group request 2. enforce group.max.size. I could take this one as a practice for going through the consumer protocol change, do you want to work on 2. here? > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. 
So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690102#comment-16690102 ] Boyang Chen commented on KAFKA-7641: [~enether] Thanks! I think we need to propose two KIPs here: 1. require member id during new member join group request 2. enforce group.max.size. I could take this one as a practice for going through the consumer protocol change, do you want to work on 2. here? > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
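The edge case quoted above (a request timeout far below the rebalance timeout, so new members' JoinGroup requests keep timing out and each retry registers as a fresh member) can be illustrated with a toy model. This is a hypothetical sketch, not broker code; `simulate_join_retries` and `max_size` are illustrative names, with `max_size` standing in for the proposed `consumer.group.max.size` cap.

```python
# Toy model: every client-side JoinGroup timeout leaves a dangling member
# on the coordinator (it only expires after the much longer rebalance
# timeout), and the retry then registers yet another member.

def simulate_join_retries(new_consumers, retries_per_consumer, max_size=None):
    """Return how many member slots the coordinator ends up tracking."""
    members = 0
    for _ in range(new_consumers):
        for _ in range(retries_per_consumer):
            if max_size is not None and members >= max_size:
                break  # with a cap, the coordinator rejects further joins
            members += 1  # a timed-out join leaves a dangling member
    return members

# 10 new consumers, each retrying 50 times before the rebalance completes:
uncapped = simulate_join_retries(10, 50)             # grows to 500 members
capped = simulate_join_retries(10, 50, max_size=25)  # held at 25
```

The uncapped case is the "group size eventually became quite large" failure mode; the cap bounds the group metadata regardless of how often clients retry.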
[jira] [Commented] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
[ https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690099#comment-16690099 ] Dong Lin commented on KAFKA-7486: - Hey [~chia7712], I think [~hachikuji] probably missed your message. I am sure [~hachikuji] (and I) would be happy for you to help take this JIRA. > Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic` > -- > > Key: KAFKA-7486 > URL: https://issues.apache.org/jira/browse/KAFKA-7486 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > Starting to see more of this recently: > {code} > 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic > FAILED > 10:06:28 kafka.admin.AdminOperationException: > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /brokers/topics/test > 10:06:28 at > kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162) > 10:06:28 at > kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102) > 10:06:28 at > kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229) > 10:06:28 at > kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7644) Worker Rebalance enhancements
satya created KAFKA-7644: Summary: Worker Rebalance enhancements Key: KAFKA-7644 URL: https://issues.apache.org/jira/browse/KAFKA-7644 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: satya Currently the Kafka Connect distributed worker triggers a rebalance any time a new connector/task is added, irrespective of whether the added connector is a source connector or a sink connector. My understanding has been that the worker rebalance should be identical to a consumer group rebalance. That said, should source connectors not be immune to the rebalance? Are we not supposed to use source connectors with distributed workers? It does appear to me there is some caveat in the way the worker rebalance works, and it needs enhancement to not trigger unwanted rebalances that cause restarts of all tasks. Kafka connectors should have a way to avoid restarting and stay with their existing partition assignment if the rebalance trigger is related to a different connector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
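For context on the behavior described above: the distributed worker's eager ("stop the world") rebalance recomputes the whole group assignment on any configuration change, so adding one connector can move connectors that did not change. A toy model makes this visible (`round_robin_assign` is a hypothetical illustration, not Connect's actual assignor):

```python
# Toy model of an eager, whole-group assignment: adding one connector can
# reshuffle connectors that did not change, forcing their tasks to restart.
# round_robin_assign is illustrative only, not Connect's assignment code.

def round_robin_assign(connectors, workers):
    assignment = {w: [] for w in workers}
    for i, name in enumerate(sorted(connectors)):
        assignment[workers[i % len(workers)]].append(name)
    return assignment

workers = ["worker-1", "worker-2"]
before = round_robin_assign(["sink-a", "source-c"], workers)
after = round_robin_assign(["sink-a", "sink-b", "source-c"], workers)
# "source-c" moves from worker-2 to worker-1 even though only "sink-b"
# was added, so source-c's tasks restart too.
```

Any assignment scheme that recomputes everything from the full connector list has this property, which is why the issue asks for rebalances scoped to the connector that actually changed.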
[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()
[ https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690046#comment-16690046 ] Eugen Feller commented on KAFKA-7634: - [~guozhang] In the meantime I found that if I materialize the merged stream via through() and call transform(), it works. > Punctuate not being called with merge() and/or outerJoin() > -- > > Key: KAFKA-7634 > URL: https://issues.apache.org/jira/browse/KAFKA-7634 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.3 >Reporter: Eugen Feller >Priority: Major > > Hi all, > I am using the Processor API and having trouble to get Kafka streams > v0.11.0.3 call the punctuate() function after a merge() and/or outerJoin(). > Specifically, I am having a topology where I am doing flatMapValues() -> > merge() and/or outerJoin -> transform(). If I dont call merge() and/or > outerJoin() before transform(), punctuate is being called as expected. > Thank you very much in advance for your help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7642) Kafka Connect - graceful shutdown of distributed worker
satya created KAFKA-7642: Summary: Kafka Connect - graceful shutdown of distributed worker Key: KAFKA-7642 URL: https://issues.apache.org/jira/browse/KAFKA-7642 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: satya Currently I don't find any way to gracefully shut down a distributed worker other than killing the process. Could you add a shutdown option to the workers? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5946) Give connector method parameter better name
[ https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690043#comment-16690043 ] Samuel Hawker commented on KAFKA-5946: -- Hi, I notice this has not seen any activity in many months, if [~tanvijaywant31] is not still working on it, can I pick up the ticket? :) Just a few questions - I understand the part about renaming the class to connClass, but what do you mean by standardise? In this context does it just mean extend the class with these extra fields? > Give connector method parameter better name > --- > > Key: KAFKA-5946 > URL: https://issues.apache.org/jira/browse/KAFKA-5946 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Tanvi Jaywant >Priority: Major > Labels: connector, newbie > > During the development of KAFKA-5657, there were several iterations where > method call didn't match what the connector parameter actually represents. > [~ewencp] had used connType as equivalent to connClass because Type wasn't > used to differentiate source vs sink. > [~ewencp] proposed the following: > {code} > It would help to convert all the uses of connType to connClass first, then > standardize on class == java class, type == source/sink, name == > user-specified name. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7643) Connectors do not unload even when tasks fail in kafka connect
satya created KAFKA-7643: Summary: Connectors do not unload even when tasks fail in kafka connect Key: KAFKA-7643 URL: https://issues.apache.org/jira/browse/KAFKA-7643 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: satya If there are any issues in the tasks associated with a connector, I have often seen that even though the tasks fail, the connector itself is not released. The only way out of this is to submit an explicit request to delete the connector. There should be a way to shut down the connectors gracefully if the task encounters exceptions that are not retriable. In addition, Kafka Connect also does not have a graceful exit option. There will be situations like server maintenance, outages, etc., where it would be prudent to gracefully shut down the connectors rather than performing a DELETE through the REST endpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5946) Give connector method parameter better name
[ https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690043#comment-16690043 ] Samuel Hawker edited comment on KAFKA-5946 at 11/16/18 9:58 PM: Hi, I notice this has not seen any activity in many months, if [~tanvijaywant31] is not still working on it, can I pick up the ticket? :) Just a few questions - I understand the part about renaming the class to connClass, but what do you mean by standardise? In this context does it just mean extend the class with these extra fields? (The class I am referring to is ConnectorType) was (Author: samuel hawker): Hi, I notice this has not seen any activity in many months, if [~tanvijaywant31] is not still working on it, can I pick up the ticket? :) Just a few questions - I understand the part about renaming the class to connClass, but what do you mean by standardise? In this context does it just mean extend the class with these extra fields? > Give connector method parameter better name > --- > > Key: KAFKA-5946 > URL: https://issues.apache.org/jira/browse/KAFKA-5946 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Tanvi Jaywant >Priority: Major > Labels: connector, newbie > > During the development of KAFKA-5657, there were several iterations where > method call didn't match what the connector parameter actually represents. > [~ewencp] had used connType as equivalent to connClass because Type wasn't > used to differentiate source vs sink. > [~ewencp] proposed the following: > {code} > It would help to convert all the uses of connType to connClass first, then > standardize on class == java class, type == source/sink, name == > user-specified name. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate
[ https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690040#comment-16690040 ] John Roesler commented on KAFKA-6567: - This seems fine to me. Unless [~sawyna] wants to protest, I'd say to go ahead and submit the PR. > KStreamWindowReduce can be replaced by KStreamWindowAggregate > - > > Key: KAFKA-6567 > URL: https://issues.apache.org/jira/browse/KAFKA-6567 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Yaswanth Kumar >Priority: Major > Labels: newbie > > This is a tech debt worth cleaning up: KStreamWindowReduce should be able to > be replaced by KStreamWindowAggregate. In fact, we have already done this for > session windows, where in {{SessionWindowedKStreamImpl}} we use > {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} > to replace reducer with aggregator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate
[ https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689983#comment-16689983 ] Samuel Hawker edited comment on KAFKA-6567 at 11/16/18 9:38 PM: Hi, I notice this has not seen any activity in many months, if [~sawyna] is not still working on it, can I pick up the ticket? :) (I have the code changes ready to submit as a Pull request) was (Author: samuel hawker): Hi, I notice this has not seen any activity in many months, if [~sawyna] is not still working on it, can I pick up the ticket? :) > KStreamWindowReduce can be replaced by KStreamWindowAggregate > - > > Key: KAFKA-6567 > URL: https://issues.apache.org/jira/browse/KAFKA-6567 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Yaswanth Kumar >Priority: Major > Labels: newbie > > This is a tech debt worth cleaning up: KStreamWindowReduce should be able to > be replaced by KStreamWindowAggregate. In fact, we have already done this for > session windows, where in {{SessionWindowedKStreamImpl}} we use > {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} > to replace reducer with aggregator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690013#comment-16690013 ] Stanislav Kozlovski commented on KAFKA-7641: Thanks for opening the ticket [~bchen225242] . This change will require a KIP as it is a public interface change. I'm willing to go ahead with this if you'd like, as I know you're already working on KIP-345. > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
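For reference, the change under discussion would presumably surface as a broker-side setting; a sketch of what it might look like in server.properties (the key name comes from this ticket's title and is a proposal pending a KIP, not an existing config; the value is illustrative only):

```properties
# Proposed in KAFKA-7641 (KIP pending) -- NOT an existing broker config.
# Cap the number of members in a single consumer group, so that retried
# JoinGroup requests cannot grow the group metadata without bound.
# The value below is purely illustrative.
consumer.group.max.size=250
```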
[jira] [Assigned] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-7641: -- Assignee: Boyang Chen > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()
[ https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690001#comment-16690001 ] Eugen Feller commented on KAFKA-7634: - Hi [~guozhang] Thanks a lot for your quick reply. I will give it a try with one service. We have a mono repo, so changing the library to a new major version would require me to change a lot of services. I am currently using [https://github.com/manub/scalatest-embedded-kafka] to provide embedded Kafka for some unit tests. For others, I am using [https://github.com/jpzk/mockedstreams], which is a wrapper around TopologyTestDriver. I will try to use the TopologyTestDriver directly to reproduce. > Punctuate not being called with merge() and/or outerJoin() > -- > > Key: KAFKA-7634 > URL: https://issues.apache.org/jira/browse/KAFKA-7634 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.3 >Reporter: Eugen Feller >Priority: Major > > Hi all, > I am using the Processor API and having trouble getting Kafka Streams > v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). > Specifically, I have a topology where I am doing flatMapValues() -> > merge() and/or outerJoin() -> transform(). If I don't call merge() and/or > outerJoin() before transform(), punctuate() is called as expected. > Thank you very much in advance for your help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate
[ https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689983#comment-16689983 ] Samuel Hawker commented on KAFKA-6567: -- Hi, I notice this has not seen any activity in many months, if [~sawyna] is not still working on it, can I pick up the ticket? :) > KStreamWindowReduce can be replaced by KStreamWindowAggregate > - > > Key: KAFKA-6567 > URL: https://issues.apache.org/jira/browse/KAFKA-6567 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Yaswanth Kumar >Priority: Major > Labels: newbie > > This is a tech debt worth cleaning up: KStreamWindowReduce should be able to > be replaced by KStreamWindowAggregate. In fact, we have already done this for > session windows, where in {{SessionWindowedKStreamImpl}} we use > {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} > to replace reducer with aggregator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689957#comment-16689957 ] ASF GitHub Bot commented on KAFKA-4453: --- gitlw closed pull request #5783: KAFKA-4453: Separating controller connections and requests from the data plane (KIP-291) URL: https://github.com/apache/kafka/pull/5783 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecf6fbf33f1..29e10383afe 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -108,14 +108,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
-    val brokerNode = broker.node(config.interBrokerListenerName)
+    val controlPlaneListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+    val controlPlaneSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+    val brokerNode = broker.node(controlPlaneListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
-        config.interBrokerSecurityProtocol,
+        controlPlaneSecurityProtocol,
         JaasContext.Type.SERVER,
         config,
-        config.interBrokerListenerName,
+        controlPlaneListenerName,
         config.saslMechanismInterBrokerProtocol,
         config.saslInterBrokerHandshakeRequestEnable
       )
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 00b09688c5b..0bc8a2a6ff9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -40,6 +40,7 @@ object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
   val RequestQueueSizeMetric = "RequestQueueSize"
+  val ControlPlaneRequestQueueSizeMetric = "ControlPlaneRequestQueueSize"
   val ResponseQueueSizeMetric = "ResponseQueueSize"
   val ProcessorMetricTag = "processor"
@@ -272,13 +273,13 @@ object RequestChannel extends Logging {
   }
 }
-class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int, val requestQueueSizeMetric: String) extends KafkaMetricsGroup {
   import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
-  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
+  newGauge(requestQueueSizeMetric, new Gauge[Int] {
     def value = requestQueue.size
   })
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 1365f90f763..c3dac79f5f9 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -65,8 +65,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  val requestChannel = new RequestChannel(maxQueuedRequests)
-  private val processors = new ConcurrentHashMap[Int, Processor]()
+  val dataRequestChannel = new RequestChannel(maxQueuedRequests, RequestChannel.RequestQueueSizeMetric)
+  var controlPlaneRequestChannel: RequestChannel = null
+  if (config.controlPlaneListenerName.isDefined) {
+    controlPlaneRequestChannel = new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric)
+  }
+  private val dataProcessors = new ConcurrentHashMap[Int, Processor]()
+  // there should be only one controller processor, however we use a map to store it so that we can reuse the logic for data processors
+  private[network] val controlPlaneProcessors =
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7641: --- Description: In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason concluded an edge case of current consumer protocol which could cause memory burst on broker side: ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.``` Since many disorganized join group requests are spamming the group metadata, we should define a cap on broker side to avoid one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config since most times this value is only dealing with error scenarios, client users shouldn't worry about this config. was: In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason concluded an edge case of current consumer protocol which could cause memory burst on broker side: ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. 
The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.``` Since many disorganized join group requests are spamming the group metadata, we should define a cap on broker side to avoid one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config since most times this value is only dealing with error scenarios. > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios, client users shouldn't worry about this config. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689925#comment-16689925 ] Boyang Chen commented on KAFKA-7641: [~hachikuji] [~enether] > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7641: --- Description: In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason concluded an edge case of current consumer protocol which could cause memory burst on broker side: ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.``` Since many disorganized join group requests are spamming the group metadata, we should define a cap on broker side to avoid one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config since most times this value is only dealing with error scenarios. > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of current consumer protocol which could cause > memory burst on broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time. The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. 
The timeout caused a retry and > the group size eventually become quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on broker side to avoid one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most times this value is only dealing with error > scenarios. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-7641: --- Summary: Add `consumer.group.max.size` to cap consumer metadata size on broker (was: Add `group.max.size` to cap group metadata on broker) > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7641) Add `group.max.size` to cap group metadata on broker
Boyang Chen created KAFKA-7641: -- Summary: Add `group.max.size` to cap group metadata on broker Key: KAFKA-7641 URL: https://issues.apache.org/jira/browse/KAFKA-7641 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689916#comment-16689916 ] Boyang Chen commented on KAFKA-7610: Sounds good Jason! I think combining `group.max.size` and the member id requirement in the join group request should be sufficient for solving the above scenario. I will open a separate Jira for group size; in the meantime, I think the conclusion is that Jason's original proposal 2 should be sufficient to mitigate the problem. > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. 
Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the session timeout. Obviously > this option requires a KIP (and some more thought). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb
hitesh gollahalli bachanna created KAFKA-7640: - Summary: Kafka stream interactive query not returning data when state is backed by rocksdb Key: KAFKA-7640 URL: https://issues.apache.org/jira/browse/KAFKA-7640 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.0.0 Reporter: hitesh gollahalli bachanna I have a Kafka Streams app running with 36 different instances (one for each partition). Each instance comes up one after the other, and I am building a REST service on top of the state to access the data. Here is some code that I use:
{code:java}
// find out which host has the key
StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
if (localSelf.host().equals(metadata.host())) {
    // get the key from the local store
} else {
    // call the remote host using restTemplate
}
{code}
The problem is that the `metadata` object returned points at one host/IP, but the data is actually on a different node, which I was able to confirm from some application logs I printed. This happens every time I start my application. The `allMetadata` method in the `KafkaStreams` class says the value will be updated as and when partitions get reassigned, but that is not happening in this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
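To make the reported symptom concrete, here is a toy model of the routing decision in plain Java with no Kafka dependency: the partition count mirrors the report, while the hashing, host names, and assignment maps are illustrative stand-ins for the real default partitioner and `StreamsMetadata`.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of key-based routing: a lookup goes to whichever host a
// (possibly stale) metadata snapshot maps the key's partition to.
public class StaleMetadataDemo {
    static final int PARTITIONS = 36; // mirrors the 36 instances in the report

    // Simplified stand-in for the default partitioner's key hashing.
    static int partitionFor(String key) {
        return Math.abs(key.hashCode()) % PARTITIONS;
    }

    static String ownerOf(String key, Map<Integer, String> assignment) {
        return assignment.get(partitionFor(key));
    }

    public static void main(String[] args) {
        Map<Integer, String> current = new HashMap<>(); // assignment after a rebalance
        Map<Integer, String> stale = new HashMap<>();   // snapshot taken before it
        for (int p = 0; p < PARTITIONS; p++) {
            current.put(p, "host-" + p);
            stale.put(p, "host-" + ((p + 1) % PARTITIONS));
        }
        // Routing against the stale snapshot picks the wrong instance, which is
        // the symptom described: the data lives on a different node.
        System.out.println(ownerOf("some-key", current).equals(ownerOf("some-key", stale))); // prints false
    }
}
```

Whether the bug is in metadata refresh or in the application's snapshotting, the observable behavior is the same: the lookup is only correct if the assignment map the router consults matches the broker's current assignment.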
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689866#comment-16689866 ] Jason Gustafson commented on KAFKA-7610: For some background, the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually became quite large because we could not detect the fact that the new members were no longer there. As Stanislav mentions, the main advantage of detecting disconnected consumers is that it would work for older clients. Probably we should just do this if the complexity is not too high. It is not a complete solution though, because detection of a connection which did not close cleanly will still take a significant amount of time. I think we should consider the protocol improvement in addition to this so that we have direct control over failure detection going forward. Furthermore, I think our approach of holding JoinGroup requests in purgatory for long amounts of time has been problematic. It forces users to trade off the time to detect failed connections using `request.timeout.ms` against the time needed to complete a rebalance in the worst case. We kind of hacked around this in the consumer by allowing the JoinGroup to override the `request.timeout.ms` provided by the user. So users can provide a reasonable timeout value and detect failures in a reasonable time for every other API, but JoinGroup is still an issue. Really I'm not sure it's ever a good idea to have a request sitting on the broker for minutes. For example, this has bad interplay with the idle connection timeout. 
The broker will proactively close connections that aren't seeing any activity even if we've still got a JoinGroup request sitting in purgatory. It would be better to move to a model which incorporated client polling so that request times could be reasonably controlled. Of course we also don't want the JoinGroup to introduce a lot of new request traffic, so holding the request for shorter durations may be reasonable. In any case, if we solve this problem, then we probably also solve the issue here. As for `group.max.size`, I agree with Stanislav and Boyang that it could be helpful, but I think that is a separate discussion. Even if we had this, we would still want a way to detect failures for the initial join. Perhaps one of you can open a JIRA? TLDR; 1. We probably should do the disconnect option as an initial step since it addresses the problem for all client versions. 2. We should also consider protocol improvements to avoid holding the JoinGroup requests in purgatory for indefinite amounts of time. > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. 
In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId. And we can detect failures through the
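Option 2 above can be made concrete with a small sketch. The following is an illustrative, stdlib-only simulation (the class and method names are hypothetical, not Kafka's actual coordinator code) of returning a generated memberId together with a retriable error instead of parking the JoinGroup in purgatory, so that members keep polling and can be expired like any other session timeout:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of option 2: the coordinator immediately returns a
// generated memberId plus a retriable error, and the client polls with that
// id until the rebalance completes.
class JoinGroupSketch {
    enum ErrorCode { NONE, REBALANCE_IN_PROGRESS }

    record JoinResult(String memberId, ErrorCode error) {}

    static class Coordinator {
        final Map<String, Long> lastSeen = new HashMap<>(); // memberId -> last poll time
        boolean rebalanceComplete = false;

        JoinResult handleJoin(String memberId, long nowMs) {
            if (memberId.isEmpty()) {
                // Assign an id right away so later polls are attributable and
                // the member is no longer an anonymous dangling entry.
                memberId = UUID.randomUUID().toString();
            }
            lastSeen.put(memberId, nowMs);
            return rebalanceComplete
                    ? new JoinResult(memberId, ErrorCode.NONE)
                    : new JoinResult(memberId, ErrorCode.REBALANCE_IN_PROGRESS);
        }

        // Failure detection now works during the initial join: members whose
        // polls stop arriving are expired like any heartbeat timeout.
        void expire(long nowMs, long sessionTimeoutMs) {
            lastSeen.values().removeIf(t -> nowMs - t > sessionTimeoutMs);
        }
    }
}
```

A member that disconnects simply stops polling, so its entry is removed after the session timeout rather than accumulating in the group metadata cache.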
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689838#comment-16689838 ] Matthias J. Sax commented on KAFKA-7595: Glad it works. Thanks for confirmation! > Kafka Streams: KTable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When performing a KTable to KTable join after aggregation, there are duplicates in > the resulting KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. 
caching enabled, materialized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689832#comment-16689832 ] ASF GitHub Bot commented on KAFKA-4453: --- MayureshGharat opened a new pull request #5921: KAFKA-4453 : Added code to separate controller connections and requests from the data plane URL: https://github.com/apache/kafka/pull/5921 KIP-291 Implementation : Added code to separate controller connections and requests from the data plane. - Tested with local deployment that the controller requests are handled by the control plane and other requests are handled by the data plane. - Also added unit tests in order to test the functionality. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > add request prioritization > -- > > Key: KAFKA-4453 > URL: https://issues.apache.org/jira/browse/KAFKA-4453 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Mayuresh Gharat >Priority: Major > > Today all requests (client requests, broker requests, controller requests) to > a broker are put into the same queue. They all have the same priority. So a > backlog of requests ahead of the controller request will delay the processing > of controller requests. This causes requests in front of the controller > request to get processed based on stale state. > Side effects may include giving clients stale metadata\[1\], rejecting > ProduceRequests and FetchRequests\[2\], and data loss (for some > unofficial\[3\] definition of data loss in terms of messages beyond the high > watermark)\[4\]. 
> We'd like to minimize the number of requests processed based on stale state. > With request prioritization, controller requests get processed before regular > queued up requests, so requests can get processed with up-to-date state. > \[1\] Say a client's MetadataRequest is sitting in front of a controller's > UpdateMetadataRequest on a given broker's request queue. Suppose the > MetadataRequest is for a topic whose partitions have recently undergone > leadership changes and that these leadership changes are being broadcasted > from the controller in the later UpdateMetadataRequest. Today the broker > processes the MetadataRequest before processing the UpdateMetadataRequest, > meaning the metadata returned to the client will be stale. The client will > waste a roundtrip sending requests to the stale partition leader, get a > NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the > topic metadata again. > \[2\] Clients can issue ProduceRequests to the wrong broker based on stale > metadata, causing rejected ProduceRequests. Depending on how long the client acts > on the stale metadata, the impact may or may not be visible to a > producer application. If the number of rejected ProduceRequests does not > exceed the max number of retries, the producer application would not be > impacted. On the other hand, if the retries are exhausted, the failed produce > will be visible to the producer application. > \[3\] The official definition of data loss in kafka is when we lose a > "committed" message. A message is considered "committed" when all in sync > replicas for that partition have applied it to their log. > \[4\] Say a number of ProduceRequests are sitting in front of a controller's > LeaderAndIsrRequest on a given broker's request queue. Suppose the > ProduceRequests are for partitions whose leadership has recently shifted out > from the current broker to another broker in the replica set. 
Today the > broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning > the ProduceRequests are getting processed on the former partition leader. As > part of becoming a follower for a partition, the broker truncates the log to > the high-watermark. With weaker ack settings such as acks=1, the leader may > successfully write to its own log, respond to the user with a success, > process the LeaderAndIsrRequest making the broker a follower of the > partition, and truncate the log to a point before the user's produced > messages. So users have a false sense that their produce attempt succeeded > while in reality their messages got erased. While technically part of what > they signed up for with acks=1, it can still come as a surprise. -- This message was sent by
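The motivation above can be illustrated with a small sketch. Note that KIP-291's actual implementation separates the control plane onto its own listener and threads rather than using a single priority queue; the following stdlib-only example (all names hypothetical) just shows the core idea of dequeuing controller requests ahead of already-queued client requests:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Illustrative sketch, not KIP-291's actual design: a priority-ordered request
// queue in which controller requests are dequeued before already-queued client
// requests, so handlers see up-to-date leadership state sooner.
class RequestQueueSketch {
    enum Kind { CONTROLLER, CLIENT }

    record Request(Kind kind, long seq) {}

    // Controller requests first; FIFO within the same kind (seq preserves
    // arrival order, since PriorityQueue alone is not stable).
    private final PriorityQueue<Request> queue = new PriorityQueue<>(
            Comparator.comparing((Request r) -> r.kind() == Kind.CONTROLLER ? 0 : 1)
                      .thenComparingLong(Request::seq));
    private long seq = 0;

    void enqueue(Kind kind) { queue.add(new Request(kind, seq++)); }

    Request next() { return queue.poll(); }
}
```

With this ordering, an UpdateMetadataRequest enqueued behind a backlog of client requests would still be handled first, avoiding the stale-metadata scenarios in \[1\] and \[4\].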
[jira] [Commented] (KAFKA-7639) Read one request at a time from socket to reduce broker memory usage
[ https://issues.apache.org/jira/browse/KAFKA-7639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689730#comment-16689730 ] ASF GitHub Bot commented on KAFKA-7639: --- rajinisivaram opened a new pull request #5920: [DO NOT MERGE] KAFKA-7639: Read one request at a time from socket to avoid OOM URL: https://github.com/apache/kafka/pull/5920 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Read one request at a time from socket to reduce broker memory usage > > > Key: KAFKA-7639 > URL: https://issues.apache.org/jira/browse/KAFKA-7639 > Project: Kafka > Issue Type: Improvement > Components: network >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.2.0 > > > Broker's Selector currently reads all requests available on the socket when > the socket is ready for read. These are queued up as staged receives. We mute > the channel and stop reading any more data until all the staged requests are > processed. This behaviour is slightly inconsistent since for the initial read > we drain the socket buffer, allowing it to get filled up again, but if data > arrives slightly after the initial read, then we don't read from the socket > buffer until pending requests are processed. > To avoid holding onto requests for longer than required, we should read one > request at a time even if more data is available in the socket buffer. This > is especially useful for produce requests which may be large and may take a > long time to process. > Note that with the default socket read buffer size of 100K, this is not a > critical issue. 
But with larger socket buffers, this could result in > excessive memory usage if a lot of produce requests are buffered in the > broker and the producer times out, reconnects and sends more data before the > broker has cleared older requests. By reading one-at-a-time, we reduce the > amount of time the broker holds onto memory for each request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
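The read-one-at-a-time change can be sketched against Kafka's size-prefixed wire framing (each request is a 4-byte length followed by that many payload bytes). This is a simplified stdlib-only illustration, not the actual `Selector` code:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the change described above, assuming size-prefixed framing.
class FramedReads {
    // Old behaviour: drain every complete frame available in the buffer,
    // staging them all in memory at once.
    static List<byte[]> readAll(ByteBuffer buf) {
        List<byte[]> out = new ArrayList<>();
        byte[] one;
        while ((one = readOne(buf)) != null) out.add(one);
        return out;
    }

    // New behaviour: consume at most one frame per read, leaving the rest
    // buffered; returns null if no complete frame is available yet.
    static byte[] readOne(ByteBuffer buf) {
        if (buf.remaining() < 4) return null;
        buf.mark();
        int size = buf.getInt();
        if (buf.remaining() < size) { buf.reset(); return null; }
        byte[] payload = new byte[size];
        buf.get(payload);
        return payload;
    }
}
```

Because only one request is materialized per read, a large backlog of produce requests stays in the socket buffer instead of occupying broker heap as staged receives.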
[jira] [Commented] (KAFKA-7637) Error while writing to checkpoint file due to too many open files
[ https://issues.apache.org/jira/browse/KAFKA-7637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689570#comment-16689570 ] Ismael Juma commented on KAFKA-7637: Thanks for the report. 65k is generally low for Kafka. It would be good to verify if there is a leak of some sort here although we have clusters running for a long time and haven't seen errors like this one. > Error while writing to checkpoint file due to too many open files > - > > Key: KAFKA-7637 > URL: https://issues.apache.org/jira/browse/KAFKA-7637 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1 > Environment: Red Hat Enterprise Linux Server release 7.4 (Maipo) >Reporter: Sander van Loo >Priority: Major > > We are running a 3 node Kafka cluster on version 1.1.1 on Red Hat Linux 7. > Max open files is set to 65000. > After running for a few days the nodes have the following open file counts: > * node01d: 2712 > * node01e: 2770 > * node01f: 4102 > After a few weeks of runtime cluster crashes with the following error: > > {noformat} > [2018-11-12 07:05:16,790] ERROR Error while writing to checkpoint file > /var/lib/kafka/topics/replication-offset-checkpoint > (kafka.server.LogDirFailureChannel) > java.io.FileNotFoundException: > /var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.(FileOutputStream.java:213) > at java.io.FileOutputStream.(FileOutputStream.java:162) > at > kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52) > at > kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50) > at > kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59) > at > kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384) > at > 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9$adapted(ReplicaManager.scala:1384) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7(ReplicaManager.scala:1384) > at > kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7$adapted(ReplicaManager.scala:1381) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:120) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > at > kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1381) > at > kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:242) > at > kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > followed by this one: > {noformat} > [2018-11-12 07:05:16,792] ERROR [ReplicaManager broker=3] Error while writing > to highwatermark file in directory /var/lib/kafka/topics > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.KafkaStorageException: Error while writing to > checkpoint file /var/lib/kafka/topics/replication-offset-checkpoint > Caused by: java.io.FileNotFoundException: > 
/var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.(FileOutputStream.java:213) > at java.io.FileOutputStream.(FileOutputStream.java:162) > at > kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52) > at > kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50) > at > kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59) > at >
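To verify whether descriptors are leaking as Ismael suggests, the broker's open-file count can be tracked over time and compared against the limit. A minimal check, assuming a Linux `/proc` filesystem (substitute the broker's PID for `$$`, which here is just the current shell):

```shell
# Count open file descriptors for a process and compare against the limit.
# $$ (the current shell's PID) is a stand-in for the Kafka broker's PID.
fd_count=$(ls "/proc/$$/fd" | wc -l)
fd_limit=$(ulimit -n)
echo "open fds: $fd_count (limit: $fd_limit)"
```

Sampling this periodically (or via `lsof -p <pid>`) and watching for monotonic growth well beyond the steady-state counts reported above (~2700-4100) would distinguish a leak from a limit that is simply too low.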
[jira] [Updated] (KAFKA-7635) FetcherThread stops processing after "Error processing data for partition"
[ https://issues.apache.org/jira/browse/KAFKA-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Aerts updated KAFKA-7635: Description: After disabling unclean leader election again after recovery of a situation where we enabled unclean leader election due to a split brain in zookeeper, we saw that some of our brokers stopped replicating their partitions. Digging into the logs, we saw that the replica thread was stopped because one partition had a failure which threw an [{{Error processing data for partition}} exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207]. But the broker kept running and serving the partitions from which it was leader. We saw three different types of exceptions triggering this (example stacktraces attached): * {{kafka.common.UnexpectedAppendOffsetException}} * {{Trying to roll a new log segment for topic partition partition-b-97 with start offset 1388 while it already exists.}} * {{Kafka scheduler is not running.}} We think there are two acceptable ways for the kafka broker to handle this: * Mark those partitions as a partition with error and handle them accordingly. As is done [when a {{CorruptRecordException}} or {{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196] is thrown. * Exit the broker as is done [when log truncation is not allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189]. Maybe even a combination of both. Our probably naive idea is that for the first two types the first strategy would be the best, but for the last type, it is probably better to re-throw a {{FatalExitError}} and exit the broker. was: After disabling unclean leader election again after recovery of a situation where we enabled unclean leader election due to a split brain in zookeeper, we saw that some of our brokers stopped replicating their partitions. 
Digging into the logs, we saw that the replica thread was stopped because one partition had a failure which threw an [{{Error processing data for partition}} exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207]. But the broker kept running and serving the partitions from which it was leader. We saw three different types of exceptions triggering this (example stacktraces attached): * {{kafka.common.UnexpectedAppendOffsetException}} * {{Trying to roll a new log segment for topic partition partition-b-97 with start offset 1388 while it already exists.}} * {{Kafka scheduler is not running.}} We think there are two acceptable ways for the kafka broker to handle this: * Mark those partitions as a partition with error and handle them accordingly. As is done [when a {{CorruptRecordException}} or {{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196] is thrown. * Exit the broker as is done [when log truncation is not allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189]. Maybe even a combination of both. Our probably naive idea is that for the first two types the first strategy would be the best, but for the last type, it is probably better to re-throw a {{FatalExitError}} and exit the broker. > FetcherThread stops processing after "Error processing data for partition" > -- > > Key: KAFKA-7635 > URL: https://issues.apache.org/jira/browse/KAFKA-7635 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 2.0.0 >Reporter: Steven Aerts >Priority: Major > Attachments: stacktraces.txt > > > After disabling unclean leader election again after recovery of a situation > where we enabled unclean leader election due to a split brain in zookeeper, we saw > that some of our brokers stopped replicating their partitions. 
> Digging into the logs, we saw that the replica thread was stopped because one > partition had a failure which threw a [{{Error processing data for > partition}} > exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207]. > But the broker kept running and serving the partitions from which it was > leader. > We saw three different types of exceptions triggering this (example > stacktraces attached): > * {{kafka.common.UnexpectedAppendOffsetException}} > * {{Trying to roll a new log segment for topic partition partition-b-97 with > start offset 1388 while it already exists.}} > * {{Kafka scheduler is not running.}} > We think there are two acceptable ways for the kafka broker to handle this: > *
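The reporter's proposed handling can be sketched as a dispatch on exception type. This is an illustrative stdlib-only sketch of the suggestion, not actual Kafka code; the `Action` names are hypothetical and `IllegalStateException` stands in for the "Kafka scheduler is not running" case:

```java
// Sketch of the proposal above: decide per exception type whether the fetcher
// should quarantine the partition or the whole broker should exit.
class FetchErrorPolicy {
    enum Action { MARK_PARTITION_FAILED, EXIT_BROKER }

    static Action actionFor(Throwable t) {
        // "Kafka scheduler is not running" indicates broker-wide shutdown or
        // an unrecoverable state, so exiting (like FatalExitError) is safer.
        if (t instanceof IllegalStateException) return Action.EXIT_BROKER;
        // Offset/segment inconsistencies affect a single partition; quarantine
        // it the way CorruptRecordException is handled today.
        return Action.MARK_PARTITION_FAILED;
    }
}
```

Either action is preferable to the current behaviour, where the fetcher thread dies silently while the broker keeps serving its leader partitions.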
[jira] [Created] (KAFKA-7638) Trogdor - Support mass task creation endpoint
Stanislav Kozlovski created KAFKA-7638: -- Summary: Trogdor - Support mass task creation endpoint Key: KAFKA-7638 URL: https://issues.apache.org/jira/browse/KAFKA-7638 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Trogdor supports the creation of tasks via the `coordinator/tasks/create` endpoint - it currently accepts only one task. Since Trogdor supports scheduling multiple jobs to execute at a certain time (via the `startTime` task parameter leveraged by all tasks), it makes sense to support creating multiple tasks via a single request. Users might want to leverage the scheduler to, say, create 100 tasks. In the current model, they would need to issue 100 requests - which is inefficient. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7637) Error while writing to checkpoint file due to too many open files
[ https://issues.apache.org/jira/browse/KAFKA-7637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sander van Loo updated KAFKA-7637: -- Summary: Error while writing to checkpoint file due to too many open files (was: ERROR Error while writing to checkpoint file due to too many open files) > Error while writing to checkpoint file due to too many open files > - > > Key: KAFKA-7637 > URL: https://issues.apache.org/jira/browse/KAFKA-7637 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1 > Environment: Red Hat Enterprise Linux Server release 7.4 (Maipo) >Reporter: Sander van Loo >Priority: Major > > We are running a 3 node Kafka cluster on version 1.1.1 on Red Hat Linux 7. > Max open files is set to 65000. > After running for a few days the nodes have the following open file counts: > * node01d: 2712 > * node01e: 2770 > * node01f: 4102 > After a few weeks of runtime cluster crashes with the following error: > > {noformat} > [2018-11-12 07:05:16,790] ERROR Error while writing to checkpoint file > /var/lib/kafka/topics/replication-offset-checkpoint > (kafka.server.LogDirFailureChannel) > java.io.FileNotFoundException: > /var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.(FileOutputStream.java:213) > at java.io.FileOutputStream.(FileOutputStream.java:162) > at > kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52) > at > kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50) > at > kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59) > at > kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384) > at > kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9$adapted(ReplicaManager.scala:1384) > at scala.Option.foreach(Option.scala:257) > at > 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7(ReplicaManager.scala:1384) > at > kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7$adapted(ReplicaManager.scala:1381) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:120) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > at > kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1381) > at > kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:242) > at > kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > followed by this one: > {noformat} > [2018-11-12 07:05:16,792] ERROR [ReplicaManager broker=3] Error while writing > to highwatermark file in directory /var/lib/kafka/topics > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.KafkaStorageException: Error while writing to > checkpoint file /var/lib/kafka/topics/replication-offset-checkpoint > Caused by: java.io.FileNotFoundException: > /var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at 
java.io.FileOutputStream.(FileOutputStream.java:213) > at java.io.FileOutputStream.(FileOutputStream.java:162) > at > kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52) > at > kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50) > at > kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59) > at > kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384) > at >
[jira] [Updated] (KAFKA-7636) Allow consumer to update maxPollRecords value
[ https://issues.apache.org/jira/browse/KAFKA-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kcirtap Seven updated KAFKA-7636: - Labels: pull-request-available (was: ) Description: Hi, We have two use cases where we would need to change the max.poll.records parameter on the fly : 1. We offer a REST API to get 'feedbacks'. This API takes into account a parameter 'count'. The system was previously based on cassandra. It is now based on kafka and 'feedbacks' are stored into kafka topics. To be compliant with the legacy interface contract, we would like to be able to change the max.poll.records on the fly to take into account this 'count' parameter. 2. We receive 'notification requests' related to a 'sender' via a REST API. We store those requests into topics (by sender). Each sender is associated with a weight. Here is the algorithm that processes the requests : 1. At each iteration, we process at max n records (n configurable) for the whole bunch of requests. For this example, let's say 100. 2. We compute the max poll records for each sender. Let's say we have 3 senders with the following weight 2, 1, 1. Hence 50 records max for the first one, 25 for the other two. 3. We consume the topics one after the other. We would like to reallocate some capacity to remaining consumers if the max.poll.records is not reached for the current consumer. Let's say at each iteration we make the following synchronous calls : sender1Consumer.poll() with computed max.poll.records 50 sender2Consumer.poll() with computed max.poll.records 25 sender3Consumer.poll() with computed max.poll.records 25 If the first call returns only 10 records, we would like to reallocate the 40 "spare" records to the other consumers, 20 for each for instance (or another strategy). 
We would make the following calls instead : sender2Consumer.poll() with updated max.poll.records 45 sender3Consumer.poll() with updated max.poll.records 45 For that requirement we also need to change the max.poll.records on the fly. PR: https://github.com/apache/kafka/pull/5919 Regards, was: Hi, We have two use cases where we would need to change the max.poll.records parameter on the fly : 1. We offer a REST API to get 'feedbacks'. This API takes into account a parameter 'count'. The system was previously based on cassandra. It is now based on kafka and 'feedbacks' are stored into kafka topics. To be compliant with the legacy interface contract, we would like to be able to change the max.poll.records on the fly to take into account this 'count' parameter. 2. We receive 'notification requests' related to a 'sender' via a REST API. We store those requests into topics (by sender). Each sender is associated with a weight. Here is the algorithm that processes the requests : 1. At each iteration, we process at max n records (n configurable) for the whole bunch of requests. For this example, let's say 100. 2. We compute the max poll records for each sender. Let's say we have 3 senders with the following weight 2, 1, 1. Hence 50 records max for the first one, 25 for the other two. 3. We consume the topics one after the other. We would like to reallocate some capacity to remaining consumers if the max.poll.records is not reached for the current consumer. Let's say at each iteration we make the following synchronous calls : sender1Consumer.poll() with computed max.poll.records 50 sender2Consumer.poll() with computed max.poll.records 25 sender3Consumer.poll() with computed max.poll.records 25 If the first call returns only 10 records, we would like to reallocate the 40 "spare" records to the other consumers, 20 for each for instance (or another strategy). 
We would make the following calls instead : sender2Consumer.poll() with updated max.poll.records 45 sender3Consumer.poll() with updated max.poll.records 45 For that requirement we also need to change the max.poll.records on the fly. Regards, > Allow consumer to update maxPollRecords value > - > > Key: KAFKA-7636 > URL: https://issues.apache.org/jira/browse/KAFKA-7636 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.0.1 >Reporter: Kcirtap Seven >Priority: Minor > Labels: pull-request-available > > Hi, > We have two use cases where we would need to change the max.poll.records > parameter on the fly : > 1. We offer a REST API to get 'feedbacks'. This API takes into account a > parameter 'count'. > The system was previously based on cassandra. It is now based on kafka and > 'feedbacks' are stored into kafka topics. > To be compliant with
[jira] [Commented] (KAFKA-7636) Allow consumer to update maxPollRecords value
[ https://issues.apache.org/jira/browse/KAFKA-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689465#comment-16689465 ] ASF GitHub Bot commented on KAFKA-7636: --- kcirtap7 opened a new pull request #5919: KAFKA-7636: Allow consumer to update maxPollRecords value URL: https://github.com/apache/kafka/pull/5919 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow consumer to update maxPollRecords value > - > > Key: KAFKA-7636 > URL: https://issues.apache.org/jira/browse/KAFKA-7636 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.0.1 >Reporter: Kcirtap Seven >Priority: Minor > > Hi, > We have two use cases where we would need to change the max.poll.records > parameter on the fly : > 1. We offer a REST API to get 'feedbacks'. This API takes into account a > parameter 'count'. > The system was previously based on cassandra. It is now based on kafka and > 'feedbacks' are stored into kafka topics. > To be compliant with the legacy interface contract, we would like to be able > to change the max.poll.records on the fly to take into account this 'count' > parameter. > 2. We receive 'notification requests' related to a 'sender' via a REST API. 
> We store those requests into topics (by sender). > Each sender is associated with a weight. Here is the algorithm that processes > the requests : > 1. At each iteration, we process at max n records (n configurable) for > the whole bunch of requests. For this example, let's say 100. > 2. We compute the max poll records for each sender. Let's say we have 3 > senders with the following weight 2, 1, 1. Hence 50 records max for the first > one, 25 for the other two. > 3. We consume the topics one after the other. We would like to reallocate > some capacity to remaining consumers if the max.poll.records is not reached > for the current consumer. Let's say at each iteration we make the following > synchronous calls : > sender1Consumer.poll() with computed max.poll.records 50 > sender2Consumer.poll() with computed max.poll.records 25 > sender3Consumer.poll() with computed max.poll.records 25 > If the first call returns only 10 records, we would like to reallocate > the 40 "spare" records to the other consumers, 20 for each for instance (or > another strategy). We would make the following calls instead : > sender2Consumer.poll() with updated max.poll.records 45 > sender3Consumer.poll() with updated max.poll.records 45 > > For that requirement we also need to change the max.poll.records on the fly. > Regards, -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7636) Allow consumer to update maxPollRecords value
Kcirtap Seven created KAFKA-7636: --- Summary: Allow consumer to update maxPollRecords value Key: KAFKA-7636 URL: https://issues.apache.org/jira/browse/KAFKA-7636 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 2.0.1 Reporter: Kcirtap Seven Hi, We have two use cases where we would need to change the max.poll.records parameter on the fly : 1. We offer a REST API to get 'feedbacks'. This API takes into account a parameter 'count'. The system was previously based on cassandra. It is now based on kafka and 'feedbacks' are stored into kafka topics. To be compliant with the legacy interface contract, we would like to be able to change the max.poll.records on the fly to take into account this 'count' parameter. 2. We receive 'notification requests' related to a 'sender' via a REST API. We store those requests into topics (by sender). Each sender is associated with a weight. Here is the algorithm that processes the requests : 1. At each iteration, we process at max n records (n configurable) for the whole bunch of requests. For this example, let's say 100. 2. We compute the max poll records for each sender. Let's say we have 3 senders with the following weights 2, 1, 1. Hence 50 records max for the first one, 25 for the other two. 3. We consume the topics one after the other. We would like to reallocate some capacity to remaining consumers if the max.poll.records is not reached for the current consumer. Let's say at each iteration we make the following synchronous calls : sender1Consumer.poll() with computed max.poll.records 50 sender2Consumer.poll() with computed max.poll.records 25 sender3Consumer.poll() with computed max.poll.records 25 If the first call returns only 10 records, we would like to reallocate the 40 "spare" records to the other consumers, 20 for each for instance (or another strategy). 
We would make the following calls instead : sender2Consumer.poll() with updated max.poll.records 45 sender3Consumer.poll() with updated max.poll.records 45 For that requirement we also need to change the max.poll.records on the fly. Regards, -- This message was sent by Atlassian JIRA (v7.6.3#76005)
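The weighted-budget computation the reporter describes can be sketched in plain Java. This is only an illustration of the requested behavior, not Kafka API: the names `computeBudgets` and `reallocate` are hypothetical, and the even split of spare capacity is just one of the strategies the ticket mentions.

```java
import java.util.Arrays;

// Hypothetical sketch of the per-sender poll-budget algorithm from KAFKA-7636.
public class PollBudget {

    // Split a total record budget across senders proportionally to their weights.
    static int[] computeBudgets(int total, int[] weights) {
        int weightSum = Arrays.stream(weights).sum();
        int[] budgets = new int[weights.length];
        for (int i = 0; i < weights.length; i++)
            budgets[i] = total * weights[i] / weightSum;
        return budgets;
    }

    // If the poll at 'index' returned fewer records than its budget, spread the
    // spare capacity evenly over the consumers that have not been polled yet.
    static int[] reallocate(int[] budgets, int index, int returned) {
        int spare = budgets[index] - returned;
        int remaining = budgets.length - index - 1;
        int[] updated = budgets.clone();
        for (int i = index + 1; i < budgets.length; i++)
            updated[i] += spare / remaining;
        return updated;
    }

    public static void main(String[] args) {
        int[] budgets = computeBudgets(100, new int[]{2, 1, 1});  // → [50, 25, 25]
        int[] updated = reallocate(budgets, 0, 10);               // → [50, 45, 45]
        System.out.println(Arrays.toString(budgets) + " -> " + Arrays.toString(updated));
    }
}
```

With weights 2, 1, 1 and a total of 100 this reproduces the ticket's numbers: initial budgets 50/25/25, and after the first consumer returns only 10 records, 45 each for the remaining two. The missing piece, and the actual feature request, is being able to pass the updated budget to `poll()` at runtime.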
[jira] [Resolved] (KAFKA-7565) NPE in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7565. --- Resolution: Duplicate Fix Version/s: (was: 2.2.0) [~avakhrenev] Thanks for testing, closing this issue. > NPE in KafkaConsumer > > > Key: KAFKA-7565 > URL: https://issues.apache.org/jira/browse/KAFKA-7565 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1 >Reporter: Alexey Vakhrenev >Priority: Critical > > The stacktrace is > {noformat} > java.lang.NullPointerException > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > {noformat} > Couldn't find minimal reproducer, but it happens quite often in our system. > We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is > somehow related. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.
[ https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689311#comment-16689311 ] ASF GitHub Bot commented on KAFKA-7054: --- ManoharVanam opened a new pull request #5211: [KAFKA-7054] Kafka describe command should throw topic doesn't exist exception. URL: https://github.com/apache/kafka/pull/5211 **User Interface Improvement :** If topic doesn't exist then Kafka describe command should throw topic doesn't exist exception, like alter and delete commands ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka describe command should throw topic doesn't exist exception. > -- > > Key: KAFKA-7054 > URL: https://issues.apache.org/jira/browse/KAFKA-7054 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Manohar Vanam >Priority: Minor > > If topic doesn't exist then Kafka describe command should throw topic doesn't > exist exception. 
> like alter and delete commands : > {code:java} > local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete > --topic manu > Error while executing topic command : Topic manu does not exist on ZK path > localhost:2181 > [2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic > manu does not exist on ZK path localhost:2181 > at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91) > at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:71) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter > --topic manu > Error while executing topic command : Topic manu does not exist on ZK path > localhost:2181 > [2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic > manu does not exist on ZK path localhost:2181 > at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91) > at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:65) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
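The requested behavior amounts to adding the same guard the alter/delete paths already have to the describe path. A minimal sketch of that check (the class and method names here are hypothetical; the real `kafka.admin.TopicCommand` is Scala):

```java
import java.util.Collections;
import java.util.List;

// Hypothetical guard illustrating the check the ticket asks the describe
// command to share with alter/delete: fail fast when the topic is absent.
public class DescribeGuard {

    static void ensureTopicExists(List<String> matchedTopics, String topic, String zkPath) {
        if (matchedTopics.isEmpty())
            throw new IllegalArgumentException(
                "Topic " + topic + " does not exist on ZK path " + zkPath);
    }

    public static void main(String[] args) {
        try {
            ensureTopicExists(Collections.emptyList(), "manu", "localhost:2181");
        } catch (IllegalArgumentException e) {
            System.out.println("Error while executing topic command : " + e.getMessage());
        }
    }
}
```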
[jira] [Resolved] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers
[ https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7576. --- Resolution: Fixed Reviewer: Jason Gustafson > Dynamic update of replica fetcher threads may fail to start/close fetchers > -- > > Key: KAFKA-7576 > URL: https://issues.apache.org/jira/browse/KAFKA-7576 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.1, 2.0.1, 2.1.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.2, 2.1.1, 2.0.2 > > > KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown > sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers > can now throw an exception because Selector may be closed on a different > thread while data is being written on another thread. KAFKA-7464 changed this > behaviour for 2.0.1 and 2.1.0. The exception during close() is now logged and > not propagated to avoid exceptions during broker shutdown. > When config update notification of `num.replica.fetchers` is processed, > partitions are migrated as necessary to increase or decrease the number of > fetcher threads. Existing fetchers are shut down first before new ones are > created. This migration is performed on the thread processing the ZK change > notification. Shutting down the Selector of existing fetchers is not safe since > the replica fetcher thread may be processing data at the time using the same > Selector. > Without the fix from KAFKA-7464, another update of the config or broker > restart is required to restart the replica fetchers after dynamic config > update if shutdown encounters an exception. 
> Exception stack trace: > {code:java} > java.lang.IllegalArgumentException > at java.nio.Buffer.position(Buffer.java:244) > at sun.nio.ch.IOUtil.write(IOUtil.java:68) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210) > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160) > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70) > at org.apache.kafka.common.network.Selector.doClose(Selector.java:748) > at org.apache.kafka.common.network.Selector.close(Selector.java:736) > at org.apache.kafka.common.network.Selector.close(Selector.java:698) > at org.apache.kafka.common.network.Selector.close(Selector.java:314) > at > org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533) > at > kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) > at > kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90) > at > kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86) > at > kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76) > at > kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) > at > kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72) > at > kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88) > at > 
kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574) > at > kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410) > at > kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410) > at scala.collection.immutable.List.foreach(List.scala:392) > at > kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410) > kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with
[jira] [Commented] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.
[ https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689282#comment-16689282 ] ASF GitHub Bot commented on KAFKA-7054: --- ManoharVanam closed pull request #5211: [KAFKA-7054] Kafka describe command should throw topic doesn't exist exception. URL: https://github.com/apache/kafka/pull/5211 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka describe command should throw topic doesn't exist exception. > -- > > Key: KAFKA-7054 > URL: https://issues.apache.org/jira/browse/KAFKA-7054 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Manohar Vanam >Priority: Minor > > If topic doesn't exist then Kafka describe command should throw topic doesn't > exist exception. 
> like alter and delete commands : > {code:java} > local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete > --topic manu > Error while executing topic command : Topic manu does not exist on ZK path > localhost:2181 > [2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic > manu does not exist on ZK path localhost:2181 > at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91) > at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:71) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter > --topic manu > Error while executing topic command : Topic manu does not exist on ZK path > localhost:2181 > [2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic > manu does not exist on ZK path localhost:2181 > at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91) > at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:65) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7635) FetcherThread stops processing after "Error processing data for partition"
[ https://issues.apache.org/jira/browse/KAFKA-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689278#comment-16689278 ] Steven Aerts commented on KAFKA-7635: - Similar, but focuses on a specific issue. > FetcherThread stops processing after "Error processing data for partition" > -- > > Key: KAFKA-7635 > URL: https://issues.apache.org/jira/browse/KAFKA-7635 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 2.0.0 >Reporter: Steven Aerts >Priority: Major > Attachments: stacktraces.txt > > > After disabling unclean leader election again after recovery from a situation > where we had enabled unclean leader election due to a split brain in zookeeper, we saw > that some of our brokers stopped replicating their partitions. > Digging into the logs, we saw that the replica thread was stopped because one > partition had a failure which threw an [{{Error processing data for > partition}} > exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207]. > But the broker kept running and serving the partitions of which it was > leader. > We saw three different types of exceptions triggering this (example > stacktraces attached): > * {{kafka.common.UnexpectedAppendOffsetException}} > * {{Trying to roll a new log segment for topic partition partition-b-97 with > start offset 1388 while it already exists.}} > * {{Kafka scheduler is not running.}} > We think there are two acceptable ways for the kafka broker to handle this: > * Mark those partitions as a partition with error and handle them > accordingly. As is done [when a {{CorruptRecordException}} or > {{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196] > is thrown. > * Exit the broker as is done [when log truncation is not > allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189]. 
> > Maybe even a combination of both. Our probably naive idea is that for the > first two types the first strategy would be the best, but for the last type, > it is probably better to re-throw a {{FatalExitError}} and exit the broker. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689269#comment-16689269 ] Sachin Upadhyay commented on KAFKA-7280: [~rsivaram] Thanks for the detailed answers. [~ijuma] I rechecked and I can see "2.0.1" kafka-client in maven repo now. Is "1.1.2" also scheduled to be released anytime soon? I was thinking of upgrading clients to version having only minor-version bumped up instead of major-version. Thanks, -Sachin > ConcurrentModificationException in FetchSessionHandler in heartbeat thread > -- > > Key: KAFKA-7280 > URL: https://issues.apache.org/jira/browse/KAFKA-7280 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.1, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Request/response handling in FetchSessionHandler is not thread-safe. But we > are using it in Kafka consumer without any synchronization even though poll() > from heartbeat thread can process responses. Heartbeat thread holds the > coordinator lock while processing its poll and responses, making other > operations involving the group coordinator safe. We also need to lock > FetchSessionHandler for the operations that update or read > FetchSessionHandler#sessionPartitions. 
> This exception is from a system test run on trunk of > TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two: > {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, > groupId=group] Heartbeat thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at > org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362) > at > org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996) > {quote} > > The logs just prior to the exception show that a partition was removed from > the session: > {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, > groupId=group] Skipping fetch for partition 
test_topic-1 because there is an > in-flight request to worker4:9095 (id: 3 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Completed receive from node 2 for FETCH with correlation id > 417, received > {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header= > Unknown macro: > \{partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null} > ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 > bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Added READ_UNCOMMITTED fetch request for partition > test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for > node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) > (org.apache.kafka.clients.FetchSessionHandler) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Sending READ_UNCOMMITTED
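The fix described in the ticket, taking a common lock around operations that read or mutate `FetchSessionHandler#sessionPartitions`, can be illustrated with a stripped-down sketch. This is not the real `FetchSessionHandler`; the class and method names only echo the ones in the stack trace. Without the shared monitor, iterating the map in `responseDataToLogString` while the heartbeat thread mutates it is exactly what `LinkedHashMap`'s fail-fast iterator reports as a `ConcurrentModificationException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of the locking pattern: mutation and read-out of the
// session-partition map synchronize on the same monitor, so the heartbeat
// thread and the application thread can no longer race on the iterator.
public class SessionPartitions {
    private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

    public synchronized void update(String partition, long fetchOffset) {
        sessionPartitions.put(partition, fetchOffset);
    }

    public synchronized String responseDataToLogString() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : sessionPartitions.entrySet())
            sb.append(e.getKey()).append('=').append(e.getValue()).append(' ');
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        SessionPartitions sp = new SessionPartitions();
        sp.update("test_topic-0", 189L);
        sp.update("test_topic-2", 184L);
        System.out.println(sp.responseDataToLogString());
    }
}
```

Note that the actual fix also has to respect lock ordering with the coordinator lock already held by the heartbeat thread; this sketch only shows the single-lock idea.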
[jira] [Commented] (KAFKA-7565) NPE in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689243#comment-16689243 ] Alexey Vakhrenev commented on KAFKA-7565: - [~rsivaram], [~ijuma] the issue seems to be fixed in 2.0.1, I didn't encounter this error anymore. So this can be closed I think. Thank you. > NPE in KafkaConsumer > > > Key: KAFKA-7565 > URL: https://issues.apache.org/jira/browse/KAFKA-7565 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1 >Reporter: Alexey Vakhrenev >Priority: Critical > Fix For: 2.2.0 > > > The stacktrace is > {noformat} > java.lang.NullPointerException > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > {noformat} > Couldn't find minimal reproducer, but it happens quite often in our system. > We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is > somehow related. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7635) FetcherThread stops processing after "Error processing data for partition"
Steven Aerts created KAFKA-7635: --- Summary: FetcherThread stops processing after "Error processing data for partition" Key: KAFKA-7635 URL: https://issues.apache.org/jira/browse/KAFKA-7635 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 2.0.0 Reporter: Steven Aerts Attachments: stacktraces.txt After disabling unclean leader election again after recovery from a situation where we had enabled unclean leader election due to a split brain in zookeeper, we saw that some of our brokers stopped replicating their partitions. Digging into the logs, we saw that the replica thread was stopped because one partition had a failure which threw an [{{Error processing data for partition}} exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207]. But the broker kept running and serving the partitions of which it was leader. We saw three different types of exceptions triggering this (example stacktraces attached): * {{kafka.common.UnexpectedAppendOffsetException}} * {{Trying to roll a new log segment for topic partition partition-b-97 with start offset 1388 while it already exists.}} * {{Kafka scheduler is not running.}} We think there are two acceptable ways for the kafka broker to handle this: * Mark those partitions as a partition with error and handle them accordingly. As is done [when a {{CorruptRecordException}} or {{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196] is thrown. * Exit the broker as is done [when log truncation is not allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189]. Maybe even a combination of both. Our probably naive idea is that for the first two types the first strategy would be the best, but for the last type, it is probably better to re-throw a {{FatalExitError}} and exit the broker. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
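The "combination of both" the reporter suggests could be expressed as a small classification step before deciding how to react. The sketch below is hypothetical, not Kafka code; the enum, the method name, and the mapping of exception types to actions are assumptions following the ticket's own proposal (a dead scheduler is fatal, the log-level failures mark the partition as failed).

```java
// Hypothetical error policy for AbstractFetcherThread-style failures:
// recoverable log errors mark the partition as failed (as already happens for
// CorruptRecordException / KafkaStorageException), while a stopped scheduler
// escalates to a broker exit.
public class FetchErrorPolicy {
    enum Action { MARK_PARTITION_FAILED, EXIT_BROKER }

    static Action handle(Throwable t) {
        // "Kafka scheduler is not running." surfaces as an IllegalStateException;
        // treating it as fatal mirrors the ticket's suggestion to exit the broker.
        if (t instanceof IllegalStateException)
            return Action.EXIT_BROKER;
        // e.g. UnexpectedAppendOffsetException or a segment-roll conflict:
        // keep the broker alive but stop fetching that partition.
        return Action.MARK_PARTITION_FAILED;
    }

    public static void main(String[] args) {
        System.out.println(handle(new IllegalStateException("Kafka scheduler is not running.")));
        System.out.println(handle(new RuntimeException("UnexpectedAppendOffsetException")));
    }
}
```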
[jira] [Resolved] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6774. Resolution: Fixed > Improve default groupId behavior in consumer > > > Key: KAFKA-6774 > URL: https://issues.apache.org/jira/browse/KAFKA-6774 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Jason Gustafson >Assignee: Vahid Hashemian >Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > At the moment, the default groupId in the consumer is "". If you try to use > this to subscribe() to a topic, the broker will reject the group as invalid. > On the other hand, if you use it with assign(), then the user will be able to > fetch and commit offsets using the empty groupId. Probably 99% of the time, > this is not what the user expects. Instead you would probably expect that if > no groupId is provided, then no committed offsets will be fetched at all and > we'll just use the auto reset behavior if we don't have a current position. > Here are two potential solutions (both requiring a KIP): > 1. Change the default to null. We will preserve the current behavior for > subscribe(). When using assign(), we will not bother fetching committed > offsets for the null groupId, and any attempt to commit offsets will raise an > error. The user can still use the empty groupId, but they have to specify it > explicitly. > 2. Keep the current default, but change the consumer to treat this value as > if it were null as described in option 1. The argument for this behavior is > that using the empty groupId to commit offsets is inherently a dangerous > practice and should not be permitted. We'd have to convince ourselves that > we're fine not needing to allow the empty groupId for backwards compatibility > though. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689175#comment-16689175 ] ASF GitHub Bot commented on KAFKA-6774: --- hachikuji closed pull request #5877: KAFKA-6774: Improve the default group id behavior in KafkaConsumer (KIP-289) URL: https://github.com/apache/kafka/pull/5877 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 795a762a494..9cd5766ea3e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -277,7 +277,7 @@ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) -.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC) +.define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC) .define(SESSION_TIMEOUT_MS_CONFIG, Type.INT, 1, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3a756721fd8..5c673a58c10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -37,6 +37,8 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import 
org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.JmxReporter; @@ -557,6 +559,7 @@ private final Logger log; private final String clientId; +private String groupId; private final ConsumerCoordinator coordinator; private final Deserializer keyDeserializer; private final Deserializer valueDeserializer; @@ -654,18 +657,23 @@ public KafkaConsumer(Properties properties, } @SuppressWarnings("unchecked") -private KafkaConsumer(ConsumerConfig config, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { +private KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) { try { String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); if (clientId.isEmpty()) clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId; -String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); - +this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] "); this.log = logContext.logger(getClass()); +boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); +if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided +if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) +enableAutoCommit = false; +else if (enableAutoCommit) +throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used."); +} else if (groupId.isEmpty()) +log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release."); log.debug("Initializing the Kafka consumer"); this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); @@ -678,8 +686,7 @@ private KafkaConsumer(ConsumerConfig config, .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricsTags);
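The rule the diff above introduces can be condensed into a plain-Java sketch: a null (unset) `group.id` flips the `enable.auto.commit` default to false and rejects an explicit true. The helper name is hypothetical, and `IllegalArgumentException` stands in for Kafka's `InvalidConfigurationException`; `explicitAutoCommit` is null when the user did not set `enable.auto.commit` at all (mirroring the `config.originals().containsKey(...)` check in the patch).

```java
// Sketch of the KIP-289 group.id / enable.auto.commit validation from the diff.
public class GroupIdValidation {

    static boolean resolveAutoCommit(String groupId, Boolean explicitAutoCommit) {
        if (groupId == null) {
            if (explicitAutoCommit == null)
                return false;   // default flips to false when no group id is configured
            if (explicitAutoCommit)
                throw new IllegalArgumentException(
                    "enable.auto.commit cannot be set to true when default group id (null) is used.");
            return false;
        }
        // With a group id configured, the usual default (true) applies.
        return explicitAutoCommit == null || explicitAutoCommit;
    }

    public static void main(String[] args) {
        System.out.println(resolveAutoCommit(null, null));     // no group id: auto-commit off
        System.out.println(resolveAutoCommit("group", null));  // group id set: default stays on
    }
}
```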