[jira] [Updated] (KAFKA-7632) Add producer option to adjust compression level

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Description: 
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression. Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

Since it applies to the other compression codecs, we should add the same 
functionalities to them.

  was:
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression. Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

Since it applies to the other compresssion codecs, we should add same 
functionalities to them/


> Add producer option to adjust compression level
> ---
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.
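A hypothetical sketch of what such a producer-side knob could look like. The `compression.level` key below is an assumption for illustration only; the actual configuration name and accepted range would be defined by the KIP:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdLevelProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Existing setting: choose the codec; zstd currently always uses its default level (3).
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        // Assumed new setting (not in any released Kafka version): raise the level to trade
        // CPU for a better compression ratio.
        props.put("compression.level", "10");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}
{code}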



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Description: 
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression. Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

Since it applies to the other compresssion codecs, we should add same 
functionalities to them/

  was:
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression.  Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

 


> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add producer option to adjust compression level

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Summary: Add producer option to adjust compression level  (was: Add option 
to kafka broker config to adjust compression level for zstd)

> Add producer option to adjust compression level
> ---
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Description: 
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression.  Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

 

  was:The compression level for ZSTD is currently set to use the default level 
(3), which is a conservative setting that in some use cases eliminates the 
value that ZSTD provides with improved compression.  Each use case will vary, 
so exposing the level as a broker configuration setting will allow the user to 
adjust the level.


> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Component/s: (was: core)
 clients

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Priority: Major  (was: Trivial)

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7632:
--

Assignee: Lee Dongjin

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Trivial
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7647:

Description: 

{code}
kafka.log.LogCleanerParameterizedIntegrationTest >
testCleansCombinedCompactAndDeleteTopic[3] FAILED
    java.lang.AssertionError: Contents of the map shouldn't change
expected: (340,340), 5 -> (345,345), 10 -> (350,350), 14 ->
(354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353),
2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 ->
(348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but
was: (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354),
1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 ->
(342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 ->
(299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 ->
(355,355))>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:118)
        at
kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)
{code}

  was:
kafka.log.LogCleanerParameterizedIntegrationTest >
testCleansCombinedCompactAndDeleteTopic[3] FAILED
    java.lang.AssertionError: Contents of the map shouldn't change
expected: (340,340), 5 -> (345,345), 10 -> (350,350), 14 ->
(354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353),
2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 ->
(348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but
was: (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354),
1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 ->
(342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 ->
(299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 ->
(355,355))>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:118)
        at
kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)


> Flaky test 
> LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
> -
>
> Key: KAFKA-7647
> URL: https://issues.apache.org/jira/browse/KAFKA-7647
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> {code}
> kafka.log.LogCleanerParameterizedIntegrationTest >
> testCleansCombinedCompactAndDeleteTopic[3] FAILED
>     java.lang.AssertionError: Contents of the map shouldn't change
> expected: (340,340), 5 -> (345,345), 10 -> (350,350), 14 ->
> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353),
> 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 ->
> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but
> was: (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354),
> 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 ->
> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 ->
> (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 ->
> (355,355))>
>         at org.junit.Assert.fail(Assert.java:88)
>         at org.junit.Assert.failNotEquals(Assert.java:834)
>         at org.junit.Assert.assertEquals(Assert.java:118)
>         at
> kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7578) Kafka streams: add possibility to choose multiple output topics

2018-11-16 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7578:
-
Labels: needs-kip user-experience  (was: needs-kip)

> Kafka streams: add possibility to choose multiple output topics 
> 
>
> Key: KAFKA-7578
> URL: https://issues.apache.org/jira/browse/KAFKA-7578
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Taras Danylchuk
>Priority: Minor
>  Labels: needs-kip, user-experience
>
> There is an awesome feature that was added in Kafka Streams 2.0 - the ability 
> to choose the output topic for a topology dynamically - but in some cases it 
> could be useful to choose several topics within the same cluster.
> I personally met such a case: I needed to route a message to several topics 
> based on its content and a routing configuration.
> I've made a 'proposal' PR for this; unfortunately, I couldn't find a better way 
> to implement it:
> [https://github.com/apache/kafka/pull/5801]
> If this approach is OK and the improvement could be done in a future version, 
> please let me know and I'll finish the PR code.
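For reference, a minimal sketch of the single-topic dynamic routing that 2.0 already offers via TopicNameExtractor, plus a filter-based workaround for fanning a record out to several topics; topic names and routing predicates are made up for illustration:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class DynamicRoutingSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("events");

        // Since 2.0, each record can be routed to exactly ONE topic chosen at runtime.
        TopicNameExtractor<String, String> extractor =
                (key, value, recordContext) -> value.contains("error") ? "errors" : "regular";
        events.to(extractor);

        // Sending the same record to SEVERAL topics currently needs a workaround,
        // e.g. writing it once per matching destination:
        events.filter((key, value) -> value.contains("audit")).to("audit");
        events.filter((key, value) -> value.contains("metrics")).to("metrics");
    }
}
{code}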



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7646) Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690174#comment-16690174
 ] 

Dong Lin commented on KAFKA-7646:
-

This issue is not captured by the automatic test runs in 
https://builds.apache.org/job/kafka-2.0-jdk8. It is hard to debug this without 
having a stacktrace.

Given that the test is related to SASL and it passes most of the time, for the 
same reason as explained in https://issues.apache.org/jira/browse/KAFKA-7651, 
this does not appear to be a blocking issue for the 2.1.0 release.

> Flaky test 
> SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
> ---
>
> Key: KAFKA-7646
> URL: https://issues.apache.org/jira/browse/KAFKA-7646
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> This test is reported to fail by [~enothereska] as part of 2.1.0 RC0 release 
> certification.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690175#comment-16690175
 ] 

Guozhang Wang commented on KAFKA-7577:
--

Hi [~dthomas-trimble], any luck trying to reproduce the issue as a unit test 
with TopologyTestDriver so that we can further investigate?

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of a Table-Table join with a null message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]). 
> The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
> The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic as expected. This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform. It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.
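A minimal sketch of the kind of TopologyTestDriver reproduction requested above, using String serdes and made-up topic names against the 2.0 test-utils API; if the reported bug is present, the second (tombstone) input would produce no output record:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class TombstoneLeftJoinRepro {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> left = builder.table("left-topic");
        KTable<String, String> right = builder.table("right-topic");
        left.leftJoin(right, (l, r) -> "[" + l + ", " + r + "]")
            .toStream()
            .to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tombstone-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

        driver.pipeInput(factory.create("left-topic", "k1", "A"));           // expect value "[A, null]"
        driver.pipeInput(factory.create("left-topic", "k1", (String) null)); // tombstone: expect a null value

        ProducerRecord<String, String> output;
        while ((output = driver.readOutput("output-topic",
                new StringDeserializer(), new StringDeserializer())) != null) {
            System.out.println(output.key() + " -> " + output.value());
        }
        driver.close();
    }
}
{code}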



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7651:

Issue Type: Sub-task  (was: Task)
Parent: KAFKA-7645

> Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
> ---
>
> Key: KAFKA-7651
> URL: https://issues.apache.org/jira/browse/KAFKA-7651
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Here is stacktrace from 
> https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/
> {code}
> Error Message
> java.lang.AssertionError: Expected an exception of type 
> org.apache.kafka.common.errors.TimeoutException; got type 
> org.apache.kafka.common.errors.SslAuthenticationException
> Stacktrace
> java.lang.AssertionError: Expected an exception of type 
> org.apache.kafka.common.errors.TimeoutException; got type 
> org.apache.kafka.common.errors.SslAuthenticationException
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690173#comment-16690173
 ] 

Dong Lin commented on KAFKA-7651:
-

The test failure is related to the SSL handshake. In general, an SSL handshake 
is a stateful operation without a timeout, so it is understandable that the SSL 
logic in the test may be flaky if a GC pause is long or there is an ephemeral 
network issue in the test.

The same test failure occurs on the 2.0 branch in 
https://builds.apache.org/job/kafka-2.0-jdk8/183/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts.

Since users have been running Kafka 2.0.0 without major issues, the test failure 
here should not be a blocking issue for the 2.1.0 release.

> Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
> ---
>
> Key: KAFKA-7651
> URL: https://issues.apache.org/jira/browse/KAFKA-7651
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Here is stacktrace from 
> https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/
> {code}
> Error Message
> java.lang.AssertionError: Expected an exception of type 
> org.apache.kafka.common.errors.TimeoutException; got type 
> org.apache.kafka.common.errors.SslAuthenticationException
> Stacktrace
> java.lang.AssertionError: Expected an exception of type 
> org.apache.kafka.common.errors.TimeoutException; got type 
> org.apache.kafka.common.errors.SslAuthenticationException
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7312:

Comment: was deleted

(was: Here is another stacktrace from 
https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/

{code}
Error Message
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
Stacktrace
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
at 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}
)

> Transient failure in 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
> 
>
> Key: KAFKA-7312
> URL: https://issues.apache.org/jira/browse/KAFKA-7312
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Error Message
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
> Stacktrace
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7312:

Issue Type: Bug  (was: Sub-task)
Parent: (was: KAFKA-7645)

> Transient failure in 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
> 
>
> Key: KAFKA-7312
> URL: https://issues.apache.org/jira/browse/KAFKA-7312
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Error Message
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
> Stacktrace
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7651:
---

 Summary: Flaky test 
SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
 Key: KAFKA-7651
 URL: https://issues.apache.org/jira/browse/KAFKA-7651
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


Here is stacktrace from 
https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/

{code}
Error Message
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
Stacktrace
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
at 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690150#comment-16690150
 ] 

Dong Lin edited comment on KAFKA-7312 at 11/17/18 12:36 AM:


Here is another stacktrace from 
https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/

{code}
Error Message
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
Stacktrace
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
at 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}



was (Author: lindong):
Here is another stacktrace:

{code}
Error Message
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
Stacktrace
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
at 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}


> Transient failure in 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
> 
>
> Key: KAFKA-7312
> URL: https://issues.apache.org/jira/browse/KAFKA-7312
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Error Message
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
> Stacktrace
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> 

[jira] [Commented] (KAFKA-7649) Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690168#comment-16690168
 ] 

Dong Lin commented on KAFKA-7649:
-

There is an error in the log that says "No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka7346315539242944484.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1011)". The source code confirms that the 
broker will fail to start with "java.lang.SecurityException: zookeeper.set.acl 
is true, but the verification of the JAAS login file failed." if the broker 
cannot find the JAAS configuration file.

So the question is why the broker fails to find the JAAS configuration file even 
though "startSasl(jaasSections(kafkaServerSaslMechanisms, 
Option(kafkaClientSaslMechanism), Both))" in 
SaslEndToEndAuthorizationTest.setUp() should have created it. I could not find 
the root cause yet.

Since this happens rarely in the integration test and the issue is related to 
the existence of a configuration file during broker initialization, my guess is 
that the bug is related to the test setup, or maybe the temporary file 
'/tmp/kafka7346315539242944484.tmp' is somehow cleaned up by the test machine. 
Though I am not 100% sure, my opinion is that this is not a blocking issue for 
the 2.1.0 release.






> Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl
> 
>
> Key: KAFKA-7649
> URL: https://issues.apache.org/jira/browse/KAFKA-7649
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Observed in 
> https://builds.apache.org/job/kafka-2.1-jdk8/49/testReport/junit/kafka.api/SslEndToEndAuthorizationTest/testNoProduceWithDescribeAcl/
> {code}
> Error Message
> java.lang.SecurityException: zookeeper.set.acl is true, but the verification 
> of the JAAS login file failed.
> Stacktrace
> java.lang.SecurityException: zookeeper.set.acl is true, but the verification 
> of the JAAS login file failed.
>   at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:361)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
>   at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
>   at 
> kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:101)
>   at 
> kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:100)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:100)
>   at 
> kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:81)
>   at 
> kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73)
>   at 
> kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:180)
>   at 
> kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:72)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> 

[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-16 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690159#comment-16690159
 ] 

Eugen Feller commented on KAFKA-7634:
-

Sure, let me see what I can do. The code looks like this:

 
{code:java}
val table =
  bldr
.table[Key, Value1](
  keySerde,
  valueSerde1,
  topicA,
  stateStoreName
)

val stream1 =
  bldr
.stream[Key, Value2](
  keySerde,
  valueSerde2,
  topicB
)
.filterNot((k: Key, s: Value2) => s == null)

val enrichedStream = stream1
  .leftJoin[Value1, Value3](
table,
joiner,
keySerde,
valueSerde2
  )

val explodedStream =
  bldr
.stream[Mac, Value4](
  keySerde,
  valueSerde4,
  topicC
)
.flatMapValues[Value3]()

val mergedStream = bldr.merge[Key, Value3](enrichedStream, explodedStream)
mergedStream.transform[Key, Value3](transformer).to(keySerde, valueSerde3, 
outputTopic){code}
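The transformer itself is not shown above; for reference, here is a hedged Java sketch of the general shape a 0.11.x Transformer that depends on punctuate() would have (the class name is a placeholder, String stands in for the Key/Value3 types in the snippet, and the 60-second interval is an assumption):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Placeholder transformer; String stands in for the Key/Value3 types used above.
public class PeriodicFlushTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // 0.11.x API: stream-time punctuation every 60s (replaced by
        // schedule(interval, PunctuationType, Punctuator) in 1.0+).
        context.schedule(60_000L);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Buffer or enrich the record here; returning null forwards nothing immediately.
        return null;
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        // Emit buffered results here; this is the callback reportedly not invoked
        // when merge()/outerJoin() precede transform().
        return null;
    }

    @Override
    public void close() { }
}
{code}

Note that in 0.11.x punctuation is driven by stream time, which only advances as new records arrive on the task's input partitions, so that is also worth double-checking in the merged topology.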
 

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble getting Kafka Streams 
> v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I have a topology that does flatMapValues() -> 
> merge() and/or outerJoin() -> transform(). If I don't call merge() and/or 
> outerJoin() before transform(), punctuate() is called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7402) Kafka Streams should implement AutoCloseable where appropriate

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690153#comment-16690153
 ] 

ASF GitHub Bot commented on KAFKA-7402:
---

cmccabe closed pull request #5839: KAFKA-7402: Kafka Streams should implement 
AutoCloseable where approp…
URL: https://github.com/apache/kafka/pull/5839

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 763fe512217..6af47058e4e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -40,7 +40,7 @@
  * 
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface ConsumerInterceptor<K, V> extends Configurable {
+public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
 
 /**
  * This is called just before the records are returned by
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d9830877ba7..fa8bab53e72 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -932,7 +932,7 @@ public double measure(MetricConfig config, long now) {
 }
 }
 
-private class HeartbeatThread extends KafkaThread {
+private class HeartbeatThread extends KafkaThread implements AutoCloseable {
 private boolean enabled = false;
 private boolean closed = false;
private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 995bdaa6dce..55d6b25a411 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -25,7 +25,7 @@
  * 
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Configurable {
+public interface MetricsReporter extends Configurable, AutoCloseable {
 
 /**
  * This is called when the reporter is first registered to initially 
register all existing metrics
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 47b137512ce..3bca276bc73 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -63,7 +63,7 @@
  * to memory pressure or other reasons
  * 
  */
-public class KafkaChannel {
+public class KafkaChannel implements AutoCloseable {
 private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 
1000 * 1000;
 
 /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index b960fcbd942..843d46dc736 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -983,7 +983,7 @@ public int numStagedReceives(KafkaChannel channel) {
 return deque == null ? 0 : deque.size();
 }
 
-private class SelectorMetrics {
+private class SelectorMetrics implements AutoCloseable {
 private final Metrics metrics;
 private final String metricGrpPrefix;
private final Map<String, String> metricTags;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 0cc2cec31f9..7512c8273da 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -37,7 +37,7 @@
  * and the builder is closed (e.g. the Producer), it's important to call 
`closeForRecordAppends` when the 

[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690155#comment-16690155
 ] 

Guozhang Wang commented on KAFKA-7531:
--

[~spuzon] I cannot recall a config named `session.timeout.ms` on the broker 
side; there are only `group.min.session.timeout.ms` and 
`group.max.session.timeout.ms`, and the other one is 
`zookeeper.session.timeout.ms`, which should be irrelevant.

I looked at the source code carefully around the place you pointed out 
(TransactionCoordinator.scala:393 - 398) but cannot find any obvious link. Since 
you seem to be able to edit the code (you are running from source), could you 
add some log entries around this piece and see which object actually throws the 
NPE?

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams application with 4 instances, each running 5 stream threads, 20 stream 
> threads in total.
> I observe a NullPointerException at the coordinator broker which causes all 
> application stream threads to shut down; here's the stack from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  

[jira] [Commented] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690150#comment-16690150
 ] 

Dong Lin commented on KAFKA-7312:
-

Here is another stacktrace:

{code}
Error Message
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
Stacktrace
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
at 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}


> Transient failure in 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
> 
>
> Key: KAFKA-7312
> URL: https://issues.apache.org/jira/browse/KAFKA-7312
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Error Message
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
> Stacktrace
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7650) make "auto.create.topics.enable" dynamically configurable.

2018-11-16 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7650:
-

 Summary: make "auto.create.topics.enable"  dynamically 
configurable. 
 Key: KAFKA-7650
 URL: https://issues.apache.org/jira/browse/KAFKA-7650
 Project: Kafka
  Issue Type: Improvement
Reporter: xiongqi wu
Assignee: xiongqi wu


There are several use cases in which we want "auto.create.topics.enable" to be 
dynamically configurable. 

For example:
1) A wildcard consumer can recreate deleted topics. 
2) Misconfigured consumers that consume from the wrong cluster can end up 
creating a lot of zombie topics in the target cluster. 

In such cases, we may want to temporarily disable "auto.create.topics.enable" 
and re-enable topic creation later, after the problem is solved, without 
restarting brokers. 
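Purely as an illustration of what this would enable: the sketch below assumes a 
broker at localhost:9092 and assumes that "auto.create.topics.enable" has been 
made dynamically alterable (it is not today, so this exact call would currently 
be rejected). It shows how the flag could be switched off via the AdminClient 
without a broker restart.

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ToggleAutoCreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // An empty resource name addresses the cluster-wide default broker config.
            ConfigResource brokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            Config disableAutoCreate = new Config(Collections.singleton(
                    new ConfigEntry("auto.create.topics.enable", "false")));
            // Would only succeed once the config is dynamically configurable;
            // today the broker treats it as a read-only (restart-required) config.
            admin.alterConfigs(Collections.singletonMap(brokers, disableAutoCreate)).all().get();
        }
    }
}
{code}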

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690140#comment-16690140
 ] 

Dong Lin commented on KAFKA-7648:
-

Currently TestUtils.createTopic(...) will re-send the znode creation request to 
the zookeeper service if the previous response shows Code.CONNECTIONLOSS. See 
KafkaZkClient.retryRequestsUntilConnected() for the related logic.

This means that the test will fail if zookeeper created the znode upon the 
first request, the response to the first request is lost or times out, the 
second request is sent, and the response to the second request shows 
Code.NODEEXISTS.

In order to fix this flaky test, we probably should implement some logic 
similar to KafkaZkClient.CheckedEphemeral() to check whether the znode was 
created by the same session after receiving Code.NODEEXISTS.
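To illustrate the idea only (the real change would live in the Scala 
KafkaZkClient, and the createOrVerify helper below is hypothetical): after a 
retried create comes back with NODEEXISTS, read the node back and decide 
whether our own earlier attempt actually succeeded, e.g. by comparing the 
stored data with what we tried to write (or, for ephemeral nodes, the owner 
session id, as CheckedEphemeral does).

{code:java}
import java.util.Arrays;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class IdempotentCreate {
    /**
     * Creates the znode, treating NODEEXISTS as success when the existing node
     * already holds the payload we intended to write (covering the case where
     * the first response was lost and the retry raced with it).
     */
    static boolean createOrVerify(ZooKeeper zk, String path, byte[] data)
            throws KeeperException, InterruptedException {
        try {
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (KeeperException.NodeExistsException e) {
            Stat stat = new Stat();
            byte[] existing = zk.getData(path, false, stat);
            return Arrays.equals(existing, data);
        }
    }
}
{code}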



> Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
> ---
>
> Key: KAFKA-7648
> URL: https://issues.apache.org/jira/browse/KAFKA-7648
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Observed in 
> [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/]
>  
> {code}
> Error Message
> org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
> exists.
> h3. Stacktrace
> org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
> exists.
> h3. Standard Output
> [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-3-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR 
> [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
> topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client 
> session timed out, have not heard from server in 4000ms for sessionid 
> 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
> 17:53:14,806] WARN Unable to read additional data from client sessionid 
> 0x10051eebf480003, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:14,807] WARN Unable to read additional data from client 
> sessionid 0x10051eebf480002, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:14,824] WARN Unable to read additional data from client 
> sessionid 0x10051eebf480001, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:15,423] WARN Unable to read additional data from client 
> sessionid 0x10051eebf48, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] 
> WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will 
> adversely effect operation latency. See the ZooKeeper troubleshooting guide 
> (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 
> 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] 
> Error for partition topic-4-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:23,087] ERROR 
> [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
> invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR 
> [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition 
> invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR 
> [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition 
> invalid-timeout-0 at offset 0 

[jira] [Comment Edited] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690140#comment-16690140
 ] 

Dong Lin edited comment on KAFKA-7648 at 11/16/18 11:47 PM:


Currently TestUtils.createTopic(...) will re-send the znode creation request to 
the zookeeper service if the previous response shows Code.CONNECTIONLOSS. See 
KafkaZkClient.retryRequestsUntilConnected() for the related logic.

This means that the test will fail if zookeeper created the znode upon the 
first request, the response to the first request is lost or times out, the 
second request is sent, and the response to the second request shows 
Code.NODEEXISTS.

In order to fix this flaky test, we probably should implement some logic 
similar to KafkaZkClient.CheckedEphemeral() to check whether the znode was 
created by the same session after receiving Code.NODEEXISTS.

Given the above understanding and the fact that the test passes with high 
probability, this flaky test does not indicate a bug and should not be a 
blocking issue for the 2.1.0 release.


was (Author: lindong):
Currently TestUtils.createTopic(...) will re-send znode creation request to 
zookeeper service if the previous response shows Code.CONNECTIONLOSS. See 
KafkaZkClient.retryRequestsUntilConnected() for related logic.

This means that the test will fail if the zookeeper has created znode upon the 
first request, the response to the first request is lost or timed-out, the 
second request is sent, and the response of the second request shows 
Code.NODEEXISTS.

In order to fix this flaky test, we probably should implement some logic 
similar to KafkaZkClient.CheckedEphemeral() to check whether the znode has been 
created in the with the same session id after receiving Code.NODEEXISTS.



> Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
> ---
>
> Key: KAFKA-7648
> URL: https://issues.apache.org/jira/browse/KAFKA-7648
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Observed in 
> [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/]
>  
> {code}
> Error Message
> org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
> exists.
> h3. Stacktrace
> org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
> exists.
> h3. Standard Output
> [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-3-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR 
> [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
> topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client 
> session timed out, have not heard from server in 4000ms for sessionid 
> 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
> 17:53:14,806] WARN Unable to read additional data from client sessionid 
> 0x10051eebf480003, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:14,807] WARN Unable to read additional data from client 
> sessionid 0x10051eebf480002, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:14,824] WARN Unable to read additional data from client 
> sessionid 0x10051eebf480001, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:15,423] WARN Unable to read additional data from client 
> sessionid 0x10051eebf48, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] 
> WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will 
> adversely effect operation latency. See the ZooKeeper troubleshooting guide 
> (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 
> 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] 

[jira] [Updated] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-16 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7622:
-
Labels: needs-kip user-experience  (was: needs-kip)

> Add findSessions functionality to ReadOnlySessionStore
> --
>
> Key: KAFKA-7622
> URL: https://issues.apache.org/jira/browse/KAFKA-7622
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Di Campo
>Priority: Major
>  Labels: needs-kip, user-experience
>
> When you create a session store from the DSL, you get a 
> {{ReadOnlySessionStore}}: you can fetch by key, but not by key and time as in 
> a {{SessionStore}}, even if the key type is a {{Windowed}}. So you would 
> have to iterate through it to find the time-related entries, which should be 
> less efficient than querying by time.
> So the purpose of this ticket is to be able to query the store with (key, 
> time).
> The proposal is to add {{SessionStore}}'s {{findSessions}}-like methods (i.e. 
> time-bound access) to {{ReadOnlySessionStore}}.
>   
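For illustration, a minimal sketch of the shape the proposal could take (the 
interface name below is hypothetical; the {{findSessions}} signature mirrors 
the existing {{SessionStore}} method, and the final API would need a KIP):

{code:java}
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;

public interface ReadOnlySessionStoreWithTime<K, AGG> {

    // Existing ReadOnlySessionStore-style access: all sessions for a key.
    KeyValueIterator<Windowed<K>, AGG> fetch(K key);

    // Proposed findSessions-like access: sessions for the key whose end time is
    // >= earliestSessionEndTime and whose start time is <= latestSessionStartTime.
    KeyValueIterator<Windowed<K>, AGG> findSessions(K key,
                                                    long earliestSessionEndTime,
                                                    long latestSessionStartTime);
}
{code}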



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2018-11-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690138#comment-16690138
 ] 

Guozhang Wang commented on KAFKA-7628:
--

Hmm.. in this case the state should not transition to `NOT_RUNNING`, as the 
stream threads have not all been joined. In the latest version we've slightly 
improved the logic with a shutdown thread whose logic is basically:

{code}
for (final StreamThread thread : threads) {
    try {
        if (!thread.isRunning()) {
            thread.join();
        }
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

if (globalStreamThread != null) {
    globalStreamThread.setStateListener(null);
    globalStreamThread.shutdown();
}

if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
    try {
        globalStreamThread.join();
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    globalStreamThread = null;
}

adminClient.close();

metrics.close();
setState(State.NOT_RUNNING);
{code}

If the stream-threads have not all joined, it should not proceed to 
`setState(State.NOT_RUNNING)`.

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when I need to, based on a certain condition:
> Closing:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
> Starting:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(
>             this.getConfigFilePath(),
>             this,
>             this.getTopic());
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even though the stream was closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> 
>     private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
> 
>     private ProcessorContext context;
> 
>     private ProcessorContext getContext() {
>         return context;
>     }
> 
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
> 
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
> 
>     protected abstract void processRecord(String topic, String json);
> 
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
> 
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm witnessing that after calling close() on the stream, these ports are 
> still listening:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    

[jira] [Comment Edited] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690115#comment-16690115
 ] 

Dong Lin edited comment on KAFKA-7486 at 11/16/18 11:13 PM:


The test calls `AdminUtils.addPartitions` to add a partition to the topic after 
the async topic deletion is triggered. It assumes that the topic deletion has 
not completed when `AdminUtils.addPartitions` is executed, but this is not 
guaranteed. This is the root cause of the test failure. So it does not indicate 
any bug in the code, and thus this is not a blocking issue for the 2.1.0 release.


was (Author: lindong):
The test calls `AdminUtils.addPartitions` to add partition to the topic after 
the async topic deletion is triggered. It assumes that the topic deletion is 
not completed when `AdminUtils.addPartitions` is executed but this is not 
guaranteed. This is the root cause of the test failure. So it does not indicate 
any bug in the code and thus this is not a blocking issue for 2.1 release.

> Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
> --
>
> Key: KAFKA-7486
> URL: https://issues.apache.org/jira/browse/KAFKA-7486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> Starting to see more of this recently:
> {code}
> 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic 
> FAILED
> 10:06:28 kafka.admin.AdminOperationException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/test
> 10:06:28 at 
> kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162)
> 10:06:28 at 
> kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102)
> 10:06:28 at 
> kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229)
> 10:06:28 at 
> kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7446) Better error message to explain the upper limit of TimeWindow

2018-11-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690134#comment-16690134
 ] 

Guozhang Wang commented on KAFKA-7446:
--

[~mrsrinivas] Please feel free to submit the PR.

> Better error message to explain the upper limit of TimeWindow
> -
>
> Key: KAFKA-7446
> URL: https://issues.apache.org/jira/browse/KAFKA-7446
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Jacek Laskowski
>Assignee: Srinivas Reddy
>Priority: Trivial
>  Labels: newbie++
>
> The following code throws an {{IllegalArgumentException}}.
> {code:java}
> import org.apache.kafka.streams.kstream.TimeWindows
> import scala.concurrent.duration._
> val timeWindow = TimeWindows
> .of(1.minute.toMillis)
> .advanceBy(2.minutes.toMillis)
> {code}
> The exception is as follows, and it's not clear why {{6}} is the upper 
> limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} also 
> confused me).
> {code:java}
> java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, 
> 6].
> at 
> org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100)
> ... 44 elided{code}
> I think that the message should be more developer-friendly and explain the 
> boundaries, perhaps with an example (and a link to docs)?
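For illustration only, one possible shape for a friendlier check and message 
(the class and parameter names are assumptions, not the actual Kafka Streams 
code):

{code:java}
public final class TimeWindowsCheck {

    /** Validates the advance ("hop") interval against the window size with a descriptive error. */
    public static void checkAdvance(long sizeMs, long advanceMs) {
        if (advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException(String.format(
                    "The advance interval (advanceMs=%d) must lie within the interval (0, %d], "
                            + "i.e. it cannot be larger than the window size (sizeMs). "
                            + "Example: TimeWindows.of(60000L).advanceBy(30000L).",
                    advanceMs, sizeMs));
        }
    }
}
{code}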



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7541) Transient Failure: kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690130#comment-16690130
 ] 

Dong Lin commented on KAFKA-7541:
-

According to the source code of 
DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(), the test 
will fail with the exception above if leader election is not completed within 
15 seconds. Thus the test may fail if there is a long GC pause. We can reduce 
the chance of test failure by increasing the wait time.

Given the above understanding and the fact that the test passes with high 
probability, this flaky test does not indicate a bug and should not be a 
blocking issue for the 2.1.0 release.
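As a generic illustration of the suggested mitigation (this helper and its 
parameters are hypothetical, not Kafka's actual test utility), the condition 
can be polled with a larger overall timeout so that a long GC pause alone does 
not fail the test:

{code:java}
import java.util.function.BooleanSupplier;

public final class WaitUntil {

    /** Polls the condition until it holds or the timeout elapses, then fails. */
    public static void waitUntilTrue(BooleanSupplier condition, String failureMessage, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(100L); // short poll interval between checks
        }
        throw new AssertionError(failureMessage);
    }
}
{code}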

> Transient Failure: 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable
> 
>
> Key: KAFKA-7541
> URL: https://issues.apache.org/jira/browse/KAFKA-7541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> Observed on Java 11: 
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/]
>  
> Stacktrace:
> {noformat}
> java.lang.AssertionError: Unclean leader not elected
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:487)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> 

[jira] [Commented] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate

2018-11-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690118#comment-16690118
 ] 

Guozhang Wang commented on KAFKA-6567:
--

[~Samuel Hawker] Sure, go ahead and submit the PR :)

> KStreamWindowReduce can be replaced by KStreamWindowAggregate
> -
>
> Key: KAFKA-6567
> URL: https://issues.apache.org/jira/browse/KAFKA-6567
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Yaswanth Kumar
>Priority: Major
>  Labels: newbie
>
> This is tech debt worth cleaning up: KStreamWindowReduce should be 
> replaceable by KStreamWindowAggregate. In fact, we have already done this for 
> session windows, where in {{SessionWindowedKStreamImpl}} we use 
> {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} 
> to replace the reducer with an aggregator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690116#comment-16690116
 ] 

Guozhang Wang commented on KAFKA-7634:
--

Could you share your code snippet around the punctuation calls?

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble getting Kafka Streams 
> v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I have a topology where I am doing flatMapValues() -> 
> merge() and/or outerJoin() -> transform(). If I don't call merge() and/or 
> outerJoin() before transform(), punctuate() is called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690115#comment-16690115
 ] 

Dong Lin commented on KAFKA-7486:
-

The test calls `AdminUtils.addPartitions` to add a partition to the topic after 
the async topic deletion is triggered. It assumes that the topic deletion has 
not completed when `AdminUtils.addPartitions` is executed, but this is not 
guaranteed. This is the root cause of the test failure. So it does not indicate 
any bug in the code, and thus this is not a blocking issue for the 2.1 release.

> Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
> --
>
> Key: KAFKA-7486
> URL: https://issues.apache.org/jira/browse/KAFKA-7486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> Starting to see more of this recently:
> {code}
> 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic 
> FAILED
> 10:06:28 kafka.admin.AdminOperationException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/test
> 10:06:28 at 
> kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162)
> 10:06:28 at 
> kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102)
> 10:06:28 at 
> kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229)
> 10:06:28 at 
> kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7541) Transient Failure: kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7541:

Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-7645

> Transient Failure: 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable
> 
>
> Key: KAFKA-7541
> URL: https://issues.apache.org/jira/browse/KAFKA-7541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> Observed on Java 11: 
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/]
>  
> Stacktrace:
> {noformat}
> java.lang.AssertionError: Unclean leader not elected
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:487)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[jira] [Created] (KAFKA-7646) Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7646:
---

 Summary: Flaky test 
SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
 Key: KAFKA-7646
 URL: https://issues.apache.org/jira/browse/KAFKA-7646
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


This test is reported to fail by [~enothereska] as part of 2.1.0 RC0 release 
certification.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7649) Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7649:

Issue Type: Sub-task  (was: Task)
Parent: KAFKA-7645

> Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl
> 
>
> Key: KAFKA-7649
> URL: https://issues.apache.org/jira/browse/KAFKA-7649
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Observed in 
> https://builds.apache.org/job/kafka-2.1-jdk8/49/testReport/junit/kafka.api/SslEndToEndAuthorizationTest/testNoProduceWithDescribeAcl/
> {code}
> Error Message
> java.lang.SecurityException: zookeeper.set.acl is true, but the verification 
> of the JAAS login file failed.
> Stacktrace
> java.lang.SecurityException: zookeeper.set.acl is true, but the verification 
> of the JAAS login file failed.
>   at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:361)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
>   at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
>   at 
> kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:101)
>   at 
> kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:100)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:100)
>   at 
> kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:81)
>   at 
> kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73)
>   at 
> kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:180)
>   at 
> kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:72)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   

[jira] [Created] (KAFKA-7649) Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7649:
---

 Summary: Flaky test 
SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl
 Key: KAFKA-7649
 URL: https://issues.apache.org/jira/browse/KAFKA-7649
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


Observed in 
https://builds.apache.org/job/kafka-2.1-jdk8/49/testReport/junit/kafka.api/SslEndToEndAuthorizationTest/testNoProduceWithDescribeAcl/

{code}
Error Message
java.lang.SecurityException: zookeeper.set.acl is true, but the verification of 
the JAAS login file failed.
Stacktrace
java.lang.SecurityException: zookeeper.set.acl is true, but the verification of 
the JAAS login file failed.
at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:361)
at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
at 
kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:101)
at 
kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:100)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:100)
at 
kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:81)
at 
kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73)
at 
kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:180)
at 
kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 

[jira] [Updated] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7648:

Description: 
Observed in 
[https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/]

 

{code}

Error Message

org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Stacktrace

org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Standard Output

[2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Error for partition topic-3-3 at offset 0 
(kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client 
session timed out, have not heard from server in 4000ms for sessionid 
0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:14,806] WARN Unable to read additional data from client sessionid 
0x10051eebf480003, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] WARN 
Client session timed out, have not heard from server in 4002ms for sessionid 
0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:14,807] WARN Unable to read additional data from client sessionid 
0x10051eebf480002, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] WARN 
Client session timed out, have not heard from server in 4002ms for sessionid 
0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:14,824] WARN Unable to read additional data from client sessionid 
0x10051eebf480001, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] WARN 
Client session timed out, have not heard from server in 4002ms for sessionid 
0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:15,423] WARN Unable to read additional data from client sessionid 
0x10051eebf48, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] WARN 
fsync-ing the write ahead log in SyncThread:0 took 4456ms which will adversely 
effect operation latency. See the ZooKeeper troubleshooting guide 
(org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 
17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error 
for partition topic-4-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:23,087] ERROR 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR 
[ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition 
invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR 
[ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition 
invalid-timeout-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
  
{code}

  was:
Observed in 
[https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/]

 
h3. Error Message
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Stacktrace
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Standard Output
[2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Error for partition topic-3-3 at offset 0 
(kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client 
session timed out, have not heard 

[jira] [Updated] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7312:

Issue Type: Sub-task  (was: Improvement)
Parent: KAFKA-7645

> Transient failure in 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
> 
>
> Key: KAFKA-7312
> URL: https://issues.apache.org/jira/browse/KAFKA-7312
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Error Message
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
> Stacktrace
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7648:
---

 Summary: Flaky test 
DeleteTopicsRequestTest.testValidDeleteTopicRequests
 Key: KAFKA-7648
 URL: https://issues.apache.org/jira/browse/KAFKA-7648
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


Observed in 
[https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/]

 
h3. Error Message
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Stacktrace
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Standard Output
[2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Error for partition topic-3-3 at offset 0 
(kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client 
session timed out, have not heard from server in 4000ms for sessionid 
0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:14,806] WARN Unable to read additional data from client sessionid 
0x10051eebf480003, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] WARN 
Client session timed out, have not heard from server in 4002ms for sessionid 
0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:14,807] WARN Unable to read additional data from client sessionid 
0x10051eebf480002, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] WARN 
Client session timed out, have not heard from server in 4002ms for sessionid 
0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:14,824] WARN Unable to read additional data from client sessionid 
0x10051eebf480001, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] WARN 
Client session timed out, have not heard from server in 4002ms for sessionid 
0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
17:53:15,423] WARN Unable to read additional data from client sessionid 
0x10051eebf48, likely client has closed socket 
(org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] WARN 
fsync-ing the write ahead log in SyncThread:0 took 4456ms which will adversely 
effect operation latency. See the ZooKeeper troubleshooting guide 
(org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 
17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error 
for partition topic-4-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:23,087] ERROR 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR 
[ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition 
invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR 
[ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition 
invalid-timeout-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
 
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7648:

Issue Type: Sub-task  (was: Task)
Parent: KAFKA-7645

> Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests
> ---
>
> Key: KAFKA-7648
> URL: https://issues.apache.org/jira/browse/KAFKA-7648
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Observed in 
> [https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/]
>  
> h3. Error Message
> org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
> exists.
> h3. Stacktrace
> org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
> exists.
> h3. Standard Output
> [2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-3-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:10,812] ERROR 
> [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
> topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:14,805] WARN Client 
> session timed out, have not heard from server in 4000ms for sessionid 
> 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112) [2018-11-07 
> 17:53:14,806] WARN Unable to read additional data from client sessionid 
> 0x10051eebf480003, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,807] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:14,807] WARN Unable to read additional data from client 
> sessionid 0x10051eebf480002, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:14,823] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:14,824] WARN Unable to read additional data from client 
> sessionid 0x10051eebf480001, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,423] 
> WARN Client session timed out, have not heard from server in 4002ms for 
> sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112) 
> [2018-11-07 17:53:15,423] WARN Unable to read additional data from client 
> sessionid 0x10051eebf48, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376) [2018-11-07 17:53:15,879] 
> WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will 
> adversely effect operation latency. See the ZooKeeper troubleshooting guide 
> (org.apache.zookeeper.server.persistence.FileTxnLog:338) [2018-11-07 
> 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] 
> Error for partition topic-4-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:23,087] ERROR 
> [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition 
> invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:23,088] ERROR 
> [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition 
> invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition. [2018-11-07 17:53:23,137] ERROR 
> [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition 
> invalid-timeout-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7647:

Issue Type: Sub-task  (was: Task)
Parent: KAFKA-7645

> Flaky test 
> LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
> -
>
> Key: KAFKA-7647
> URL: https://issues.apache.org/jira/browse/KAFKA-7647
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> kafka.log.LogCleanerParameterizedIntegrationTest >
> testCleansCombinedCompactAndDeleteTopic[3] FAILED
>     java.lang.AssertionError: Contents of the map shouldn't change
> expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 ->
> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353),
> 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 ->
> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but
> was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354),
> 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 ->
> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 ->
> (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 ->
> (355,355))>
>         at org.junit.Assert.fail(Assert.java:88)
>         at org.junit.Assert.failNotEquals(Assert.java:834)
>         at org.junit.Assert.assertEquals(Assert.java:118)
>         at
> kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7647:
---

 Summary: Flaky test 
LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
 Key: KAFKA-7647
 URL: https://issues.apache.org/jira/browse/KAFKA-7647
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


kafka.log.LogCleanerParameterizedIntegrationTest >
testCleansCombinedCompactAndDeleteTopic[3] FAILED
    java.lang.AssertionError: Contents of the map shouldn't change
expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 ->
(354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353),
2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 ->
(348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but
was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354),
1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 ->
(342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 ->
(299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 ->
(355,355))>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:118)
        at
kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7646) Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7646:

Issue Type: Sub-task  (was: Task)
Parent: KAFKA-7645

> Flaky test 
> SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
> ---
>
> Key: KAFKA-7646
> URL: https://issues.apache.org/jira/browse/KAFKA-7646
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> This test is reported to fail by [~enothereska] as part of 2.1.0 RC0 release 
> certification.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`

2018-11-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7486:

Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-7645

> Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
> --
>
> Key: KAFKA-7486
> URL: https://issues.apache.org/jira/browse/KAFKA-7486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> Starting to see more of this recently:
> {code}
> 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic 
> FAILED
> 10:06:28 kafka.admin.AdminOperationException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/test
> 10:06:28 at 
> kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162)
> 10:06:28 at 
> kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102)
> 10:06:28 at 
> kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229)
> 10:06:28 at 
> kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7645) Fix flaky unit test for 2.1 branch

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7645:
---

 Summary: Fix flaky unit test for 2.1 branch
 Key: KAFKA-7645
 URL: https://issues.apache.org/jira/browse/KAFKA-7645
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690102#comment-16690102
 ] 

Boyang Chen edited comment on KAFKA-7641 at 11/16/18 10:50 PM:
---

[~enether] Thanks! I think we need to propose two KIPs here: 1. require a member 
id during the new-member JoinGroup request; 2. enforce group.max.size. I could take 
this one as practice for going through the consumer protocol change; do you 
want to work on #2 here?


was (Author: bchen225242):
[~enether] Thanks! I think we need to propose two KIPs here: 1. require a member 
id during the new-member JoinGroup request; 2. enforce group.max.size. I could take 
this one as practice for going through the consumer protocol change; do you 
want to work on 2. here?

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690102#comment-16690102
 ] 

Boyang Chen commented on KAFKA-7641:


[~enether] Thanks! I think we need to propose two KIPs here: 1. require a member 
id during the new-member JoinGroup request; 2. enforce group.max.size. I could take 
this one as practice for going through the consumer protocol change; do you 
want to work on 2. here?

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7486) Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`

2018-11-16 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690099#comment-16690099
 ] 

Dong Lin commented on KAFKA-7486:
-

Hey [~chia7712], I think [~hachikuji] probably missed your message. I am sure 
[~hachikuji] (and I) would be happy for you to help take over this JIRA.

> Flaky test `DeleteTopicTest.testAddPartitionDuringDeleteTopic`
> --
>
> Key: KAFKA-7486
> URL: https://issues.apache.org/jira/browse/KAFKA-7486
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> Starting to see more of this recently:
> {code}
> 10:06:28 kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic 
> FAILED
> 10:06:28 kafka.admin.AdminOperationException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/test
> 10:06:28 at 
> kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162)
> 10:06:28 at 
> kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102)
> 10:06:28 at 
> kafka.zk.AdminZkClient.addPartitions(AdminZkClient.scala:229)
> 10:06:28 at 
> kafka.admin.DeleteTopicTest.testAddPartitionDuringDeleteTopic(DeleteTopicTest.scala:266)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7644) Worker Rebalance enhancements

2018-11-16 Thread satya (JIRA)
satya created KAFKA-7644:


 Summary: Worker Rebalance enhancements
 Key: KAFKA-7644
 URL: https://issues.apache.org/jira/browse/KAFKA-7644
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: satya


Currently the Kafka Connect distributed worker triggers a rebalance any time a 
new connector or task is added, irrespective of whether the added connector is 
a source connector or a sink connector.

My understanding has been that the worker rebalance should be identical to a 
consumer group rebalance. That said, shouldn't source connectors be immune to 
the rebalance?

Are we not supposed to use source connectors with distributed workers?

It does appear to me that there is some caveat in the way the worker rebalance 
works, and it needs enhancement so that it does not trigger unwanted rebalances 
that cause restarts of all tasks.

Kafka connectors should have a way to avoid restarting and keep their existing 
partition assignment if the rebalance trigger is related to a different 
connector.

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-16 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690046#comment-16690046
 ] 

Eugen Feller commented on KAFKA-7634:
-

[~guozhang] In the meantime I found that if I materialize the merged stream via 
through() and call transform(), it works.
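
For reference, a minimal sketch of that workaround topology, written against a newer 
Streams API than the reporter's 0.11.0.3 (the topic names, types, and the 60-second 
interval below are assumptions, not taken from the report):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class MergeThroughTransformSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");   // assumed topic
        KStream<String, String> right = builder.stream("right-topic"); // assumed topic

        left.merge(right)
            // the reported workaround: materialize the merged stream through a topic
            .through("merged-topic")
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                @Override
                public void init(ProcessorContext context) {
                    // wall-clock punctuation every 60 seconds
                    context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME,
                            timestamp -> System.out.println("punctuate at " + timestamp));
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() { }
            });
        // builder.build() would then be passed to a KafkaStreams instance as usual
    }
}
{code}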

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble getting Kafka Streams 
> v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I have a topology that does flatMapValues() -> 
> merge() and/or outerJoin() -> transform(). If I don't call merge() and/or 
> outerJoin() before transform(), punctuate() is called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7642) Kafka Connect - graceful shutdown of distributed worker

2018-11-16 Thread satya (JIRA)
satya created KAFKA-7642:


 Summary: Kafka Connect - graceful shutdown of distributed worker
 Key: KAFKA-7642
 URL: https://issues.apache.org/jira/browse/KAFKA-7642
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: satya


Currently I don't find any ability to gracefully shut down a distributed worker 
other than killing the process.

Could you add a shutdown option to the workers?

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5946) Give connector method parameter better name

2018-11-16 Thread Samuel Hawker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690043#comment-16690043
 ] 

Samuel Hawker commented on KAFKA-5946:
--

Hi, I notice this has not seen any activity in many months. If 
[~tanvijaywant31] is no longer working on it, can I pick up the ticket? :)

Just a few questions: I understand the part about renaming the class to 
connClass, but what do you mean by standardise? In this context, does it just 
mean extending the class with these extra fields?

> Give connector method parameter better name
> ---
>
> Key: KAFKA-5946
> URL: https://issues.apache.org/jira/browse/KAFKA-5946
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Tanvi Jaywant
>Priority: Major
>  Labels: connector, newbie
>
> During the development of KAFKA-5657, there were several iterations where 
> method call didn't match what the connector parameter actually represents.
> [~ewencp] had used connType as equivalent to connClass because Type wasn't 
> used to differentiate source vs sink.
> [~ewencp] proposed the following:
> {code}
> It would help to convert all the uses of connType to connClass first, then 
> standardize on class == java class, type == source/sink, name == 
> user-specified name.
> {code}
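
To make the proposed vocabulary concrete, a tiny hypothetical illustration (the values 
below are made up, and the local enum is only a stand-in for Connect's own ConnectorType 
class mentioned later in this thread):

{code:java}
public class ConnectorNamingExample {
    // Stand-in for Connect's own ConnectorType enum referenced in this thread.
    enum ConnectorType { SOURCE, SINK }

    public static void main(String[] args) {
        // Hypothetical values, only to illustrate the proposed vocabulary:
        // class == java class, type == source/sink, name == user-specified name.
        String connName = "local-file-source";                                         // name
        String connClass = "org.apache.kafka.connect.file.FileStreamSourceConnector";  // class
        ConnectorType connType = ConnectorType.SOURCE;                                  // type
        System.out.printf("name=%s class=%s type=%s%n", connName, connClass, connType);
    }
}
{code}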



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7643) Connectors do not unload even when tasks fail in kafka connect

2018-11-16 Thread satya (JIRA)
satya created KAFKA-7643:


 Summary: Connectors do not unload even when tasks fail in kafka 
connect
 Key: KAFKA-7643
 URL: https://issues.apache.org/jira/browse/KAFKA-7643
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: satya


If there are any issues in the tasks associated with a connector, I have often 
seen that even though the tasks fail, the connector itself is not released.

The only way out is to submit an explicit request to delete the connector.

There should be a way to shut down the connectors gracefully if the task 
encounters exceptions that are not retriable.

In addition, Kafka Connect does not have a graceful exit option. There will be 
situations like server maintenance, outages, etc. where it would be prudent to 
gracefully shut down the connectors rather than performing a DELETE through the 
REST endpoint.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5946) Give connector method parameter better name

2018-11-16 Thread Samuel Hawker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690043#comment-16690043
 ] 

Samuel Hawker edited comment on KAFKA-5946 at 11/16/18 9:58 PM:


Hi, I notice this has not seen any activity in many months. If 
[~tanvijaywant31] is no longer working on it, can I pick up the ticket? :)

Just a few questions: I understand the part about renaming the class to 
connClass, but what do you mean by standardise? In this context, does it just 
mean extending the class with these extra fields?
(The class I am referring to is ConnectorType.)


was (Author: samuel hawker):
Hi, I notice this has not seen any activity in many months. If 
[~tanvijaywant31] is no longer working on it, can I pick up the ticket? :)

Just a few questions: I understand the part about renaming the class to 
connClass, but what do you mean by standardise? In this context, does it just 
mean extending the class with these extra fields?

> Give connector method parameter better name
> ---
>
> Key: KAFKA-5946
> URL: https://issues.apache.org/jira/browse/KAFKA-5946
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Tanvi Jaywant
>Priority: Major
>  Labels: connector, newbie
>
> During the development of KAFKA-5657, there were several iterations where 
> method call didn't match what the connector parameter actually represents.
> [~ewencp] had used connType as equivalent to connClass because Type wasn't 
> used to differentiate source vs sink.
> [~ewencp] proposed the following:
> {code}
> It would help to convert all the uses of connType to connClass first, then 
> standardize on class == java class, type == source/sink, name == 
> user-specified name.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate

2018-11-16 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690040#comment-16690040
 ] 

John Roesler commented on KAFKA-6567:
-

This seems fine to me. Unless [~sawyna] wants to protest, I'd say to go ahead 
and submit the PR.

> KStreamWindowReduce can be replaced by KStreamWindowAggregate
> -
>
> Key: KAFKA-6567
> URL: https://issues.apache.org/jira/browse/KAFKA-6567
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Yaswanth Kumar
>Priority: Major
>  Labels: newbie
>
> This is a tech debt worth cleaning up: KStreamWindowReduce should be able to 
> be replaced by KStreamWindowAggregate. In fact, we have already done this for 
> session windows, where in {{SessionWindowedKStreamImpl}} we use 
> {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} 
> to replace reducer with aggregator.
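
A minimal sketch of that idea (the method names mirror the ones cited above from the 
session-window case; this is not the actual cleanup, only an illustration of how a 
reducer can be expressed as an aggregator):

{code:java}
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Reducer;

public class ReduceViaAggregateSketch {
    // reduce() has no initial value, so the aggregate starts out as null.
    static <V> Initializer<V> reduceInitializer() {
        return () -> null;
    }

    // Wraps a Reducer so it can be used wherever an Aggregator is expected:
    // the first value for a key is taken as-is, later values are reduced into it.
    static <K, V> Aggregator<K, V, V> aggregatorForReducer(Reducer<V> reducer) {
        return (key, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
    }
}
{code}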



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate

2018-11-16 Thread Samuel Hawker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689983#comment-16689983
 ] 

Samuel Hawker edited comment on KAFKA-6567 at 11/16/18 9:38 PM:


Hi, I notice this has not seen any activity in many months. If [~sawyna] is no 
longer working on it, can I pick up the ticket? :)

(I have the code changes ready to submit as a pull request.)


was (Author: samuel hawker):
Hi, I notice this has not seen any activity in many months. If [~sawyna] is no 
longer working on it, can I pick up the ticket? :)

> KStreamWindowReduce can be replaced by KStreamWindowAggregate
> -
>
> Key: KAFKA-6567
> URL: https://issues.apache.org/jira/browse/KAFKA-6567
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Yaswanth Kumar
>Priority: Major
>  Labels: newbie
>
> This is a tech debt worth cleaning up: KStreamWindowReduce should be able to 
> be replaced by KStreamWindowAggregate. In fact, we have already done this for 
> session windows, where in {{SessionWindowedKStreamImpl}} we use 
> {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} 
> to replace reducer with aggregator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690013#comment-16690013
 ] 

Stanislav Kozlovski commented on KAFKA-7641:


Thanks for opening the ticket, [~bchen225242]. This change will require a KIP 
as it is a public interface change. I'm willing to go ahead with this if you'd 
like, as I know you're already working on KIP-345.

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
>  
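
To make the proposal concrete, a purely hypothetical sketch of the broker-side guard 
(the config name, field, and rejection rule are assumptions pending the KIP; none of 
this is existing Kafka code):

{code:java}
// Hypothetical sketch only: illustrates the intent of a consumer.group.max.size check.
public final class GroupSizeGuard {

    private final int groupMaxSize; // value of the proposed "consumer.group.max.size" broker config

    public GroupSizeGuard(int groupMaxSize) {
        this.groupMaxSize = groupMaxSize;
    }

    /**
     * A join from a brand-new member (empty member id) would be rejected once the
     * group already holds the configured maximum number of members.
     */
    public boolean shouldRejectNewMember(boolean hasMemberId, int currentGroupSize) {
        return !hasMemberId && currentGroupSize >= groupMaxSize;
    }
}
{code}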



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-7641:
--

Assignee: Boyang Chen

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-16 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690001#comment-16690001
 ] 

Eugen Feller commented on KAFKA-7634:
-

Hi [~guozhang], thanks a lot for your quick reply. I will give it a try with one 
service. We have a mono repo. Changing the library in a major version would 
require me to change a lot of services. I am currently using 
[https://github.com/manub/scalatest-embedded-kafka] to provide embedded Kafka 
for some unit tests. For others, I am using 
[https://github.com/jpzk/mockedstreams], which is a wrapper around 
TopologyTestDriver. Will try to use the TopologyTestDriver directly to 
reproduce.

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble getting Kafka Streams 
> v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I have a topology that does flatMapValues() -> 
> merge() and/or outerJoin() -> transform(). If I don't call merge() and/or 
> outerJoin() before transform(), punctuate() is called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate

2018-11-16 Thread Samuel Hawker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689983#comment-16689983
 ] 

Samuel Hawker commented on KAFKA-6567:
--

Hi, I notice this has not seen any activity in many months. If [~sawyna] is no 
longer working on it, can I pick up the ticket? :)

> KStreamWindowReduce can be replaced by KStreamWindowAggregate
> -
>
> Key: KAFKA-6567
> URL: https://issues.apache.org/jira/browse/KAFKA-6567
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Yaswanth Kumar
>Priority: Major
>  Labels: newbie
>
> This is a tech debt worth cleaning up: KStreamWindowReduce should be able to 
> be replaced by KStreamWindowAggregate. In fact, we have already done this for 
> session windows, where in {{SessionWindowedKStreamImpl}} we use 
> {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} 
> to replace reducer with aggregator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4453) add request prioritization

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689957#comment-16689957
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

gitlw closed pull request #5783: KAFKA-4453: Separating controller connections 
and requests from the data plane (KIP-291)
URL: https://github.com/apache/kafka/pull/5783
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecf6fbf33f1..29e10383afe 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -108,14 +108,16 @@ class ControllerChannelManager(controllerContext: 
ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
 val messageQueue = new LinkedBlockingQueue[QueueItem]
 debug(s"Controller ${config.brokerId} trying to connect to broker 
${broker.id}")
-val brokerNode = broker.node(config.interBrokerListenerName)
+val controlPlaneListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val controlPlaneSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+val brokerNode = broker.node(controlPlaneListenerName)
 val logContext = new LogContext(s"[Controller id=${config.brokerId}, 
targetBrokerId=${brokerNode.idString}] ")
 val networkClient = {
   val channelBuilder = ChannelBuilders.clientChannelBuilder(
-config.interBrokerSecurityProtocol,
+controlPlaneSecurityProtocol,
 JaasContext.Type.SERVER,
 config,
-config.interBrokerListenerName,
+controlPlaneListenerName,
 config.saslMechanismInterBrokerProtocol,
 config.saslInterBrokerHandshakeRequestEnable
   )
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 00b09688c5b..0bc8a2a6ff9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -40,6 +40,7 @@ object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
   val RequestQueueSizeMetric = "RequestQueueSize"
+  val ControlPlaneRequestQueueSizeMetric = "ControlPlaneRequestQueueSize"
   val ResponseQueueSizeMetric = "ResponseQueueSize"
   val ProcessorMetricTag = "processor"
 
@@ -272,13 +273,13 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int, val requestQueueSizeMetric: String) 
extends KafkaMetricsGroup {
   import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
+  newGauge(requestQueueSizeMetric, new Gauge[Int] {
   def value = requestQueue.size
   })
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 1365f90f763..c3dac79f5f9 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -65,8 +65,14 @@ class SocketServer(val config: KafkaConfig, val metrics: 
Metrics, val time: Time
   private val memoryPoolDepletedTimeMetricName = 
metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, 
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
-  val requestChannel = new RequestChannel(maxQueuedRequests)
-  private val processors = new ConcurrentHashMap[Int, Processor]()
+  val dataRequestChannel = new RequestChannel(maxQueuedRequests, 
RequestChannel.RequestQueueSizeMetric)
+  var controlPlaneRequestChannel: RequestChannel = null
+  if (config.controlPlaneListenerName.isDefined) {
+controlPlaneRequestChannel = new RequestChannel(20, 
RequestChannel.ControlPlaneRequestQueueSizeMetric)
+  }
+  private val dataProcessors = new ConcurrentHashMap[Int, Processor]()
+  // there should be only one controller processor, however we use a map to 
store it so that we can reuse the logic for data processors
+  private[network] val controlPlaneProcessors = 

[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-7641:
---
Description: 
In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason 
concluded an edge case of current consumer protocol which could cause memory 
burst on broker side:

```the case we observed in practice was caused by a consumer that was slow to 
rejoin the group after a rebalance had begun. At the same time, there were new 
members that were trying to join the group for the first time. The request 
timeout was significantly lower than the rebalance timeout, so the JoinGroup of 
the new members kept timing out. The timeout caused a retry and the group size 
eventually become quite large because we could not detect the fact that the new 
members were no longer there.```

Since many disorganized join group requests are spamming the group metadata, we 
should define a cap on broker side to avoid one consumer group from growing too 
large. So far I feel it's appropriate to introduce this as a server config 
since most times this value is only dealing with error scenarios, client users 
shouldn't worry about this config.

 

  was:
In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason 
concluded an edge case of current consumer protocol which could cause memory 
burst on broker side:

```the case we observed in practice was caused by a consumer that was slow to 
rejoin the group after a rebalance had begun. At the same time, there were new 
members that were trying to join the group for the first time. The request 
timeout was significantly lower than the rebalance timeout, so the JoinGroup of 
the new members kept timing out. The timeout caused a retry and the group size 
eventually become quite large because we could not detect the fact that the new 
members were no longer there.```

Since many disorganized join group requests are spamming the group metadata, we 
should define a cap on broker side to avoid one consumer group from growing too 
large. So far I feel it's appropriate to introduce this as a server config 
since most times this value is only dealing with error scenarios.

 


> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689925#comment-16689925
 ] 

Boyang Chen commented on KAFKA-7641:


[~hachikuji] [~enether]

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-7641:
---
Description: 
In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason 
concluded an edge case of current consumer protocol which could cause memory 
burst on broker side:

```the case we observed in practice was caused by a consumer that was slow to 
rejoin the group after a rebalance had begun. At the same time, there were new 
members that were trying to join the group for the first time. The request 
timeout was significantly lower than the rebalance timeout, so the JoinGroup of 
the new members kept timing out. The timeout caused a retry and the group size 
eventually become quite large because we could not detect the fact that the new 
members were no longer there.```

Since many disorganized join group requests are spamming the group metadata, we 
should define a cap on broker side to avoid one consumer group from growing too 
large. So far I feel it's appropriate to introduce this as a server config 
since most times this value is only dealing with error scenarios.

 

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-16 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-7641:
---
Summary: Add `consumer.group.max.size` to cap consumer metadata size on 
broker  (was: Add `group.max.size` to cap group metadata on broker)

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7641) Add `group.max.size` to cap group metadata on broker

2018-11-16 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7641:
--

 Summary: Add `group.max.size` to cap group metadata on broker
 Key: KAFKA-7641
 URL: https://issues.apache.org/jira/browse/KAFKA-7641
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-16 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689916#comment-16689916
 ] 

Boyang Chen commented on KAFKA-7610:


Sounds good Jason! I think combining `group.max.size` and the member id 
requirement in the JoinGroup request should be sufficient for solving the above 
scenario. I will open a separate JIRA for the group size; in the meantime, I 
think the conclusion is that Jason's original proposal 2 should be enough to 
mitigate the problem.

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).
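
A self-contained, purely hypothetical sketch of what option 2 implies for the client-side 
loop (the error name, types, and the stubbed broker reply are all invented for illustration; 
the real behaviour would be defined by the KIP):

{code:java}
import java.util.UUID;

public class JoinGroupHandshakeSketch {

    enum Error { NONE, REBALANCE_IN_PROGRESS }

    static final class JoinResult {
        final Error error;
        final String memberId;
        JoinResult(Error error, String memberId) { this.error = error; this.memberId = memberId; }
    }

    // Stand-in for the broker: the first join with an empty member id gets back a
    // generated id plus a retriable error instead of sitting in purgatory.
    static JoinResult sendJoinGroup(String memberId) {
        if (memberId.isEmpty()) {
            return new JoinResult(Error.REBALANCE_IN_PROGRESS, UUID.randomUUID().toString());
        }
        return new JoinResult(Error.NONE, memberId);
    }

    public static void main(String[] args) {
        String memberId = "";                      // initial join has no member id
        while (true) {
            JoinResult result = sendJoinGroup(memberId);
            if (result.error == Error.NONE) {
                System.out.println("joined with member id " + result.memberId);
                break;
            }
            memberId = result.memberId;            // keep the broker-assigned id and retry
        }
    }
}
{code}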



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb

2018-11-16 Thread hitesh gollahalli bachanna (JIRA)
hitesh gollahalli bachanna created KAFKA-7640:
-

 Summary: Kafka stream interactive query not returning data when 
state is backed by rocksdb
 Key: KAFKA-7640
 URL: https://issues.apache.org/jira/browse/KAFKA-7640
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0
Reporter: hitesh gollahalli bachanna


I have a Kafka Streams app running with 36 different instances (one for each 
partition). The instances come up one after the other, and I am building a REST 
service on top of the state to access the data.

Here is some code that I use:
{code:java}
// call this to find out which host has the key
StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
if (localSelf.host().equals(hostStoreInfo.getHost())) {
    // get the key from the local store
} else {
    // call the remote host using restTemplate
}{code}
The problem is that the `metadata` object returned points to one host/IP, but 
the data is actually on a different node. I was able to see this in some 
application logs I printed. This happens every time I start my application.

The `allMetadata` method in the `KafkaStreams` class says the values will be 
updated when partitions get reassigned, but that is not happening in this case.
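
For comparison, a minimal sketch of the usual routing pattern (the store name, the 
advertised host, and the HTTP forwarding are assumptions; this is not the reporter's code):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

public class KeyRoutingSketch {

    private final KafkaStreams streams;
    private final HostInfo thisHost; // must match this instance's application.server setting

    public KeyRoutingSketch(KafkaStreams streams, HostInfo thisHost) {
        this.streams = streams;
        this.thisHost = thisHost;
    }

    public String lookup(String storeName, String key) {
        // Ask Streams which instance currently hosts the key's partition.
        StreamsMetadata metadata = streams.metadataForKey(storeName, key, Serdes.String().serializer());
        if (thisHost.equals(metadata.hostInfo())) {
            // The key is local: read it straight from the state store.
            ReadOnlyKeyValueStore<String, String> store =
                    streams.store(storeName, QueryableStoreTypes.keyValueStore());
            return store.get(key);
        }
        // Otherwise forward the request to the owning instance (e.g. over HTTP).
        return forward(metadata.host(), metadata.port(), storeName, key);
    }

    private String forward(String host, int port, String storeName, String key) {
        // Placeholder: call the remote instance's REST endpoint with the HTTP client of your choice.
        throw new UnsupportedOperationException("wire up your HTTP client here");
    }
}
{code}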

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-16 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689866#comment-16689866
 ] 

Jason Gustafson commented on KAFKA-7610:


For some background, the case we observed in practice was caused by a consumer 
that was slow to rejoin the group after a rebalance had begun. At the same 
time, there were new members that were trying to join the group for the first 
time. The request timeout was significantly lower than the rebalance timeout, 
so the JoinGroup of the new members kept timing out. The timeout caused a retry 
and the group size eventually become quite large because we could not detect 
the fact that the new members were no longer there. Although I think it may be 
useful 

As Stanislav mentions, the main advantage of detecting disconnected consumers 
is that it would work for older clients. Probably we should just do this if the 
complexity is not too high. It is not a complete solution though because 
detection of a connection which did not close cleanly will still take a 
significant amount of time. I think we should consider the protocol improvement 
in addition to this so that we have direct control over failure detection going 
forward.

Furthermore, I think our approach of holding JoinGroup requests in purgatory 
for long amounts of time has been problematic. It forces users to trade off the 
time to detect failed connections using `request.timeout.ms` against the time 
needed to complete a rebalance in the worst case. We kind of hacked around this 
in the consumer by allowing the JoinGroup to override the `request.timeout.ms` 
provided by the user. So users can provide a reasonable timeout value and 
detect failures in a reasonable time for every other API, but JoinGroup is 
still an issue. 

Really I'm not sure it's ever a good idea to have a request sitting on the 
broker for minutes. For example, this has bad interplay with the idle 
connection timeout. The broker will proactively close connections that aren't 
seeing any activity even if we've still got a JoinGroup request sitting in 
purgatory. It would be better to move to a model which incorporated client 
polling so that request times could be reasonably controlled. Of course we also 
don't want the JoinGroup to introduce a lot of new request traffic, so 
holding the request for shorter durations may be reasonable. In any case, if we 
solve this problem, then we probably also solve the issue here.

As for `group.max.size`, I agree with Stanislav and Boyang that it could be 
helpful, but I think that is a separate discussion. Even if we had this, we 
would still want a way to detect failures for the initial join. Perhaps one of 
you can open a JIRA?

TLDR;
1. We probably should do the disconnect option as an initial step since it 
addresses the problem for all client versions. 
2. We should also consider protocol improvements to avoid holding the JoinGroup 
requests in purgatory for indefinite amounts of time.

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the 

[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-16 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689838#comment-16689838
 ] 

Matthias J. Sax commented on KAFKA-7595:


Glad it works. Thanks for confirmation!

> Kafka Streams: KTable to KTable join introduces duplicates in downstream 
> KTable
> 
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Vik Gamov
>Priority: Major
>
> When performing a KTable to KTable join after aggregation, there are duplicates 
> in the resulting KTable.
> 1. caching disabled, no materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 
> * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> 1000);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  
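
For readers landing on this thread later, a minimal self-contained version of the 
working setup (option 3 above: record cache enabled plus a short commit interval) 
could look like the sketch below. Topic names and serdes are illustrative 
assumptions (a String-keyed `ratings` topic with Double values), not taken from 
the demo app.

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class RatingAverageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rating-average-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
        // Option 3 from the report: keep the record cache and use a short commit interval.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Double> ratings = builder.stream("ratings");
        KGroupedStream<String, Double> ratingsById = ratings.groupByKey();

        KTable<String, Long> ratingCounts = ratingsById.count();
        KTable<String, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
        KTable<String, Double> ratingAverage = ratingSums.join(ratingCounts,
                (sum, count) -> sum / count.doubleValue(),
                Materialized.as("average-ratings"));

        ratingAverage.toStream().to("rating-averages");
        new KafkaStreams(builder.build(), props).start();
    }
}
{code}

With the cache disabled, each input record updates the count and the sum 
separately, so the join emits two intermediate results per record, which is what 
typically shows up as the "duplicates"; the cache plus commit interval collapses 
those intermediate updates.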



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4453) add request prioritization

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689832#comment-16689832
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

MayureshGharat opened a new pull request #5921: KAFKA-4453 : Added code to 
separate controller connections and requests from the data plane
URL: https://github.com/apache/kafka/pull/5921
 
 
   KIP-291 Implementation : Added code to separate controller connections and 
requests from the data plane.
   
   - Tested with local deployment that the controller request are handled by 
the control plane and other requests are handled by the data plane.
   - Also added unit tests in order to test the functionality.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> \[3\] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
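
As a side note for readers, the core idea is easy to illustrate with a toy 
sketch. This is not the KIP-291 implementation (which separates listeners, 
queues and handler threads entirely); it only shows why giving controller 
requests their own queue removes the head-of-line blocking described above.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy sketch of request prioritization: controller requests live in their own
// queue and are always drained before data-plane requests.
public class TwoPlaneRequestQueue<R> {
    private final BlockingQueue<R> controlPlane = new LinkedBlockingQueue<>();
    private final BlockingQueue<R> dataPlane = new LinkedBlockingQueue<>();

    public void enqueueControl(R request) { controlPlane.add(request); }
    public void enqueueData(R request)    { dataPlane.add(request); }

    /** Next request to process; controller requests always win. */
    public R next() throws InterruptedException {
        while (true) {
            R controller = controlPlane.poll();
            if (controller != null) {
                return controller;             // e.g. LeaderAndIsr, UpdateMetadata
            }
            // Wait only briefly for data-plane work so a newly arrived
            // controller request is noticed on the next loop iteration.
            R data = dataPlane.poll(10, TimeUnit.MILLISECONDS);
            if (data != null) {
                return data;                   // e.g. Produce, Fetch, Metadata
            }
        }
    }
}
{code}

In the PR itself the separation goes further, with a dedicated listener, request 
queue and handler thread for the control plane, so controller traffic never 
shares a queue with client requests at all.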



--
This message was sent by 

[jira] [Commented] (KAFKA-7639) Read one request at a time from socket to reduce broker memory usage

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689730#comment-16689730
 ] 

ASF GitHub Bot commented on KAFKA-7639:
---

rajinisivaram opened a new pull request #5920: [DO NOT MERGE] KAFKA-7639: Read 
one request at a time from socket to avoid OOM
URL: https://github.com/apache/kafka/pull/5920
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Read one request at a time from socket to reduce broker memory usage
> 
>
> Key: KAFKA-7639
> URL: https://issues.apache.org/jira/browse/KAFKA-7639
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.2.0
>
>
> Broker's Selector currently reads all requests available on the socket when 
> the socket is ready for read. These are queued up as staged receives. We mute 
> the channel and stop reading any more data until all the staged requests are 
> processed. This behaviour is slightly inconsistent since for the initial read 
> we drain the socket buffer, allowing it to get filled up again, but if data 
> arrives slightly after the initial read, then we don't read from the socket 
> buffer until pending requests are processed.
> To avoid holding onto requests for longer than required, we should read one 
> request at a time even if more data is available in the socket buffer. This 
> is especially useful for produce requests which may be large and may take 
> long to process.
> Note that with the default socket read buffer size of 100K, this is not a 
> critical issue. But with larger socket buffers, this could result in 
> excessive memory usage if a lot of produce requests are buffered in the 
> broker and the producer times out, reconnects and sends more data before 
> broker has cleared older requests. By reading one-at-a-time, we reduce the 
> amount of time the broker holds onto memory for each request.
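
As a rough illustration of the change (this is not the broker's actual Selector 
or NetworkReceive code), reading one size-prefixed request per readable event 
instead of draining the whole socket buffer looks roughly like the following; 
the caller would mute the channel until the returned request has been processed.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Toy illustration: parse at most one size-prefixed request per invocation,
// even if the socket buffer holds several, so memory for later requests is not
// allocated before it is needed. Assumes a non-blocking channel; EOF handling
// and size validation are omitted for brevity.
public class OneAtATimeReceiver {
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    /** Returns one complete request payload, or null if it is not yet complete. */
    public ByteBuffer readOne(SocketChannel channel) throws IOException {
        if (payload == null) {
            channel.read(sizeBuf);
            if (sizeBuf.hasRemaining()) {
                return null;                  // size prefix not complete yet
            }
            sizeBuf.flip();
            payload = ByteBuffer.allocate(sizeBuf.getInt());
            sizeBuf.clear();
        }
        channel.read(payload);
        if (payload.hasRemaining()) {
            return null;                      // body not complete yet
        }
        ByteBuffer complete = payload;
        payload = null;
        complete.flip();
        return complete;                      // caller mutes the channel until processed
    }
}
{code}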



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7637) Error while writing to checkpoint file due to too many open files

2018-11-16 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689570#comment-16689570
 ] 

Ismael Juma commented on KAFKA-7637:


Thanks for the report. 65k is generally low for Kafka. It would be good to 
verify if there is a leak of some sort here, although we have clusters running 
for a long time and haven't seen errors like this one.
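
If it helps the investigation: the broker JVM already exposes its descriptor 
usage through the standard OperatingSystem MXBean, so a small JMX client can 
track whether the open count keeps growing over days. This assumes the broker 
was started with remote JMX enabled (e.g. JMX_PORT=9999); the host and port 
below are placeholders.

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Reads the open/max file descriptor counts of a remote JVM over JMX.
// The OpenFileDescriptorCount attribute is available on Unix JVMs.
public class BrokerFdCheck {
    public static void main(String[] args) throws Exception {
        String url = "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi";
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url))) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName os = new ObjectName("java.lang:type=OperatingSystem");
            Object open = mbsc.getAttribute(os, "OpenFileDescriptorCount");
            Object max = mbsc.getAttribute(os, "MaxFileDescriptorCount");
            System.out.println("open fds: " + open + " / max: " + max);
        }
    }
}
{code}

Comparing that count over time with the number of log segment and index files on 
disk usually shows quickly whether it is a leak or just legitimate growth.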

> Error while writing to checkpoint file due to too many open files
> -
>
> Key: KAFKA-7637
> URL: https://issues.apache.org/jira/browse/KAFKA-7637
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1
> Environment: Red Hat Enterprise Linux Server release 7.4 (Maipo)
>Reporter: Sander van Loo
>Priority: Major
>
> We are running a 3 node Kafka cluster on version 1.1.1 on Red Hat Linux 7.
> Max open files is set to 65000.
> After running for a few days the nodes have the following open file counts:
>  * node01d: 2712
>  * node01e: 2770
>  * node01f: 4102
> After a few weeks of runtime, the cluster crashes with the following error:
>  
> {noformat}
> [2018-11-12 07:05:16,790] ERROR Error while writing to checkpoint file 
> /var/lib/kafka/topics/replication-offset-checkpoint 
> (kafka.server.LogDirFailureChannel)
> java.io.FileNotFoundException: 
> /var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files)
>     at java.io.FileOutputStream.open0(Native Method)
>     at java.io.FileOutputStream.open(FileOutputStream.java:270)
>     at java.io.FileOutputStream.(FileOutputStream.java:213)
>     at java.io.FileOutputStream.(FileOutputStream.java:162)
>     at 
> kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
>     at 
> kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
>     at 
> kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9$adapted(ReplicaManager.scala:1384)
>     at scala.Option.foreach(Option.scala:257)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7(ReplicaManager.scala:1384)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7$adapted(ReplicaManager.scala:1381)
>     at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1381)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:242)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> followed by this one:
> {noformat}
> [2018-11-12 07:05:16,792] ERROR [ReplicaManager broker=3] Error while writing 
> to highwatermark file in directory /var/lib/kafka/topics 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.KafkaStorageException: Error while writing to 
> checkpoint file /var/lib/kafka/topics/replication-offset-checkpoint
> Caused by: java.io.FileNotFoundException: 
> /var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files)
>     at java.io.FileOutputStream.open0(Native Method)
>     at java.io.FileOutputStream.open(FileOutputStream.java:270)
>     at java.io.FileOutputStream.(FileOutputStream.java:213)
>     at java.io.FileOutputStream.(FileOutputStream.java:162)
>     at 
> kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
>     at 
> kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
>     at 
> kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59)
>     at 
> 

[jira] [Updated] (KAFKA-7635) FetcherThread stops processing after "Error processing data for partition"

2018-11-16 Thread Steven Aerts (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Aerts updated KAFKA-7635:

Description: 
After disabling unclean leader election again, after recovering from a situation 
where we had enabled it due to a split brain in ZooKeeper, we saw that some 
of our brokers stopped replicating their partitions.

Digging into the logs, we saw that the replica thread was stopped because one 
partition had a failure which threw a [{{Error processing data for partition}} 
exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207].
  But the broker kept running and serving the partitions from which it was 
leader.

We saw three different types of exceptions triggering this (example stacktraces 
attached):
* {{kafka.common.UnexpectedAppendOffsetException}}
* {{Trying to roll a new log segment for topic partition partition-b-97 with 
start offset 1388 while it already exists.}}
* {{Kafka scheduler is not running.}}

We think there are two acceptable ways for the kafka broker to handle this:
* Mark those partitions as a partition with error and handle them accordingly.  
As is done [when a {{CorruptRecordException}} or 
{{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196]
 is thrown.
* Exit the broker as is done [when log truncation is not 
allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189].
 
Maybe even a combination of both.  Our probably naive idea is that for the 
first two types the first strategy would be the best, but for the last type, it 
is probably better to re-throw a {{FatalExitError}} and exit the broker.



  was:
After disabling unclean leader leader again after recovery of a situation where 
we enabled unclean leader due to a split brain in zookeeper, we saw that some 
of our stopped replicating their partitions.

Digging into the logs, we saw that the replica thread was stopped because one 
partition had a failure which threw a [{{Error processing data for partition}} 
exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207].
  But the broker kept running and serving the partitions from which it was 
leader.

We saw three different types of exceptions triggering this (example stacktraces 
attached):
* {{kafka.common.UnexpectedAppendOffsetException}}
* {{Trying to roll a new log segment for topic partition partition-b-97 with 
start offset 1388 while it already exists.}}
* {{Kafka scheduler is not running.}}

We think there are two acceptable ways for the kafka broker to handle this:
* Mark those partitions as a partition with error and handle them accordingly.  
As is done [when a {{CorruptRecordException}} or 
{{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196]
 is thrown.
* Exit the broker as is done [when log truncation is not 
allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189].
 
Maybe even a combination of both.  Our probably naive idea is that for the 
first two types the first strategy would be the best, but for the last type, it 
is probably better to re-throw a {{FatalExitError}} and exit the broker.




> FetcherThread stops processing after "Error processing data for partition"
> --
>
> Key: KAFKA-7635
> URL: https://issues.apache.org/jira/browse/KAFKA-7635
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 2.0.0
>Reporter: Steven Aerts
>Priority: Major
> Attachments: stacktraces.txt
>
>
> After disabling unclean leader election again, after recovering from a 
> situation where we had enabled it due to a split brain in ZooKeeper, we saw 
> that some of our brokers stopped replicating their partitions.
> Digging into the logs, we saw that the replica thread was stopped because one 
> partition had a failure which threw a [{{Error processing data for 
> partition}} 
> exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207].
>   But the broker kept running and serving the partitions from which it was 
> leader.
> We saw three different types of exceptions triggering this (example 
> stacktraces attached):
> * {{kafka.common.UnexpectedAppendOffsetException}}
> * {{Trying to roll a new log segment for topic partition partition-b-97 with 
> start offset 1388 while it already exists.}}
> * {{Kafka scheduler is not running.}}
> We think there are two acceptable ways for the kafka broker to handle this:
> * 

[jira] [Created] (KAFKA-7638) Trogdor - Support mass task creation endpoint

2018-11-16 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7638:
--

 Summary: Trogdor - Support mass task creation endpoint
 Key: KAFKA-7638
 URL: https://issues.apache.org/jira/browse/KAFKA-7638
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


Trogdor supports the creation of tasks via the `coordinator/tasks/create` 
endpoint - it currently accepts only one task.

Since Trogdor supports scheduling multiple jobs to execute at a certain time 
(via the `startTime` task parameter leveraged by all tasks), it makes sense to 
support creating multiple tasks in a single request. 
Users might want to leverage the scheduler to, say, create 100 tasks. In the 
current model, they would need to issue 100 requests - which is inefficient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7637) Error while writing to checkpoint file due to too many open files

2018-11-16 Thread Sander van Loo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sander van Loo updated KAFKA-7637:
--
Summary: Error while writing to checkpoint file due to too many open files  
(was: ERROR Error while writing to checkpoint file due to too many open files)

> Error while writing to checkpoint file due to too many open files
> -
>
> Key: KAFKA-7637
> URL: https://issues.apache.org/jira/browse/KAFKA-7637
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1
> Environment: Red Hat Enterprise Linux Server release 7.4 (Maipo)
>Reporter: Sander van Loo
>Priority: Major
>
> We are running a 3 node Kafka cluster on version 1.1.1 on Red Hat Linux 7.
> Max open files is set to 65000.
> After running for a few days the nodes have the following open file counts:
>  * node01d: 2712
>  * node01e: 2770
>  * node01f: 4102
> After a few weeks of runtime, the cluster crashes with the following error:
>  
> {noformat}
> [2018-11-12 07:05:16,790] ERROR Error while writing to checkpoint file 
> /var/lib/kafka/topics/replication-offset-checkpoint 
> (kafka.server.LogDirFailureChannel)
> java.io.FileNotFoundException: 
> /var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files)
>     at java.io.FileOutputStream.open0(Native Method)
>     at java.io.FileOutputStream.open(FileOutputStream.java:270)
>     at java.io.FileOutputStream.(FileOutputStream.java:213)
>     at java.io.FileOutputStream.(FileOutputStream.java:162)
>     at 
> kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
>     at 
> kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
>     at 
> kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9$adapted(ReplicaManager.scala:1384)
>     at scala.Option.foreach(Option.scala:257)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7(ReplicaManager.scala:1384)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7$adapted(ReplicaManager.scala:1381)
>     at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1381)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:242)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> followed by this one:
> {noformat}
> [2018-11-12 07:05:16,792] ERROR [ReplicaManager broker=3] Error while writing 
> to highwatermark file in directory /var/lib/kafka/topics 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.KafkaStorageException: Error while writing to 
> checkpoint file /var/lib/kafka/topics/replication-offset-checkpoint
> Caused by: java.io.FileNotFoundException: 
> /var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files)
>     at java.io.FileOutputStream.open0(Native Method)
>     at java.io.FileOutputStream.open(FileOutputStream.java:270)
>     at java.io.FileOutputStream.(FileOutputStream.java:213)
>     at java.io.FileOutputStream.(FileOutputStream.java:162)
>     at 
> kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
>     at 
> kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
>     at 
> kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59)
>     at 
> kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384)
>     at 
> 

[jira] [Updated] (KAFKA-7636) Allow consumer to update maxPollRecords value

2018-11-16 Thread Kcirtap Seven (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kcirtap Seven updated KAFKA-7636:
-
 Labels: pull-request-available  (was: )
Description: 
Hi,

We have two use cases where we would need to change the max.poll.records 
parameter on the fly :

1. We offer a REST API to get 'feedbacks'. This API takes into account a 
parameter 'count'. 
 The system was previously based on cassandra. It is now based on kafka and 
'feedbacks' are stored into kafka topics. 
 To be compliant with the legacy interface contract, we would like to be able 
to change the max.poll.records on the fly to take into account this 'count' 
parameter.

2. We receive 'notification requests' related to a 'sender' via a REST API. We 
store those requests into topics (by sender). 
 Each sender is associated with a weight. Here is the algorithm that processes 
the requests:

    1. At each iteration, we process at max n records (n configurable) for the 
whole bunch of requests. For this example, let's say 100. 
     2. We compute the max poll records for each sender. Let's say we have 3 
senders with the following weight 2, 1, 1. Hence 50 records max for the first 
one, 25 for the other two. 
     3. We consume the topics one after the other. We would like to reallocate 
some capacity to remaining consumers if the max.poll.records is not reached for 
the current consumer. Let's say at each iteration we make the following 
synchronous calls :
             sender1Consumer.poll() with computed max.poll.records 50
             sender2Consumer.poll() with computed max.poll.records 25
             sender3Consumer.poll() with computed max.poll.records 25
        If the first call returns only 10 records, we would like to reallocate 
the 40 "spare" records to the other consumers, 20 for each for instance (or 
another strategy). We would make the following calls instead : 
             sender2Consumer.poll() with updated max.poll.records 45
             sender3Consumer.poll() with updated max.poll.records 45

 

For that requirement we also need to change the max.poll.records on the fly.

 

PR: https://github.com/apache/kafka/pull/5919

Regards,

  was:
Hi,

We have two use cases where we would need to change the max.poll.records 
parameter on the fly :

1. We offer a REST API to get 'feedbacks'. This API takes into account a 
parameter 'count'. 
The system was previously based on cassandra. It is now based on kafka and 
'feedbacks' are stored into kafka topics. 
To be compliant with the legacy interface contract, we would like to be able to 
change the max.poll.records on the fly to take into account this 'count' 
parameter. 

2. We receive 'notification requests' related to a 'sender' via a REST API. We 
store those requests into topics (by sender). 
Each sender is associated with a weight. Here is the algorithm that process the 
requests :

    1. At each iteration, we process at max n records (n configurable) for the 
whole bunch of requests. For this example, let's say 100. 
    2. We compute the max poll records for each sender. Let's say we have 3 
senders with the following weight 2, 1, 1. Hence 50 records max for the first 
one, 25 for the others two. 
    3. We consume the topics one after the other. We would like to reallocate 
some capacity to remaining consumers if the max.poll.records is not reached for 
the current consumer. Let'say at each iteration we make the following 
synchronous calls :
            sender1Consumer.poll() with computed max.poll.records 50
            sender2Consumer.poll() with computed max.poll.records 25
            sender3Consumer.poll() with computed max.poll.records 25
       If the first call returns only 10 records, we would like to reallocate 
the 40 "spare" records to the other consumers, 20 for each for instance (or 
another strategy). We would make the following calls instead : 
            sender2Consumer.poll() with updated max.poll.records 45
            sender3Consumer.poll() with updated max.poll.records 45

 

For that requirement we also need to change the max.poll.records on the fly.


Regards,


> Allow consumer to update maxPollRecords value
> -
>
> Key: KAFKA-7636
> URL: https://issues.apache.org/jira/browse/KAFKA-7636
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.0.1
>Reporter: Kcirtap Seven
>Priority: Minor
>  Labels: pull-request-available
>
> Hi,
> We have two use cases where we would need to change the max.poll.records 
> parameter on the fly :
> 1. We offer a REST API to get 'feedbacks'. This API takes into account a 
> parameter 'count'. 
>  The system was previously based on cassandra. It is now based on kafka and 
> 'feedbacks' are stored into kafka topics. 
>  To be compliant with 

[jira] [Commented] (KAFKA-7636) Allow consumer to update maxPollRecords value

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689465#comment-16689465
 ] 

ASF GitHub Bot commented on KAFKA-7636:
---

kcirtap7 opened a new pull request #5919: KAFKA-7636: Allow consumer to update 
maxPollRecords value
URL: https://github.com/apache/kafka/pull/5919
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow consumer to update maxPollRecords value
> -
>
> Key: KAFKA-7636
> URL: https://issues.apache.org/jira/browse/KAFKA-7636
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.0.1
>Reporter: Kcirtap Seven
>Priority: Minor
>
> Hi,
> We have two use cases where we would need to change the max.poll.records 
> parameter on the fly :
> 1. We offer a REST API to get 'feedbacks'. This API takes into account a 
> parameter 'count'. 
> The system was previously based on cassandra. It is now based on kafka and 
> 'feedbacks' are stored into kafka topics. 
> To be compliant with the legacy interface contract, we would like to be able 
> to change the max.poll.records on the fly to take into account this 'count' 
> parameter. 
> 2. We receive 'notification requests' related to a 'sender' via a REST API. 
> We store those requests into topics (by sender). 
> Each sender is associated with a weight. Here is the algorithm that processes 
> the requests:
>     1. At each iteration, we process at max n records (n configurable) for 
> the whole bunch of requests. For this example, let's say 100. 
>     2. We compute the max poll records for each sender. Let's say we have 3 
> senders with the following weight 2, 1, 1. Hence 50 records max for the first 
> one, 25 for the other two. 
>     3. We consume the topics one after the other. We would like to reallocate 
> some capacity to remaining consumers if the max.poll.records is not reached 
> for the current consumer. Let's say at each iteration we make the following 
> synchronous calls :
>             sender1Consumer.poll() with computed max.poll.records 50
>             sender2Consumer.poll() with computed max.poll.records 25
>             sender3Consumer.poll() with computed max.poll.records 25
>        If the first call returns only 10 records, we would like to reallocate 
> the 40 "spare" records to the other consumers, 20 for each for instance (or 
> another strategy). We would make the following calls instead : 
>             sender2Consumer.poll() with updated max.poll.records 45
>             sender3Consumer.poll() with updated max.poll.records 45
>  
> For that requirement we also need to change the max.poll.records on the fly.
> Regards,
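
For what it's worth, the weighted-budget and reallocation part of the algorithm 
can be expressed without any consumer changes; the open question in this ticket 
is only how to cap what a single poll() returns. A rough sketch follows, with 
the poll itself left as a stand-in (the `pollSender` method below is 
hypothetical and just pretends sender1 has only 10 records available).

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Rough illustration of the weighted-budget scheme: each sender gets a share
// of the per-iteration budget, and whatever a sender does not use is handed to
// the senders that have not been polled yet.
public class WeightedPollBudgets {

    public static void main(String[] args) {
        int totalBudget = 100;
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("sender1", 2);
        weights.put("sender2", 1);
        weights.put("sender3", 1);

        int remainingBudget = totalBudget;
        int remainingWeight = weights.values().stream().mapToInt(Integer::intValue).sum();

        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            // Budget for this sender, including capacity unused by earlier senders.
            int budget = remainingBudget * e.getValue() / remainingWeight;
            int fetched = pollSender(e.getKey(), budget);   // stand-in for consumer.poll()
            remainingBudget -= fetched;
            remainingWeight -= e.getValue();
            System.out.printf("%s: budget=%d fetched=%d%n", e.getKey(), budget, fetched);
        }
    }

    // Hypothetical poll result: pretend sender1 only has 10 records available.
    private static int pollSender(String sender, int budget) {
        return sender.equals("sender1") ? Math.min(10, budget) : budget;
    }
}
{code}

With the numbers from the example (total budget 100, weights 2/1/1, sender1 
returning only 10 records), this prints budgets of 50, 45 and 45, matching the 
reallocation described above.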



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7636) Allow consumer to update maxPollRecords value

2018-11-16 Thread Kcirtap Seven (JIRA)
Kcirtap Seven created KAFKA-7636:


 Summary: Allow consumer to update maxPollRecords value
 Key: KAFKA-7636
 URL: https://issues.apache.org/jira/browse/KAFKA-7636
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.0.1
Reporter: Kcirtap Seven


Hi,

We have two use cases where we would need to change the max.poll.records 
parameter on the fly :

1. We offer a REST API to get 'feedbacks'. This API takes into account a 
parameter 'count'. 
The system was previously based on cassandra. It is now based on kafka and 
'feedbacks' are stored into kafka topics. 
To be compliant with the legacy interface contract, we would like to be able to 
change the max.poll.records on the fly to take into account this 'count' 
parameter. 

2. We receive 'notification requests' related to a 'sender' via a REST API. We 
store those requests into topics (by sender). 
Each sender is associated with a weight. Here is the algorithm that processes the 
requests:

    1. At each iteration, we process at max n records (n configurable) for the 
whole bunch of requests. For this example, let's say 100. 
    2. We compute the max poll records for each sender. Let's say we have 3 
senders with the following weight 2, 1, 1. Hence 50 records max for the first 
one, 25 for the other two. 
    3. We consume the topics one after the other. We would like to reallocate 
some capacity to remaining consumers if the max.poll.records is not reached for 
the current consumer. Let's say at each iteration we make the following 
synchronous calls :
            sender1Consumer.poll() with computed max.poll.records 50
            sender2Consumer.poll() with computed max.poll.records 25
            sender3Consumer.poll() with computed max.poll.records 25
       If the first call returns only 10 records, we would like to reallocate 
the 40 "spare" records to the other consumers, 20 for each for instance (or 
another strategy). We would make the following calls instead : 
            sender2Consumer.poll() with updated max.poll.records 45
            sender3Consumer.poll() with updated max.poll.records 45

 

For that requirement we also need to change the max.poll.records on the fly.


Regards,



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7565) NPE in KafkaConsumer

2018-11-16 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7565.
---
   Resolution: Duplicate
Fix Version/s: (was: 2.2.0)

[~avakhrenev] Thanks for testing, closing this issue.

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find minimal reproducer, but it happens quite often in our system. 
> We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is 
> somehow related.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689311#comment-16689311
 ] 

ASF GitHub Bot commented on KAFKA-7054:
---

ManoharVanam opened a new pull request #5211: [KAFKA-7054] Kafka describe 
command should throw topic doesn't exist exception.
URL: https://github.com/apache/kafka/pull/5211
 
 
   **User Interface Improvement:** If the topic doesn't exist, the Kafka describe 
command should throw a topic-doesn't-exist exception, like the alter and delete 
commands
   
   ### Committer Checklist (excluded from commit message)
 - [ ] Verify design and implementation
 - [ ] Verify test coverage and CI build status
 - [ ] Verify documentation (including upgrade notes)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka describe command should throw topic doesn't exist exception.
> --
>
> Key: KAFKA-7054
> URL: https://issues.apache.org/jira/browse/KAFKA-7054
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Manohar Vanam
>Priority: Minor
>
> If the topic doesn't exist, then the Kafka describe command should throw a 
> topic-doesn't-exist exception,
> like the alter and delete commands do:
> {code:java}
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers

2018-11-16 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7576.
---
Resolution: Fixed
  Reviewer: Jason Gustafson

> Dynamic update of replica fetcher threads may fail to start/close fetchers
> --
>
> Key: KAFKA-7576
> URL: https://issues.apache.org/jira/browse/KAFKA-7576
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.1, 2.0.1, 2.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.2, 2.1.1, 2.0.2
>
>
> KAFKA-6051 moved  ReplicaFetcherBlockingSend shutdown earlier in the shutdown 
> sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers 
> can now throw an exception because Selector may be closed on a different 
> thread while data is being written on another thread. KAFKA-7464 changed this 
> behaviour for 2.0.1 and 2.1.0. The exception during close() is now logged and 
> not propagated to avoid exceptions during broker shutdown.
> When config update notification of `num.replica.fetchers` is processed, 
> partitions are migrated as necessary to increase or decrease the number of 
> fetcher threads. Existing fetchers are shut down first before new ones are 
> created. This migration is performed on the thread processing the ZK change 
> notification. The shutdown of Selector of existing fetchers is not safe since 
> replica fetcher thread may be processing data at the time using the same 
> Selector.
> Without the fix from KAFKA-7464, another update of the config or broker 
> restart is required to restart the replica fetchers after dynamic config 
> update if shutdown encounters an exception.
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at sun.nio.ch.IOUtil.write(IOUtil.java:68)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
> at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
> at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
> at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
> at org.apache.kafka.common.network.Selector.close(Selector.java:736)
> at org.apache.kafka.common.network.Selector.close(Selector.java:698)
> at org.apache.kafka.common.network.Selector.close(Selector.java:314)
> at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
> at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
> at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at 
> kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
> at 
> kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
> at 
> kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
> at 
> kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at 
> kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
> kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with 

[jira] [Commented] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689282#comment-16689282
 ] 

ASF GitHub Bot commented on KAFKA-7054:
---

ManoharVanam closed pull request #5211: [KAFKA-7054] Kafka describe command 
should throw topic doesn't exist exception.
URL: https://github.com/apache/kafka/pull/5211
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka describe command should throw topic doesn't exist exception.
> --
>
> Key: KAFKA-7054
> URL: https://issues.apache.org/jira/browse/KAFKA-7054
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Manohar Vanam
>Priority: Minor
>
> If the topic doesn't exist, then the Kafka describe command should throw a 
> topic-doesn't-exist exception,
> like the alter and delete commands do:
> {code:java}
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7635) FetcherThread stops processing after "Error processing data for partition"

2018-11-16 Thread Steven Aerts (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689278#comment-16689278
 ] 

Steven Aerts commented on KAFKA-7635:
-

Similar but focusses on a specific issue.

> FetcherThread stops processing after "Error processing data for partition"
> --
>
> Key: KAFKA-7635
> URL: https://issues.apache.org/jira/browse/KAFKA-7635
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 2.0.0
>Reporter: Steven Aerts
>Priority: Major
> Attachments: stacktraces.txt
>
>
> After disabling unclean leader election again, after recovering from a 
> situation where we had enabled it due to a split brain in ZooKeeper, we saw 
> that some of our brokers stopped replicating their partitions.
> Digging into the logs, we saw that the replica thread was stopped because one 
> partition had a failure which threw a [{{Error processing data for 
> partition}} 
> exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207].
>   But the broker kept running and serving the partitions from which it was 
> leader.
> We saw three different types of exceptions triggering this (example 
> stacktraces attached):
> * {{kafka.common.UnexpectedAppendOffsetException}}
> * {{Trying to roll a new log segment for topic partition partition-b-97 with 
> start offset 1388 while it already exists.}}
> * {{Kafka scheduler is not running.}}
> We think there are two acceptable ways for the kafka broker to handle this:
> * Mark those partitions as a partition with error and handle them 
> accordingly.  As is done [when a {{CorruptRecordException}} or 
> {{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196]
>  is thrown.
> * Exit the broker as is done [when log truncation is not 
> allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189].
>  
> Maybe even a combination of both.  Our probably naive idea is that for the 
> first two types the first strategy would be the best, but for the last type, 
> it is probably better to re-throw a {{FatalExitError}} and exit the broker.
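
To make the two proposed strategies concrete, here is a minimal sketch. It is 
purely illustrative: the exception mapping follows the reporter's suggestion 
rather than any agreed design, and it is not the actual AbstractFetcherThread 
code.

{code:java}
import java.util.HashSet;
import java.util.Set;

// Sketch of the two strategies proposed in the ticket: recoverable problems
// mark only the offending partition as failed, while a condition that is not
// partition-specific (e.g. a dead scheduler) is treated as fatal to the broker.
public class FetcherErrorPolicySketch {

    static class FatalBrokerError extends RuntimeException {
        FatalBrokerError(Throwable cause) { super(cause); }
    }

    private final Set<String> failedPartitions = new HashSet<>();

    void handle(String topicPartition, RuntimeException error) {
        if (error instanceof IllegalStateException) {
            // e.g. "Kafka scheduler is not running." - nothing partition-specific
            throw new FatalBrokerError(error);
        }
        // e.g. an unexpected append offset or a segment-roll conflict:
        // quarantine the partition and keep replicating the healthy ones.
        failedPartitions.add(topicPartition);
    }

    Set<String> failedPartitions() { return failedPartitions; }
}
{code}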



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread

2018-11-16 Thread Sachin Upadhyay (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689269#comment-16689269
 ] 

Sachin Upadhyay commented on KAFKA-7280:


[~rsivaram] Thanks for the detailed answers.

[~ijuma] I rechecked and I can see "2.0.1" kafka-client in maven repo now.

Is "1.1.2" also scheduled to be released anytime soon? I was thinking of 
upgrading clients to version having only minor-version bumped up instead of 
major-version.

Thanks,

-Sachin

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe. But we 
> are using it in Kafka consumer without any synchronization even though poll() 
> from heartbeat thread can process responses. Heartbeat thread holds the 
> coordinator lock while processing its poll and responses, making other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header=
> Unknown macro: 
> \{partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null}
> ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 
> bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> (org.apache.kafka.clients.FetchSessionHandler)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Sending READ_UNCOMMITTED 

[jira] [Commented] (KAFKA-7565) NPE in KafkaConsumer

2018-11-16 Thread Alexey Vakhrenev (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689243#comment-16689243
 ] 

Alexey Vakhrenev commented on KAFKA-7565:
-

[~rsivaram], [~ijuma] the issue seems to be fixed in 2.0.1; I haven't encountered 
this error anymore, so I think this can be closed.
Thank you.

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
> Fix For: 2.2.0
>
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> We couldn't find a minimal reproducer, but it happens quite often in our system. 
> We use the {{pause()}} and {{wakeup()}} methods quite extensively; maybe that is 
> somehow related.
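
The kind of usage described above, {{pause()}} on the polling thread plus {{wakeup()}} 
called from another thread, looks roughly like the following. This is a generic sketch 
of the pattern (using the 2.0+ {{poll(Duration)}} API), not a reproducer; the broker 
address, group id and topic are assumptions:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PauseWakeupSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption
        props.put("group.id", "group");                      // assumption
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test_topic"));

        // wakeup() is the only thread-safe KafkaConsumer method; another thread
        // uses it to interrupt a (possibly blocked) poll, e.g. on shutdown.
        Thread controller = new Thread(consumer::wakeup);
        controller.start();

        try {
            while (true) {
                consumer.poll(Duration.ofMillis(500));
                // Back off fetching without losing the assignment; resume later
                // with consumer.resume(consumer.assignment()).
                consumer.pause(consumer.assignment());
            }
        } catch (WakeupException e) {
            // expected when wakeup() interrupts the poll loop
        } finally {
            controller.join();
            consumer.close();
        }
    }
}
{code}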



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7635) FetcherThread stops processing after "Error processing data for partition"

2018-11-16 Thread Steven Aerts (JIRA)
Steven Aerts created KAFKA-7635:
---

 Summary: FetcherThread stops processing after "Error processing 
data for partition"
 Key: KAFKA-7635
 URL: https://issues.apache.org/jira/browse/KAFKA-7635
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 2.0.0
Reporter: Steven Aerts
 Attachments: stacktraces.txt

After disabling unclean leader election again, following recovery from a situation 
where we had enabled it due to a split brain in ZooKeeper, we saw that some of our 
brokers stopped replicating their partitions.

Digging into the logs, we saw that the replica fetcher thread was stopped because one 
partition had a failure which threw an [{{Error processing data for partition}} 
exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207].
  The broker itself, however, kept running and continued serving the partitions for 
which it was leader.

We saw three different types of exceptions triggering this (example stacktraces 
attached):
* {{kafka.common.UnexpectedAppendOffsetException}}
* {{Trying to roll a new log segment for topic partition partition-b-97 with 
start offset 1388 while it already exists.}}
* {{Kafka scheduler is not running.}}

We think there are two acceptable ways for the Kafka broker to handle this:
* Mark those partitions as partitions in error and handle them accordingly, as 
is done [when a {{CorruptRecordException}} or 
{{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196]
 is thrown.
* Exit the broker, as is done [when log truncation is not 
allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189].

A combination of both might even be appropriate. Our (probably naive) idea is that the 
first strategy fits the first two exception types best, while for the last one it is 
probably better to re-throw a {{FatalExitError}} and exit the broker; a rough sketch of 
this combined approach follows below.
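
A rough sketch of the combined approach, written in plain Java with hypothetical names 
rather than the actual Scala code in {{AbstractFetcherThread}}, could look like:

{code:java}
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: recoverable per-partition failures mark the partition as
// failed and let the fetcher keep going; broker-level failures exit the process.
public class FetcherErrorHandlingSketch {

    interface PartitionData {}

    // Stand-ins for the three failure types observed in the attached stacktraces.
    static class UnexpectedAppendOffsetException extends RuntimeException {}
    static class LogSegmentExistsException extends RuntimeException {}
    static class SchedulerNotRunningException extends RuntimeException {}

    void processFetchedData(Map<String, PartitionData> responses, Set<String> failedPartitions) {
        for (Map.Entry<String, PartitionData> entry : responses.entrySet()) {
            String partition = entry.getKey();
            try {
                appendToLog(partition, entry.getValue());
            } catch (UnexpectedAppendOffsetException | LogSegmentExistsException e) {
                // Strategy 1: isolate the failure and keep replicating the rest,
                // as is already done for CorruptRecordException / KafkaStorageException.
                failedPartitions.add(partition);
            } catch (SchedulerNotRunningException e) {
                // Strategy 2: the broker is in an unusable state; fail fast,
                // as is done when log truncation is not allowed.
                System.err.println("Fatal error while processing " + partition + ", exiting");
                Runtime.getRuntime().exit(1);
            }
        }
    }

    void appendToLog(String partition, PartitionData data) {
        // placeholder for the real append path
    }
}
{code}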





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6774) Improve default groupId behavior in consumer

2018-11-16 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6774.

Resolution: Fixed

> Improve default groupId behavior in consumer
> 
>
> Key: KAFKA-6774
> URL: https://issues.apache.org/jira/browse/KAFKA-6774
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> At the moment, the default groupId in the consumer is "". If you try to use 
> this to subscribe() to a topic, the broker will reject the group as invalid. 
> On the other hand, if you use it with assign(), then the user will be able to 
> fetch and commit offsets using the empty groupId. Probably 99% of the time, 
> this is not what the user expects. Instead you would probably expect that if 
> no groupId is provided, then no committed offsets will be fetched at all and 
> we'll just use the auto reset behavior if we don't have a current position.
> Here are two potential solutions (both requiring a KIP):
> 1. Change the default to null. We will preserve the current behavior for 
> subscribe(). When using assign(), we will not bother fetching committed 
> offsets for the null groupId, and any attempt to commit offsets will raise an 
> error. The user can still use the empty groupId, but they have to specify it 
> explicitly.
> 2. Keep the current default, but change the consumer to treat this value as 
> if it were null, as described in option 1. The argument for this behavior is 
> that using the empty groupId to commit offsets is inherently dangerous and 
> should not be permitted. We would have to convince ourselves, though, that we 
> are fine no longer allowing the empty groupId for backwards compatibility.
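
For illustration, under option 1 a consumer with no {{group.id}} would use {{assign()}} 
and rely on the reset policy for its position, and could not commit offsets. A minimal 
sketch, with the broker address and topic name as assumptions:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GrouplessConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No ConsumerConfig.GROUP_ID_CONFIG set: under option 1 the default is null,
        // so no committed offsets are fetched and the reset policy decides the position.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("test_topic", 0)));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
            // consumer.commitSync();   // would fail: committing offsets needs an explicit group.id
            // consumer.subscribe(...)  // would also be rejected without a group.id
        }
    }
}
{code}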



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689175#comment-16689175
 ] 

ASF GitHub Bot commented on KAFKA-6774:
---

hachikuji closed pull request #5877: KAFKA-6774: Improve the default group id 
behavior in KafkaConsumer (KIP-289)
URL: https://github.com/apache/kafka/pull/5877
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 795a762a494..9cd5766ea3e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -277,7 +277,7 @@

ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
 Importance.MEDIUM,
 
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
-.define(GROUP_ID_CONFIG, Type.STRING, "", 
Importance.HIGH, GROUP_ID_DOC)
+.define(GROUP_ID_CONFIG, Type.STRING, null, 
Importance.HIGH, GROUP_ID_DOC)
 .define(SESSION_TIMEOUT_MS_CONFIG,
 Type.INT,
 1,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a756721fd8..5c673a58c10 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,8 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -557,6 +559,7 @@
 
 private final Logger log;
 private final String clientId;
+private String groupId;
 private final ConsumerCoordinator coordinator;
 private final Deserializer<K> keyDeserializer;
 private final Deserializer<V> valueDeserializer;
@@ -654,18 +657,23 @@ public KafkaConsumer(Properties properties,
 }
 
 @SuppressWarnings("unchecked")
-private KafkaConsumer(ConsumerConfig config,
-  Deserializer<K> keyDeserializer,
-  Deserializer<V> valueDeserializer) {
+private KafkaConsumer(ConsumerConfig config, Deserializer<K> 
keyDeserializer, Deserializer<V> valueDeserializer) {
 try {
 String clientId = 
config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
 if (clientId.isEmpty())
 clientId = "consumer-" + 
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
 this.clientId = clientId;
-String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-
+this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
 LogContext logContext = new LogContext("[Consumer clientId=" + 
clientId + ", groupId=" + groupId + "] ");
 this.log = logContext.logger(getClass());
+boolean enableAutoCommit = 
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+if (groupId == null) { // overwrite in case of default group id 
where the config is not explicitly provided
+if 
(!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+enableAutoCommit = false;
+else if (enableAutoCommit)
+throw new 
InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " 
cannot be set to true when default group id (null) is used.");
+} else if (groupId.isEmpty())
+log.warn("Support for using the empty group id by consumers is 
deprecated and will be removed in the next major release.");
 
 log.debug("Initializing the Kafka consumer");
 this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -678,8 +686,7 @@ private KafkaConsumer(ConsumerConfig config,
 
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
 .tags(metricsTags);