[jira] [Updated] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl updated KAFKA-7660:
--
Attachment: heapdump-1543563004522_after.hprof

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof, 
> heapdump-1543563004522_after.hprof
>
>
> During an analysis of JVM memory, two possible issues showed up which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics" seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50 MB each.
> I haven't looked too deeply into the code, but I noticed that the class 
> Sensor.java, which is used as a key in the HashMap, does not implement the 
> equals or hashCode methods. I am not sure whether this is a problem, though 
> (see the sketch after this description).
>  
> The analysis was done with Dynatrace 7.0.
> We are running Confluent 5.0 / Kafka 2.0-cp1 (brokers as well as clients).
>  
> Screenshots are attached.
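
To illustrate the parentSensors concern: when a key class does not override equals()/hashCode(), a HashMap keyed by it falls back to reference identity, so every instance is a distinct key and an entry can only be found or removed through the exact same instance. A minimal, self-contained sketch (the SensorLike class below is a hypothetical stand-in, not Kafka code):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class IdentityKeyDemo {
    // Stand-in for a key class that does not override equals()/hashCode().
    static final class SensorLike {
        final String name;
        SensorLike(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Map<SensorLike, String> parentSensors = new HashMap<>();
        parentSensors.put(new SensorLike("node-1"), "parent-a");
        parentSensors.put(new SensorLike("node-1"), "parent-a");

        // Two entries, although both keys describe the same logical sensor.
        System.out.println(parentSensors.size()); // 2

        // A lookup with an equal-looking but different instance misses, so such
        // entries can only be removed via the original key reference.
        System.out.println(parentSensors.get(new SensorLike("node-1"))); // null
    }
}
{code}

Identity-keyed maps are safe as long as removal always happens with the same instances that were inserted; whether that invariant holds for parentSensors is exactly the question raised above.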



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl updated KAFKA-7660:
--
Attachment: (was: heapdump-1543563004522_after.hprof)

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During an analysis of JVM memory, two possible issues showed up which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics" seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50 MB each.
> I haven't looked too deeply into the code, but I noticed that the class 
> Sensor.java, which is used as a key in the HashMap, does not implement the 
> equals or hashCode methods. I am not sure whether this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0.
> We are running Confluent 5.0 / Kafka 2.0-cp1 (brokers as well as clients).
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7690) Change disk allocation policy for multiple partitions on a broker when topic is created

2018-11-30 Thread haiyangyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

haiyangyu updated KAFKA-7690:
-
Description: 
h3. *Background*

If the number of target topic partitions is larger than the number of brokers 
when a topic is created or partitions are added, a broker will be assigned more 
than one partition. If the disks are currently unbalanced, e.g. one disk holds 
one partition while another holds four due to topic deletion or other reasons, 
the multiple partitions may all be allocated on a single disk, and if the 
target topic has heavy traffic it can easily saturate that disk's I/O.
h3. *Improvement strategy*

When multiple partitions are to be allocated on a broker, the strategy is as 
follows (a code sketch appears after this description):

1. Calculate the target topic's partition count and the total partition count 
on each disk.

2. Sort the disks by the target topic's partition count in ascending order; if 
the target topic's partition counts are equal, sort by the total number of 
partitions on each disk.
h3. *Example*

||disk||target topic partition count||total partition count||
|disk1|0|11|
|disk2|0|9|

When two partitions are assigned on this broker, the original strategy gives 
the following result:
||disk||target topic partition count||total partition count||
|disk1|0|11|
|disk2|2|11|

With the new strategy, the result is as follows:
||disk||target topic partition count||total partition count||
|disk1|1|12|
|disk2|1|10|

If the topic has heavy traffic, e.g. 50 MB/s per partition, the original 
strategy can easily saturate disk2's I/O, while the new strategy keeps disk I/O 
balanced.
h3. *Summary*

This strategy helps when building a large cluster and creating topics with a 
large number of partitions; disk I/O becomes more balanced.
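
A minimal sketch of the proposed ordering (illustrative only, not the attached patch; the DiskStats counters are made-up stand-ins for the broker's per-log-dir statistics): sort candidate disks by the target topic's partition count first and by the total partition count second, then place each new partition on the first disk and update the counters.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DiskAssignmentSketch {
    // Hypothetical per-disk counters; not Kafka's actual LogManager structures.
    static final class DiskStats {
        final String name;
        int targetTopicPartitions;
        int totalPartitions;
        DiskStats(String name, int targetTopicPartitions, int totalPartitions) {
            this.name = name;
            this.targetTopicPartitions = targetTopicPartitions;
            this.totalPartitions = totalPartitions;
        }
    }

    // Pick a disk for each new partition of the target topic.
    static List<String> assign(List<DiskStats> disks, int newPartitions) {
        List<String> placement = new ArrayList<>();
        Comparator<DiskStats> order = Comparator
                .comparingInt((DiskStats d) -> d.targetTopicPartitions)
                .thenComparingInt(d -> d.totalPartitions);
        for (int i = 0; i < newPartitions; i++) {
            DiskStats chosen = disks.stream().min(order).get();
            placement.add(chosen.name);
            chosen.targetTopicPartitions++; // update counters so the next pick rebalances
            chosen.totalPartitions++;
        }
        return placement;
    }

    public static void main(String[] args) {
        List<DiskStats> disks = new ArrayList<>();
        disks.add(new DiskStats("disk1", 0, 11));
        disks.add(new DiskStats("disk2", 0, 9));
        System.out.println(assign(disks, 2)); // [disk2, disk1], matching the example table
    }
}
{code}

With these inputs the sketch reproduces the example: disk1 ends at (1, 12) and disk2 at (1, 10).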

 

  was:
h3. *Background*

If the number of target topic partitions is larger than the number of brokers 
when a topic is created or partitions are added, a broker will be assigned more 
than one partition. If the disks are currently unbalanced, e.g. one disk holds 
one partition while another holds four due to topic deletion or other reasons, 
the multiple partitions may all be allocated on the first disk, and if the 
target topic has a large traffic volume it can easily saturate that disk's I/O.
h3. *Improvement strategy*

When multiple partitions are to be allocated on a broker, the strategy is as 
follows:

1. Calculate the target topic's partition count and the total partition count 
on each disk.

2. Sort the disks by the target topic's partition count in ascending order; if 
the target topic's partition counts are equal, sort by the total number of 
partitions on each disk.
h3. *Example*

||disk||target topic partition count||total partition count||
|disk1|0|11|
|disk2|0|9|

When two partitions are assigned on this broker, the original strategy gives 
the following result:
||disk||target topic partition count||total partition count||
|disk1|0|11|
|disk2|2|11|

With the new strategy, the result is as follows:
||disk||target topic partition count||total partition count||
|disk1|1|12|
|disk2|1|10|

If the topic has heavy traffic, e.g. 50 MB/s per partition, the original 
strategy can easily saturate disk2's I/O, while the new strategy keeps disk I/O 
balanced.
h3. *Summary*

This strategy helps when building a large cluster and creating topics with a 
large number of partitions; disk I/O becomes more balanced.

 


> Change disk allocation policy for multiple partitions on a broker when topic 
> is created
> ---
>
> Key: KAFKA-7690
> URL: https://issues.apache.org/jira/browse/KAFKA-7690
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.0, 1.0.0, 2.0.0
>Reporter: haiyangyu
>Priority: Major
>
> h3. *Background*
> If the number of target topic partitions is larger than the number of brokers 
> when a topic is created or partitions are added, a broker will be assigned 
> more than one partition. If the disks are currently unbalanced, e.g. one disk 
> holds one partition while another holds four due to topic deletion or other 
> reasons, the multiple partitions may all be allocated on a single disk, and 
> if the target topic has heavy traffic it can easily saturate that disk's I/O.
> h3. *Improvement strategy*
> When multiple partitions are to be allocated on a broker, the strategy is as 
> follows:
> 1. Calculate the target topic's partition count and the total partition count 
> on each disk.
> 2. Sort the disks by the target topic's partition count in ascending order; 
> if the target topic's partition counts are equal, sort by the total number of 
> partitions on each disk.
> h3. *Example*
>  
> ||disk||target topic partition count||total partition count||
> |disk1|0|11|
> |disk2|0|9|
> When two partitions are assigned on this broker, the original strategy gives 
> the following result:
> ||disk||target topic partition count||total partition count||
> |disk1|0|11|
> |disk2|2|1

[jira] [Updated] (KAFKA-7690) Change disk allocation policy for multiple partitions on a broker when topic is created

2018-11-30 Thread haiyangyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

haiyangyu updated KAFKA-7690:
-
Attachment: disk_assignent_strategy.patch

> Change disk allocation policy for multiple partitions on a broker when topic 
> is created
> ---
>
> Key: KAFKA-7690
> URL: https://issues.apache.org/jira/browse/KAFKA-7690
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.0, 1.0.0, 2.0.0
>Reporter: haiyangyu
>Priority: Major
> Attachments: disk_assignent_strategy.patch
>
>
> h3. *Background*
> If the number of target topic partitions is larger than the number of brokers 
> when a topic is created or partitions are added, a broker will be assigned 
> more than one partition. If the disks are currently unbalanced, e.g. one disk 
> holds one partition while another holds four due to topic deletion or other 
> reasons, the multiple partitions may all be allocated on a single disk, and 
> if the target topic has heavy traffic it can easily saturate that disk's I/O.
> h3. *Improvement strategy*
> When multiple partitions are to be allocated on a broker, the strategy is as 
> follows:
> 1. Calculate the target topic's partition count and the total partition count 
> on each disk.
> 2. Sort the disks by the target topic's partition count in ascending order; 
> if the target topic's partition counts are equal, sort by the total number of 
> partitions on each disk.
> h3. *Example*
>  
> ||disk||target topic partition count||total partition count||
> |disk1|0|11|
> |disk2|0|9|
> When two partitions are assigned on this broker, the original strategy gives 
> the following result:
> ||disk||target topic partition count||total partition count||
> |disk1|0|11|
> |disk2|2|11|
> With the new strategy, the result is as follows:
> ||disk||target topic partition count||total partition count||
> |disk1|1|12|
> |disk2|1|10|
> If the topic has heavy traffic, e.g. 50 MB/s per partition, the original 
> strategy can easily saturate disk2's I/O, while the new strategy keeps disk 
> I/O balanced.
> h3. *Summary*
> This strategy helps when building a large cluster and creating topics with a 
> large number of partitions; disk I/O becomes more balanced.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7690) Change disk allocation policy for multiple partitions on a broker when topic is created

2018-11-30 Thread haiyangyu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704378#comment-16704378
 ] 

haiyangyu edited comment on KAFKA-7690 at 11/30/18 7:58 AM:


[~omkreddy]

[~ijuma]

[~guozhang]

[~hachikuji]

[~cmccabe]

[~junrao]

 


was (Author: yuhaiyang):
[~omkreddy]

[~ijuma]

[~guozhang]

[~hachikuji]

[~cmccabe]

[~junrao]

> Change disk allocation policy for multiple partitions on a broker when topic 
> is created
> ---
>
> Key: KAFKA-7690
> URL: https://issues.apache.org/jira/browse/KAFKA-7690
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.0, 1.0.0, 2.0.0
>Reporter: haiyangyu
>Priority: Major
>
> h3. *Background*
> If the number of target topic partitions is larger than the number of brokers 
> when a topic is created or partitions are added, a broker will be assigned 
> more than one partition. If the disks are currently unbalanced, e.g. one disk 
> holds one partition while another holds four due to topic deletion or other 
> reasons, the multiple partitions may all be allocated on the first disk, and 
> if the target topic has a large traffic volume it can easily saturate that 
> disk's I/O.
> h3. *Improvement strategy*
> When multiple partitions are to be allocated on a broker, the strategy is as 
> follows:
> 1. Calculate the target topic's partition count and the total partition count 
> on each disk.
> 2. Sort the disks by the target topic's partition count in ascending order; 
> if the target topic's partition counts are equal, sort by the total number of 
> partitions on each disk.
> h3. *Example*
>  
> ||disk||target topic partition count||total partition count||
> |disk1|0|11|
> |disk2|0|9|
> When two partitions are assigned on this broker, the original strategy gives 
> the following result:
> ||disk||target topic partition count||total partition count||
> |disk1|0|11|
> |disk2|2|11|
> With the new strategy, the result is as follows:
> ||disk||target topic partition count||total partition count||
> |disk1|1|12|
> |disk2|1|10|
> If the topic has heavy traffic, e.g. 50 MB/s per partition, the original 
> strategy can easily saturate disk2's I/O, while the new strategy keeps disk 
> I/O balanced.
> h3. *Summary*
> This strategy helps when building a large cluster and creating topics with a 
> large number of partitions; disk I/O becomes more balanced.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704395#comment-16704395
 ] 

Patrik Kleindl commented on KAFKA-7660:
---

[~vvcephei] [~guozhang] I almost reported that it didn't work, before noticing 
that this change had touched kafka-clients too.

I just reran the test for 10 minutes and the number of Sensor, KafkaMetric and 
MetricName instances now seems stable (e.g. GC cleans them up).

Thumbs up :)

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During an analysis of JVM memory, two possible issues showed up which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics" seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50 MB each.
> I haven't looked too deeply into the code, but I noticed that the class 
> Sensor.java, which is used as a key in the HashMap, does not implement the 
> equals or hashCode methods. I am not sure whether this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0.
> We are running Confluent 5.0 / Kafka 2.0-cp1 (brokers as well as clients).
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704399#comment-16704399
 ] 

Patrik Kleindl commented on KAFKA-7660:
---

Last question: who should "resolve" the Jira issue? I guess that is done by the 
person who fixed it, so they can supply the fixVersion etc.?

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During an analysis of JVM memory, two possible issues showed up which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics" seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50 MB each.
> I haven't looked too deeply into the code, but I noticed that the class 
> Sensor.java, which is used as a key in the HashMap, does not implement the 
> equals or hashCode methods. I am not sure whether this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0.
> We are running Confluent 5.0 / Kafka 2.0-cp1 (brokers as well as clients).
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-30 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704404#comment-16704404
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

Cloned and downloaded the 'trunk' branch - https://github.com/apache/kafka.git. 
No other changes were applied. The following unit test fails on Ubuntu:

kafka.log.LogCleanerParameterizedIntegrationTest > 
testCleansCombinedCompactAndDeleteTopic[4] FAILED
 java.lang.AssertionError: Contents of the map shouldn't change expected:<Map(0 -> 
(340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354), 1 -> (341,341), 
6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 -> (342,342), 17 -> 
(357,357), 12 -> (352,352), 7 -> (347,347), 3 -> (343,343), 18 -> (358,358), 16 
-> (356,356), 11 -> (351,351), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 
15 -> (355,355))> but was:<Map(0 -> (340,340), 88 -> (288,288), 5 -> (345,345), 
10 -> (350,350), 14 -> (354,354), 93 -> (293,293), 89 -> (289,289), 1 -> 
(341,341), 6 -> (346,346), 92 -> (292,292), 97 -> (297,297), 9 -> (349,349), 96 
-> (296,296), 13 -> (353,353), 2 -> (342,342), 17 -> (357,357), 12 -> 
(352,352), 7 -> (347,347), 98 -> (298,298), 91 -> (291,291), 3 -> (343,343), 18 
-> (358,358), 95 -> (295,295), 16 -> (356,356), 11 -> (351,351), 99 -> 
(299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 94 -> (294,294), 15 
-> (355,355), 90 -> (290,290))>
 at org.junit.Assert.fail(Assert.java:88)
 at org.junit.Assert.failNotEquals(Assert.java:834)
 at org.junit.Assert.assertEquals(Assert.java:118)
 at 
kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)

 

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test, 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir", creates a temporary 
> directory and marks it read-only. The unit test is intended to throw a 
> ProcessorStateException when the read-only temporary directory is 
> opened/accessed.
> By default, non-root users are not allowed to open/access a read-only 
> directory, so the test rightly gets an error/exception (which is the intention 
> of the unit test, and it passes for non-root users). A small sketch of a guard 
> for the root case follows below.
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission 
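
For context, the superuser bypasses the read-only permission bit on directories, so when the test runs as root the directory opens successfully and the expected ProcessorStateException is never thrown. A minimal sketch of one possible guard, assuming JUnit 4 as used by the Kafka tests (this is an illustration, not the project's actual fix, and the user.name check is only an approximation of "running as root"):

{code:java}
import static org.junit.Assume.assumeFalse;

// Skips permission-based assertions when the JVM runs as the superuser,
// since root ignores the read-only flag and no exception would be raised.
public class RootCheck {
    public static void skipIfRoot() {
        assumeFalse("Read-only directory checks are bypassed for root",
                "root".equals(System.getProperty("user.name")));
    }
}
{code}

Calling RootCheck.skipIfRoot() at the start of the test would mark it as skipped instead of failed for root, while keeping the assertion intact for regular users.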

[jira] [Updated] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-30 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Puzoń updated KAFKA-7531:
---
Attachment: server.log.2018-11-26-16.gz

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
> Attachments: server.log.2018-11-26-16.gz
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 ZooKeeper.
> Streams application with 4 instances, each running 5 stream threads, 20 
> stream threads in total.
> I observe a NullPointerException on the coordinator broker which causes all 
> application stream threads to shut down; here is the stack trace from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_res

[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-30 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704424#comment-16704424
 ] 

Sebastian Puzoń commented on KAFKA-7531:


I got another NPE in a different code fragment; logs are attached.

 

 

 
{code:java}
ERROR [KafkaApi-1] Error when handling request 
{transactional_id=elog_server_inst_visits_agg-0_3,producer_id=1002,producer_epoch=303,transaction_result=true}
 (kafka.server.KafkaApis)
java.lang.NullPointerException
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$logInvalidStateTransitionAndReturnError$1(TransactionCoordinator.scala:290)
at kafka.utils.Logging.debug(Logging.scala:62)
at kafka.utils.Logging.debug$(Logging.scala:62)
at 
kafka.coordinator.transaction.TransactionCoordinator.debug(TransactionCoordinator.scala:85)
at 
kafka.coordinator.transaction.TransactionCoordinator.logInvalidStateTransitionAndReturnError(TransactionCoordinator.scala:289)
at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:394)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
at scala.util.Either$RightProjection.flatMap(Either.scala:702)
at 
kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
at 
kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
at 
kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
at 
kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
at 
kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
at 
kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
at 
kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at 
kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
at 
kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745)
{code}
 

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
> Attachments: server.log.2018-11-26-16.gz
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 ZooKeeper.
> Streams application with 4 instances, each running 5 stream threads, 20 
> stream threads in total.
> I observe a NullPointerException on the coordinator broker which causes all 
> application stream threads to shut down; here is the stack trace from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__cons

[jira] [Updated] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-30 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Puzoń updated KAFKA-7531:
---
Attachment: (was: server.log.2018-11-26-16.gz)

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 ZooKeeper.
> Streams application with 4 instances, each running 5 stream threads, 20 
> stream threads in total.
> I observe a NullPointerException on the coordinator broker which causes all 
> application stream threads to shut down; here is the stack trace from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true}
>  (kafka.server.KafkaApis)
> ja

[jira] [Updated] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-30 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Puzoń updated KAFKA-7531:
---
Attachment: server.log.2018-11-29-16.gz

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
> Attachments: server.log.2018-11-29-16.gz
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 ZooKeeper.
> Streams application with 4 instances, each running 5 stream threads, 20 
> stream threads in total.
> I observe a NullPointerException on the coordinator broker which causes all 
> application stream threads to shut down; here is the stack trace from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_res

[jira] [Resolved] (KAFKA-7617) Document security primitives

2018-11-30 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7617.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

Issue resolved by pull request 5906
[https://github.com/apache/kafka/pull/5906]

> Document security primitives
> 
>
> Key: KAFKA-7617
> URL: https://issues.apache.org/jira/browse/KAFKA-7617
> Project: Kafka
>  Issue Type: Task
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Minor
> Fix For: 2.2.0
>
>
> Although the documentation gives help on configuring authentication and 
> authorization, it does not list the security primitives (operations and 
> resources) that can be used, which makes it hard for users to set up thorough 
> authorization rules.
> This task covers adding these to the security page of the Kafka 
> documentation.
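
As a concrete illustration of these primitives, the sketch below grants the Read operation on a Topic resource through the Java AdminClient ACL API (available in Kafka 2.x); the bootstrap address, topic name, and principal are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantTopicRead {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Operation "Read" on resource type "Topic", two of the primitives in question.
            AclBinding binding = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
                    new AccessControlEntry("User:alice", "*",
                            AclOperation.READ, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}
{code}

Without such a binding (and with an authorizer enabled), a consumer fetching from that topic would receive a TOPIC_AUTHORIZATION_FAILED error.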



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7617) Document security primitives

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704458#comment-16704458
 ] 

ASF GitHub Bot commented on KAFKA-7617:
---

omkreddy closed pull request #5906: KAFKA-7617: Add authorization primitives to 
security page
URL: https://github.com/apache/kafka/pull/5906
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/security.html b/docs/security.html
index 5f6d0aceb8e..28c506c7c0e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1258,6 +1258,450 @@ Examples
 
 
 
+Authorization Primitives
+Protocol calls are usually performing some operations on certain 
resources in Kafka. It is required to know the
+operations and resources to set up effective protection. In this 
section we'll list these operations and
+resources, then list the combination of these with the protocols to 
see the valid scenarios.
+Operations in 
Kafka
+There are a few operation primitives that can be used to build up 
privileges. These can be matched up with
+certain resources to allow specific protocol calls for a given user. 
These are:
+
+Read
+Write
+Create
+Delete
+Alter
+Describe
+ClusterAction
+DescribeConfigs
+AlterConfigs
+IdempotentWrite
+All
+
+Resources in 
Kafka
+The operations above can be applied on certain resources which are 
described below.
+
+Topic: this simply represents a Topic. All protocol calls 
that are acting on topics (such as reading,
+writing them) require the corresponding privilege to be added. If 
there is an authorization error with a
+topic resource, then a TOPIC_AUTHORIZATION_FAILED (error code: 29) 
will be returned.
+Group: this represents the consumer groups in the brokers. 
All protocol calls that are working with
+consumer groups, like joining a group must have privileges with 
the group in subject. If the privilege is not
+given then a GROUP_AUTHORIZATION_FAILED (error code: 30) will be 
returned in the protocol response.
+Cluster: this resource represents the cluster. Operations 
that are affecting the whole cluster, like
+controlled shutdown are protected by privileges on the Cluster 
resource. If there is an authorization problem
+on a cluster resource, then a CLUSTER_AUTHORIZATION_FAILED (error 
code: 31) will be returned.
+TransactionalId: this resource represents actions related 
to transactions, such as committing.
+If any error occurs, then a TRANSACTIONAL_ID_AUTHORIZATION_FAILED 
(error code: 53) will be returned by brokers.
+DelegationToken: this represents the delegation tokens in 
the cluster. Actions, such as describing
+delegation tokens could be protected by a privilege on the 
DelegationToken resource. Since these objects have
+a little special behavior in Kafka it is recommended to read
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka#KIP-48DelegationtokensupportforKafka-DescribeDelegationTokenRequest";>KIP-48
+and the related upstream documentation at Authentication using Delegation 
Tokens.
+
+Operations and Resources on 
Protocols
+In the below table we'll list the valid operations on resources that 
are executed by the Kafka API protocols.
+
+
+
+Protocol (API key)
+Operation
+Resource
+Note
+
+
+
+
+PRODUCE (0)
+Write
+TransactionalId
+An transactional producer which has its transactional.id set 
requires this privilege.
+
+
+PRODUCE (0)
+IdempotentWrite
+Cluster
+An idempotent produce action requires this privilege.
+
+
+PRODUCE (0)
+Write
+Topic
+This applies to a normal produce action.
+
+
+FETCH (1)
+ClusterAction
+Cluster
+A follower must have ClusterAction on the Cluster resource in 
order to fetch partition data.
+
+
+FETCH (1)
+Read
+Topic
+Regular Kafka consumers need READ permission on each partition 
they are fetching.
+
+
+LIST_OFFSETS (2)
+Describe
+Topic
+
+
+
+METADATA (3)
+Describe
+Topic
+  

[jira] [Commented] (KAFKA-6987) Reimplement KafkaFuture with CompletableFuture

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704611#comment-16704611
 ] 

ASF GitHub Bot commented on KAFKA-6987:
---

andrasbeni closed pull request #5131: KAFKA-6987 Reimplement KafkaFuture with 
CompletableFuture
URL: https://github.com/apache/kafka/pull/5131
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java 
b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01dc42..4afe3c530bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -19,14 +19,19 @@
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * A flexible future which supports call chaining and other asynchronous 
programming patterns. This will
- * eventually become a thin shim on top of Java 8's CompletableFuture.
+ * A flexible future which supports call chaining and other asynchronous 
programming patterns. This
+ * is a thin shim on top of Java 8's CompletableFuture.
+ *
+ * Please note that while this class offers methods similar to 
CompletableFuture's whenComplete and thenApply,
+ * functions passed to these methods will never be called with 
CompletionException. If you wish to use
+ * CompletableFuture semantics, use {@link #toCompletableFuture()}.
  *
  * The API for this class is still evolving and we may break compatibility in 
minor releases, if necessary.
  */
@@ -202,4 +207,13 @@ public abstract T get(long timeout, TimeUnit unit) throws 
InterruptedException,
  */
 @Override
 public abstract boolean isDone();
+
+/**
+ * Returns a ComletableFuture equivalent to this Future.
+ *
+ * Implemented in {@link KafkaFuture} throws UnsuportedOperationException.
+ */
+public CompletableFuture toCompletableFuture() {
+throw new UnsupportedOperationException();
+}
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 33916ac952a..fd9093d763a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -16,124 +16,38 @@
  */
 package org.apache.kafka.common.internals;
 
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.kafka.common.KafkaFuture;
+
+
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.kafka.common.KafkaFuture;
-
 /**
  * A flexible future which supports call chaining and other asynchronous 
programming patterns.
- * This will eventually become a thin shim on top of Java 8's 
CompletableFuture.
+ * This is a thin shim on top of Java 8's CompletableFuture.
+ *
+ * Please note that while this class offers methods similar to 
CompletableFuture's whenComplete and thenApply,
+ * functions passed to these methods will never be called with 
CompletionException. If you wish to use
+ * CompletableFuture semantics, use {@link #toCompletableFuture()}.
+ *
  */
 public class KafkaFutureImpl extends KafkaFuture {
-/**
- * A convenience method that throws the current exception, wrapping it if 
needed.
- *
- * In general, KafkaFuture throws CancellationException and 
InterruptedException directly, and
- * wraps all other exceptions in an ExecutionException.
- */
-private static void wrapAndThrow(Throwable t) throws InterruptedException, 
ExecutionException {
-if (t instanceof CancellationException) {
-throw (CancellationException) t;
-} else if (t instanceof InterruptedException) {
-throw (InterruptedException) t;
-} else {
-throw new ExecutionException(t);
-}
-}
-
-private static class Applicant implements BiConsumer {
-private final BaseFunction function;
-private final KafkaFutureImpl future;
 
-Applicant(BaseFunction function, KafkaFutureImpl future) {
-this.function = function;
-this.future =

[jira] [Commented] (KAFKA-3362) Update protocol schema and field doc strings

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704610#comment-16704610
 ] 

ASF GitHub Bot commented on KAFKA-3362:
---

andrasbeni closed pull request #3495: KAFKA-3362: Update protocol schema and 
field doc strings
URL: https://github.com/apache/kafka/pull/3495
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 920c2957c4d..f0fbf268a30 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -64,19 +64,19 @@
 
 public static final short CONSUMER_PROTOCOL_V0 = 0;
 public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
-new Field(VERSION_KEY_NAME, Type.INT16));
+new Field(VERSION_KEY_NAME, Type.INT16, "Version number of 
consumer protocol."));
 private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new 
Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
 .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0);
 
 public static final Schema SUBSCRIPTION_V0 = new Schema(
-new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
-new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
+new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING), "Topics 
consumer subscribes to."),
+new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user 
data sent to partition assignor."));
 public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema(
-new Field(TOPIC_KEY_NAME, Type.STRING),
-new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
+new Field(TOPIC_KEY_NAME, Type.STRING, "Topic consumer subscribed 
to."),
+new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32), 
"Partitions of the topic which are assigned to consumer."));
 public static final Schema ASSIGNMENT_V0 = new Schema(
-new Field(TOPIC_PARTITIONS_KEY_NAME, new 
ArrayOf(TOPIC_ASSIGNMENT_V0)),
-new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
+new Field(TOPIC_PARTITIONS_KEY_NAME, new 
ArrayOf(TOPIC_ASSIGNMENT_V0), "Topic partitions assigned to consumer."),
+new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user 
data sent by partition assignor."));
 
 public static ByteBuffer 
serializeSubscription(PartitionAssignor.Subscription subscription) {
 Struct struct = new Struct(SUBSCRIPTION_V0);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index 14b39ae9dac..8fd92632b11 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -51,14 +51,14 @@
 new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value"));
 
 private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
-new Field(RESOURCE_TYPE_KEY_NAME, INT8),
-new Field(RESOURCE_NAME_KEY_NAME, STRING),
-new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY)));
+new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Id of the resource type 
to alter configuration of. Value 2 means topic, 4 means broker."),
+new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource to 
alter configuration of."),
+new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY), 
"Configuration entries to alter."));
 
 private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema(
 new Field(RESOURCES_KEY_NAME, new 
ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0),
 "An array of resources to update with the provided 
configs."),
-new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN));
+new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN, "If true, only 
validation takes place and the changes are not applied."));
 
 public static Schema[] schemaVersions() {
 return new Schema[] {ALTER_CONFIGS_REQUEST_V0};
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index f292ef6731b..337c391f086 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/client

[jira] [Created] (KAFKA-7691) Encypt-then-MAC Delegation token metadata

2018-11-30 Thread Attila Sasvari (JIRA)
Attila Sasvari created KAFKA-7691:
-

 Summary: Encypt-then-MAC Delegation token metadata
 Key: KAFKA-7691
 URL: https://issues.apache.org/jira/browse/KAFKA-7691
 Project: Kafka
  Issue Type: Improvement
Reporter: Attila Sasvari


Currently delegation token metadata is stored unencrypted in Zookeeper.

Kafka brokers could implement a strategy called 
[Encrypt-then-MAC|https://en.wikipedia.org/wiki/Authenticated_encryption#Encrypt-then-MAC_(EtM)]
 to encrypt sensitive metadata information about delegation tokens.

For more details, please read 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-395%3A+Encypt-then-MAC+Delegation+token+metadata
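
As a rough, hypothetical sketch of what Encrypt-then-MAC means in Java (the cipher, MAC algorithm, and on-disk layout below are illustrative assumptions, not the design defined in KIP-395):
{code:java}
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import java.nio.ByteBuffer;
import java.security.SecureRandom;

public class EncryptThenMacSketch {
    // Hypothetical helper: encrypt first, then MAC the IV and ciphertext (never the plaintext).
    public static byte[] protect(byte[] plaintext, SecretKey encKey, SecretKey macKey) throws Exception {
        byte[] iv = new byte[16];
        new SecureRandom().nextBytes(iv);

        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, encKey, new IvParameterSpec(iv));
        byte[] ciphertext = cipher.doFinal(plaintext);

        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(macKey);
        mac.update(iv);
        byte[] tag = mac.doFinal(ciphertext);

        // Store IV | ciphertext | tag; verify the tag before decrypting on read.
        return ByteBuffer.allocate(iv.length + ciphertext.length + tag.length)
                .put(iv).put(ciphertext).put(tag).array();
    }
}
{code}
On read, the broker would recompute the HMAC over the IV and ciphertext and compare it with the stored tag before attempting decryption.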



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6477) Add Support for Quorum-based Producer Acknowledgment

2018-11-30 Thread Corentin Chary (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704674#comment-16704674
 ] 

Corentin Chary commented on KAFKA-6477:
---

I meant "I would be interested to try any patch" :) I'll follow what is 
happening in this area and see if I can manage to find some time to help

> Add Support for Quorum-based Producer Acknowledgment
> 
>
> Key: KAFKA-6477
> URL: https://issues.apache.org/jira/browse/KAFKA-6477
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, producer 
>Reporter: Litao Deng
>Priority: Major
>
> Hey folks. I would like to add a feature to support the quorum-based 
> acknowledgment for the producer request. We have been running a modified 
> version of Kafka on our testing cluster for weeks; the improvement in P999 is 
> significant, with very stable latency. Additionally, I have a proposal to 
> achieve a similar data durability as with the insync.replicas-based 
> acknowledgment through LEO-based leader election.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-30 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704760#comment-16704760
 ] 

Ismael Juma commented on KAFKA-7531:


Which Java version are you using? The Java code where the NPE is happening 
seems to be:
{code:java}
return (obj == null) ? "null" : obj.toString();{code}
So, it's puzzling.

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
> Attachments: server.log.2018-11-29-16.gz
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream 
> threads.
> I observe an NPE (NullPointerException) on the coordinator broker which causes 
> all application stream threads to shut down; here's the stack trace from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Threa

[jira] [Commented] (KAFKA-7676) Compile Kafka failed with Gradle 5.0

2018-11-30 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704800#comment-16704800
 ] 

Ismael Juma commented on KAFKA-7676:


We are using Gradle 5.0 in trunk now, so I'm going to close this. If you are 
still seeing an issue, please reopen and provide more details.

> Compile Kafka failed with Gradle 5.0
> 
>
> Key: KAFKA-7676
> URL: https://issues.apache.org/jira/browse/KAFKA-7676
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangtao Liu
>Priority: Minor
>
> The README states that [Kafka requires Gradle 4.7 or 
> higher|https://github.com/apache/kafka], but the build failed with the 
> following compile error when I tried to compile the Kafka source code with 
> Gradle 5.
> {noformat}
> org.gradle.api.ProjectConfigurationException: A problem occurred configuring 
> root project 'kafka'.
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.wrapException(LifecycleProjectEvaluator.java:80)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.addConfigurationFailure(LifecycleProjectEvaluator.java:73)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.access$600(LifecycleProjectEvaluator.java:54)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject$1.run(LifecycleProjectEvaluator.java:109)
> at org.gradle.internal.Factories$1.create(Factories.java:25)
> at 
> org.gradle.internal.work.DefaultWorkerLeaseService.withLocks(DefaultWorkerLeaseService.java:183)
> at 
> org.gradle.internal.work.StopShieldingWorkerLeaseService.withLocks(StopShieldingWorkerLeaseService.java:40)
> at 
> org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withProjectLock(DefaultProjectStateRegistry.java:226)
> at 
> org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:220)
> at 
> org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:186)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject.run(LifecycleProjectEvaluator.java:96)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:301)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:293)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:175)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:91)
> at 
> org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.evaluate(LifecycleProjectEvaluator.java:68)
> at 
> org.gradle.api.internal.project.DefaultProject.evaluate(DefaultProject.java:687)
> at 
> org.gradle.api.internal.project.DefaultProject.evaluate(DefaultProject.java:140)
> at 
> org.gradle.execution.TaskPathProjectEvaluator.configure(TaskPathProjectEvaluator.java:35)
> at 
> org.gradle.execution.TaskPathProjectEvaluator.configureHierarchy(TaskPathProjectEvaluator.java:60)
> at 
> org.gradle.configuration.DefaultBuildConfigurer.configure(DefaultBuildConfigurer.java:41)
> at 
> org.gradle.initialization.DefaultGradleLauncher$ConfigureBuild.run(DefaultGradleLauncher.java:286)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:301)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:293)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:175)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:91)
> at 
> org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
> at 
> org.gradle.initialization.DefaultGradleLauncher.configureBuild(DefaultGradleLauncher.java:194)
> at 
> org.gradle.initialization.DefaultGradleLauncher.doBuildStages(DefaultGradleLauncher.java:150)
> at 
> org.gradle.initialization.DefaultGradleLauncher.getConfiguredBuild(DefaultGradleLauncher.java:128)
> at 
> org.gradle.internal.invocation.GradleBuildController$2.execute(GradleBuildController.java:88)
> at 
> org.gradle.internal.invocation.GradleBuildController$2.execute(GradleBuildController.java:85)
> at 
> org.gradle.internal.invocation.GradleBuildController$3.create(GradleBuildController.java:103)
> at 
> org.gradle.internal.invocation.GradleBuildContr

[jira] [Resolved] (KAFKA-7676) Compile Kafka failed with Gradle 5.0

2018-11-30 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-7676.

Resolution: Cannot Reproduce

> Compile Kafka failed with Gradle 5.0
> 
>
> Key: KAFKA-7676
> URL: https://issues.apache.org/jira/browse/KAFKA-7676
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangtao Liu
>Priority: Minor
>
> The README states that [Kafka requires Gradle 4.7 or 
> higher|https://github.com/apache/kafka], but the build failed with the 
> following compile error when I tried to compile the Kafka source code with 
> Gradle 5.
> {noformat}
> org.gradle.api.ProjectConfigurationException: A problem occurred configuring 
> root project 'kafka'.
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.wrapException(LifecycleProjectEvaluator.java:80)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.addConfigurationFailure(LifecycleProjectEvaluator.java:73)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.access$600(LifecycleProjectEvaluator.java:54)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject$1.run(LifecycleProjectEvaluator.java:109)
> at org.gradle.internal.Factories$1.create(Factories.java:25)
> at 
> org.gradle.internal.work.DefaultWorkerLeaseService.withLocks(DefaultWorkerLeaseService.java:183)
> at 
> org.gradle.internal.work.StopShieldingWorkerLeaseService.withLocks(StopShieldingWorkerLeaseService.java:40)
> at 
> org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withProjectLock(DefaultProjectStateRegistry.java:226)
> at 
> org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:220)
> at 
> org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:186)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject.run(LifecycleProjectEvaluator.java:96)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:301)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:293)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:175)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:91)
> at 
> org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
> at 
> org.gradle.configuration.project.LifecycleProjectEvaluator.evaluate(LifecycleProjectEvaluator.java:68)
> at 
> org.gradle.api.internal.project.DefaultProject.evaluate(DefaultProject.java:687)
> at 
> org.gradle.api.internal.project.DefaultProject.evaluate(DefaultProject.java:140)
> at 
> org.gradle.execution.TaskPathProjectEvaluator.configure(TaskPathProjectEvaluator.java:35)
> at 
> org.gradle.execution.TaskPathProjectEvaluator.configureHierarchy(TaskPathProjectEvaluator.java:60)
> at 
> org.gradle.configuration.DefaultBuildConfigurer.configure(DefaultBuildConfigurer.java:41)
> at 
> org.gradle.initialization.DefaultGradleLauncher$ConfigureBuild.run(DefaultGradleLauncher.java:286)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:301)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:293)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:175)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:91)
> at 
> org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
> at 
> org.gradle.initialization.DefaultGradleLauncher.configureBuild(DefaultGradleLauncher.java:194)
> at 
> org.gradle.initialization.DefaultGradleLauncher.doBuildStages(DefaultGradleLauncher.java:150)
> at 
> org.gradle.initialization.DefaultGradleLauncher.getConfiguredBuild(DefaultGradleLauncher.java:128)
> at 
> org.gradle.internal.invocation.GradleBuildController$2.execute(GradleBuildController.java:88)
> at 
> org.gradle.internal.invocation.GradleBuildController$2.execute(GradleBuildController.java:85)
> at 
> org.gradle.internal.invocation.GradleBuildController$3.create(GradleBuildController.java:103)
> at 
> org.gradle.internal.invocation.GradleBuildController$3.create(GradleBuildController.java:96)
> at 
> org.gradle.internal.work.DefaultWorkerLeaseService.withLocks(DefaultWorkerLeaseService.java:183)
> at 
> org.g

[jira] [Updated] (KAFKA-7389) Upgrade spotBugs for Java 11 support

2018-11-30 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7389:
---
Fix Version/s: 2.1.1

> Upgrade spotBugs for Java 11 support
> 
>
> Key: KAFKA-7389
> URL: https://issues.apache.org/jira/browse/KAFKA-7389
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> KAFKA-5887 replaces findBugs with spotBugs adding support for Java 9 and 10. 
> However, Java 11 is not supported in spotbugs 3.1.5.
> Once this is fixed, we also need to update the build to enable spotBugs when 
> executed with Java 11 and we need to update the relevant Jenkins jobs to 
> execute spotBugs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2018-11-30 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704882#comment-16704882
 ] 

Ismael Juma commented on KAFKA-5146:


[~tom.dewolf] can you please paste a stacktrace of the error you see with Java 
11?

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael Noll
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so we that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7691) Encypt-then-MAC Delegation token metadata

2018-11-30 Thread Attila Sasvari (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Sasvari reassigned KAFKA-7691:
-

Assignee: Attila Sasvari

> Encypt-then-MAC Delegation token metadata
> -
>
> Key: KAFKA-7691
> URL: https://issues.apache.org/jira/browse/KAFKA-7691
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Attila Sasvari
>Assignee: Attila Sasvari
>Priority: Major
>
> Currently delegation token metadata is stored unencrypted in Zookeeper.
> Kafka brokers could implement a strategy called 
> [Encrypt-then-MAC|https://en.wikipedia.org/wiki/Authenticated_encryption#Encrypt-then-MAC_(EtM)]
>  to encrypt sensitive metadata information about delegation tokens.
> For more details, please read 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-395%3A+Encypt-then-MAC+Delegation+token+metadata



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7685) Support loading trust stores from classpath

2018-11-30 Thread Noa Resare (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705040#comment-16705040
 ] 

Noa Resare commented on KAFKA-7685:
---

There is now a KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-398

> Support loading trust stores from classpath
> ---
>
> Key: KAFKA-7685
> URL: https://issues.apache.org/jira/browse/KAFKA-7685
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Noa Resare
>Priority: Minor
>
> Certificate pinning as well as authenticating kafka brokers using a 
> non-public CA certificate maintained inside an organisation is desirable to a 
> lot of users. This can be accomplished today using the 
> {{ssl.truststore.location}} configuration property. Unfortunately, this value 
> is always interpreted as a filesystem path which makes distribution of such 
> an alternative truststore a needlessly cumbersome process. If we had the 
> ability to load a trust store from the classpath as well as from a file, the 
> trust store could be shipped in a jar that could be declared as a regular 
> maven style dependency.
> If we did this by supporting prefixing {{ssl.truststore.location}} with 
> {{classpath:}} this could be a backwards compatible change, one that builds 
> on prior design patterns established by for example the Spring project.
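
As a minimal, hypothetical sketch of what loading a truststore from the classpath amounts to (the resource name and password are placeholders; the actual configuration mechanics are what KIP-398 defines):
{code:java}
import java.io.InputStream;
import java.security.KeyStore;
import javax.net.ssl.TrustManagerFactory;

public class ClasspathTrustStoreSketch {
    // Load a JKS truststore bundled in a jar on the classpath and build trust managers from it.
    public static TrustManagerFactory load(String resource, char[] password) throws Exception {
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream in = ClasspathTrustStoreSketch.class.getClassLoader().getResourceAsStream(resource)) {
            trustStore.load(in, password);
        }
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        return tmf;
    }
}
{code}
For example, load("my-truststore.jks", "changeit".toCharArray()) would read a truststore shipped in a jar declared as a regular Maven dependency (the names are placeholders).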



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-30 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705187#comment-16705187
 ] 

Matthias J. Sax commented on KAFKA-7580:


{quote}with or without any changes to the code base 
{quote}
For this case, feel free to open a PR – it seems to be two different issues. 
We should still try to figure out the root cause of the "pure virtual method" 
error, but there is no reason to block this Jira because of it.

Not sure about `LogCleanerParameterizedIntegrationTest` – in general, we suffer 
from flaky tests :( (it's a continuous effort to keep the tests stable). If you 
can reproduce an error reliably, it's worth opening a ticket just for this 
single test, with details on how to reproduce it. It could depend on your 
environment – similar to the "pure virtual method" error – maybe the RocksDB 
version used is not compatible with your Ubuntu version.

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, a non-root user is not allowed to open/access a read-only 
> directory, so the unit test rightly throws the expected exception (which is 
> the intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory ‘/tmp/readOnlyDir/parent’: Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  dr

[jira] [Resolved] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7660.
--
   Resolution: Fixed
 Assignee: John Roesler
Fix Version/s: 2.2.0

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Assignee: John Roesler
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705199#comment-16705199
 ] 

Guozhang Wang commented on KAFKA-7660:
--

Glad it works. I can help resolve the ticket; since the patch is merged into 
trunk, the fix version would be 2.2.0.

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0

2018-11-30 Thread Gwen Shapira (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705220#comment-16705220
 ] 

Gwen Shapira commented on KAFKA-7093:
-

It is possible that https://issues.apache.org/jira/browse/KAFKA-7415 fixes this 
issue.

I didn't reproduce the problem, but KAFKA-7415 does clean the outdated offsets 
from cache and therefore should cause the warning to only appear once per 
partition.

If someone runs into this and can try the upgrade, it would be good to get 
validation.

> Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
> 
>
> Key: KAFKA-7093
> URL: https://issues.apache.org/jira/browse/KAFKA-7093
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.1.0
>Reporter: Suleyman
>Priority: Major
>
> I upgraded Kafka from version 0.11.0.1 to 1.1.0. After the upgrade, I'm 
> getting the warning message below very frequently.
> WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. 
> This implies messages have arrived out of order. New: \{epoch:0, 
> offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: 
> __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) 
> How can I resolve these warning messages, and why am I getting them?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705256#comment-16705256
 ] 

John Roesler commented on KAFKA-7660:
-

Hey [~pkleindl],

If you have a chance to repeat the original memory analysis, I'm wondering if 
the duplicate strings are still listed as an issue, or if they were a 
consequence somehow of the leaked references. 

Thanks again for identifying this, diagnosing it, and verifying it. It's really 
nice to get these fixes in.

-John

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Assignee: John Roesler
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705312#comment-16705312
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---

vvcephei opened a new pull request #5979: KAFKA-7660: fix streams and Metrics 
memory leaks.
URL: https://github.com/apache/kafka/pull/5979
 
 
   Backport two memory-leak fixes (#5974 and #5953)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Assignee: John Roesler
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7688) Allow byte array class for Decimal Logical Types to fix Debezium Issues

2018-11-30 Thread Eric C Abis (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eric C Abis updated KAFKA-7688:
---
Description: 
Decimal Logical Type fields are failing with Kafka Connect sink tasks and 
showing this error:
{code:java}
Invalid Java object for schema type BYTES: class [B for field: "null"{code}
There is an issue tracker for the problem here in the Confluent Schema Registry 
tracker (it's all related):  
[https://github.com/confluentinc/schema-registry/issues/833]

I've created a fix for this issue and tested and verified it in our CF4 cluster 
here at Shutterstock.

The issue boils down to the fact that Avro Decimal Logical types store values 
as byte arrays. Debezium sets the default value as a Base64-encoded byte array 
and record values as BigInteger byte arrays. I'd like to submit a PR that 
changes the SCHEMA_TYPE_CLASSES hash map in 
org.apache.kafka.connect.data.ConnectSchema to allow Byte Arrays for Decimal 
fields. 
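
As a conceptual illustration of that change (hypothetical code, not the actual ConnectSchema internals or the PR), the idea is to let Decimal logical-type validation accept byte[] alongside BigDecimal:
{code:java}
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DecimalValidationSketch {
    // Map a logical type name to the Java classes accepted for its values.
    private static final Map<String, List<Class<?>>> ALLOWED = new HashMap<>();
    static {
        // Before the proposed change, only BigDecimal would be accepted for Decimal fields.
        ALLOWED.put("org.apache.kafka.connect.data.Decimal",
                Arrays.asList(BigDecimal.class, byte[].class));
    }

    static void validate(String logicalName, Object value) {
        List<Class<?>> allowed = ALLOWED.get(logicalName);
        if (allowed != null && allowed.stream().noneMatch(c -> c.isInstance(value)))
            throw new IllegalArgumentException(
                    "Invalid Java object for logical type " + logicalName + ": " + value.getClass());
    }
}
{code}
The actual PR would adjust ConnectSchema's own lookup tables rather than introducing a new class.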

I reached out [to us...@kafka.apache.org|mailto:to%c2%a0us...@kafka.apache.org] 
to ask for GitHub permissions but if there is somewhere else I need to reach 
out to please let me know.

My GitHub user is TheGreatAbyss

Thank You!

Eric

  was:
Decimal Logical Type fields are failing with Kafka Connect sink tasks and 
showing this error:
{code:java}
Invalid Java object for schema type BYTES: class [B for field: "null"{code}
There is an issue tracker for the problem here in the Confluent Schema Registry 
tracker (it's all related):  
[https://github.com/confluentinc/schema-registry/issues/833]

I've created a fix for this issue and tested and verified it in our CF4 cluster 
here at Shutterstock.

Ultimately the issue boils down to the fact that in Avro, Decimal Logical types 
store values as Base64-encoded byte arrays for the default values, and 
BigInteger byte arrays for the record values. I'd like to submit a PR that 
changes the SCHEMA_TYPE_CLASSES hash map in 
org.apache.kafka.connect.data.ConnectSchema to allow Byte Arrays for Decimal 
fields. 

Separately I have a similar change in{color:#33} 
[io.confluent.connect.avro.AvroData|https://github.com/TheGreatAbyss/schema-registry/pull/1/files#diff-ac149179f9760319ccc772695cb21364]
 that I will submit a PR for as well.{color}

I reached out [to us...@kafka.apache.org|mailto:to%c2%a0us...@kafka.apache.org] 
to ask for GitHub permissions but if there is somewhere else I need to reach 
out to please let me know.

My GitHub user is TheGreatAbyss

Thank You!

Eric


> Allow byte array class for Decimal Logical Types to fix Debezium Issues
> ---
>
> Key: KAFKA-7688
> URL: https://issues.apache.org/jira/browse/KAFKA-7688
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.1
>Reporter: Eric C Abis
>Priority: Blocker
> Fix For: 1.1.1
>
>
> Decimal Logical Type fields are failing with Kafka Connect sink tasks and 
> showing this error:
> {code:java}
> Invalid Java object for schema type BYTES: class [B for field: "null"{code}
> There is an issue tracker for the problem here in the Confluent Schema 
> Registry tracker (it's all related):  
> [https://github.com/confluentinc/schema-registry/issues/833]
> I've created a fix for this issue and tested and verified it in our CF4 
> cluster here at Shutterstock.
> The issue boils down to the fact that Avro Decimal Logical types store values 
> as byte arrays. Debezium sets the default value as a Base64-encoded byte 
> array and record values as BigInteger byte arrays. I'd like to submit a 
> PR that changes the SCHEMA_TYPE_CLASSES hash map in 
> org.apache.kafka.connect.data.ConnectSchema to allow Byte Arrays for Decimal 
> fields. 
> I reached out [to 
> us...@kafka.apache.org|mailto:to%c2%a0us...@kafka.apache.org] to ask for 
> GitHub permissions but if there is somewhere else I need to reach out to 
> please let me know.
> My GitHub user is TheGreatAbyss
> Thank You!
> Eric



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705417#comment-16705417
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---

vvcephei opened a new pull request #5982: KAFKA-7660: fix streams and Metrics 
memory leaks
URL: https://github.com/apache/kafka/pull/5982
 
 
   Backport two memory-leak fixes (#5974 and #5953) (see also 2.1: #5979, 2.0: 
#5980, 1.1: #5981  )
   
   It looks like we already had the parentSensors fix in 1.0 and lost it in 
2.0. The change in
   this PR just tidies it up a little for consistency with later branches.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Assignee: John Roesler
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705422#comment-16705422
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---

vvcephei opened a new pull request #5983: KAFKA-7660: fix streams and Metrics 
memory leaks
URL: https://github.com/apache/kafka/pull/5983
 
 
   Backport two memory-leak fixes (#5974 and #5953) (see also 2.1: #5979, 2.0: 
#5980, 1.1: #5981, 1.0: #5982  )
   
   It looks like we already had the parentSensors fix in 1.0 and lost it in 
2.0. The change in
   this PR just tidies it up a little for consistency with later branches.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Assignee: John Roesler
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705436#comment-16705436
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---

vvcephei opened a new pull request #5984: KAFKA-7660: fix streams and Metrics 
memory leaks
URL: https://github.com/apache/kafka/pull/5984
 
 
   Backport two memory-leak fixes (#5974 and #5953) (see also 2.1: #5979, 2.0: 
#5980, 1.1: #5981, 1.0: #5982, 0.11: #5983  )
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Assignee: John Roesler
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2018-11-30 Thread Chris Egerton (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705465#comment-16705465
 ] 

Chris Egerton commented on KAFKA-6833:
--

[~bob-barrett] another possible approach could be to alter the behavior of the 
producer while it waits for metadata on a topic-partition.

Currently, when a record is given to the producer to send, it checks its 
metadata cache and, if the topic-partition exists in the cache, [the cached 
data is 
used|https://github.com/apache/kafka/blob/9b476bc5f4a2fdbd62ad84e50e65331c21b321d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L967-L968].

If the topic doesn't exist or that partition doesn't exist in the topic, the 
producer requests a metadata update and then waits until either 
{{max.block.ms}} has been exceeded or until an update has occurred that results 
in the [topic being 
created|https://github.com/apache/kafka/blob/9b476bc5f4a2fdbd62ad84e50e65331c21b321d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L998].

We could tweak the logic so that the producer waits until either 
{{max.block.ms}} has been exceeded or until the topic-partition exists.

This would relieve users of the KafkaProducer from having to handle retry logic 
for the suggested {{PartitionOutOfRangeException}}, although it could also 
potentially lead to slower failures when trying to send a record to an invalid 
partition.
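
To make the suggested tweak concrete, here is a hedged sketch (not the actual KafkaProducer code; the {{IntSupplier}} stands in for a metadata lookup) of blocking until the partition appears or {{max.block.ms}} elapses:
{code:java}
import java.util.function.IntSupplier;
import org.apache.kafka.common.errors.TimeoutException;

public class WaitForPartitionSketch {
    // Block until metadata reports enough partitions for the target partition, or time out.
    public static void awaitPartition(IntSupplier partitionCount, int partition, long maxBlockMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxBlockMs;
        while (partitionCount.getAsInt() <= partition) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new TimeoutException("Partition " + partition + " not present after " + maxBlockMs + " ms.");
            // Stand-in for waiting on the next metadata update.
            Thread.sleep(Math.min(remaining, 100));
        }
    }
}
{code}
The real implementation would reuse the producer's existing metadata wait/notify machinery rather than polling.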

> KafkaProducer throws "Invalid partition given with record" exception
> 
>
> Key: KAFKA-6833
> URL: https://issues.apache.org/jira/browse/KAFKA-6833
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Arjun Satish
>Assignee: Bob Barrett
>Priority: Minor
>
> Currently, when creating topics via ZooKeeper, there is a small but definite 
> delay between creating the nodes in ZK, and having the topics created in the 
> brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
> updated after the broker metadata is updated. If an application adds 
> partitions to a topic, and immediately tries to produce records to a new 
> partition, a KafkaException is thrown with a message similar to the following:
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: Invalid partition given 
> with record: 12 is not in the range [0...1).
> {code}
> In this case, since the application has context that it created the topics, 
> it might be worthwhile to consider if a more specific exception can be thrown 
> instead of KafkaException. For example:
> {code:java}
> public class PartitionNotFoundException extends KafkaException {...}{code}
> This could allow the application to be able to interpret such an error, and 
> act accordingly.
> EDIT: Correct "create topics" to "adds partitions to a topic".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2018-11-30 Thread Chris Egerton (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705465#comment-16705465
 ] 

Chris Egerton edited comment on KAFKA-6833 at 11/30/18 11:43 PM:
-

[~bob-barrett] another possible approach could be to alter the behavior of the 
producer while it waits for metadata on a topic-partition.

Currently, when a record is given to the producer to send, it checks its 
metadata cache and, if the topic-partition exists in the cache, [the cached 
data is 
used|https://github.com/apache/kafka/blob/9b476bc5f4a2fdbd62ad84e50e65331c21b321d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L967-L968].

If the topic doesn't exist or that partition doesn't exist in the topic, the 
producer requests a metadata update and then waits until either 
{{max.block.ms}} has been exceeded or until an update has occurred that 
guarantees that the [topic 
exists|https://github.com/apache/kafka/blob/9b476bc5f4a2fdbd62ad84e50e65331c21b321d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L998].

We could tweak the logic so that the producer waits until either 
{{max.block.ms}} has been exceeded or until the topic-partition exists.

This would relieve users of the KafkaProducer from having to handle retry logic 
for the suggested {{PartitionOutOfRangeException}}, although it could also 
potentially lead to slower failures when trying to send a record to an invalid 
partition.


was (Author: chrisegerton):
[~bob-barrett] another possible approach could be to alter the behavior of the 
producer while it waits for metadata on a topic-partition.

Currently, when a record is given to the producer to send, it checks its 
metadata cache and, if the topic-partition exists in the cache, [the cached 
data is 
used|https://github.com/apache/kafka/blob/9b476bc5f4a2fdbd62ad84e50e65331c21b321d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L967-L968].

If the topic doesn't exist or that partition doesn't exist in the topic, the 
producer requests a metadata update and then waits until either 
{{max.block.ms}} has been exceeded or until an update has occurred that results 
in the [topic being 
created|https://github.com/apache/kafka/blob/9b476bc5f4a2fdbd62ad84e50e65331c21b321d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L998].

We could tweak the logic so that the producer waits until either 
{{max.block.ms}} has been exceeded or until the topic-partition exists.

This would relieve users of the KafkaProducer from having to handle retry logic 
for the suggested {{PartitionOutOfRangeException}}, although it could also 
potentially lead to slower failures when trying to send a record to an invalid 
partition.

> KafkaProducer throws "Invalid partition given with record" exception
> 
>
> Key: KAFKA-6833
> URL: https://issues.apache.org/jira/browse/KAFKA-6833
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Arjun Satish
>Assignee: Bob Barrett
>Priority: Minor
>
> Currently, when creating topics via ZooKeeper, there is a small but definite 
> delay between creating the nodes in ZK, and having the topics created in the 
> brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
> updated after the broker metadata is updated. If an application adds 
> partitions to a topic, and immediately tries to produce records to a new 
> partition, a KafkaException is throw with a message similar to the following:
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: Invalid partition given 
> with record: 12 is not in the range [0...1).
> {code}
> In this case, since the application has context that it created the topics, 
> it might be worthwhile to consider if a more specific exception can be thrown 
> instead of KafkaException. For example:
> {code:java}
> public class PartitionNotFoundException extends KafkaException {...}{code}
> This would allow the application to interpret such an error and 
> act accordingly.
> EDIT: Correct "create topics" to "adds partitions to a topic".
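
Purely for illustration, a sketch of the application-side handling this would enable, assuming the proposed exception existed (the retry budget and sleep are arbitrary):

{code:java}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

final class SendWithRetry {

    // Stand-in for the proposed exception; it does not exist in the client today.
    static class PartitionNotFoundException extends KafkaException {
        PartitionNotFoundException(String message) { super(message); }
    }

    static void send(Producer<String, String> producer,
                     ProducerRecord<String, String> record) throws InterruptedException {
        int attempts = 0;
        while (true) {
            try {
                producer.send(record);
                return;
            } catch (PartitionNotFoundException e) {
                if (++attempts >= 5) throw e;   // arbitrary retry budget
                Thread.sleep(500);              // wait for broker/producer metadata to catch up
            }
        }
    }
}
{code}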



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7662) Avro schema upgrade not supported on globalTable

2018-11-30 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7662:
-
Labels: user-experience  (was: )

> Avro schema upgrade not supported on globalTable 
> -
>
> Key: KAFKA-7662
> URL: https://issues.apache.org/jira/browse/KAFKA-7662
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Frederic Tardif
>Priority: Major
>  Labels: user-experience
> Attachments: avro-registry-http.txt, kafka-avro-upgrade-testing.zip
>
>
> I did quite a bit of testing around the Avro upgrades, and it did not behave 
> as I would have expected when the Avro record is used as the key for a global 
> table backed by a RocksDB store.
> setup:
>  * local confluent suite 4.0.2
>  * test with stream app and producer (v 1.0.0)
>  * schemas (key) :
> schemas :
> {code:java}
> schema version @1
> {
> "namespace": "com.bell.cts.livecms.livemedia.topic",
> "type" : "record",
> "name" : "EventKey",
> "fields" : [
> {"name" : "keyAttribute1", "type" : "string"}
> ]
> }
> schema version @2
> {
> "namespace": "com.bell.cts.livecms.livemedia.topic",
> "type" : "record",
> "name" : "EventKey",
> "fields" : [
> {"name" : "keyAttribute1", "type" : "string"},
> {"name" : "keyAttribute2", "type" : ["null", "string"], "default" : null}
> ]
> }{code}
>  
>  * TEST1 (PASS)
>  ** using schema version @1 
>  ** produce record1=[k@1, v@1] 
>  ** stream apps loads record1 in global table and store locally in rocksdb 
>  ** asyncAssert that store.get(k@1)=v@1 : PASS
>  * TEST2 (PASS)
>  ** using schema version @1
>  ** delete local store (and checkpoint)
>  ** stream apps loads record1 in global table and store locally in rocksdb
>  ** asyncAssert that store.get(k@1)=v@1 : PASS
>  * TEST3 (FAIL)
>  ** using schema version @2 
>  ** keep local store
>  ** stream apps does not reload record1 from topic because of local offset
>  ** asyncAssert that store.get(k@1)=v@1 : FAIL
>  ** however store.all().next().key.equals(k@2) , as built using schema 
> version 2
>  ** this could be explained by the fact that the RocksDB store has the record 
> persisted (magic byte included) based on schema version 1
>  ** Not ideal, but I could consider it acceptable to delete the local store in 
> this case.
>  * TEST4 (FAIL)
>  ** using schema version @2
>  ** delete local store (and checkpoint)
>  ** stream apps loads record1 (produced from schema @1) in global table and 
> store locally in rocksdb
>  ** asyncAssert that store.get(k@2)=v@2 : FAIL
>  ** however store.all().next().key.equals(k@2) , as built using schema 
> version 2
>  ** I can't quite understand this one. I would have expected that the RocksDB 
> store should now be provisioned with a serialized version of the record based 
> on the schema v2 (as it went through the stream app underpinning the store 
> materialization)
>  * TEST5 (FAIL)
>  ** using schema version @2 
>  ** produce record2=[k@2, v@2] (meant to be backward compatible and logically 
> equal to record1) 
>  ** the stream app processes record1 (produced from schema @1) and 
> record2 (produced from schema @2) and materializes the global table stored 
> locally in RocksDB
>  ** asyncAssert that store.get(k@2)=v@2 : PASS but the store now has 2 
> entries !!!
>  ** it looks as if the stream.groupBy(key) of the topic underpinning the 
> globaltable materialization did not group the 2 record keys together, 
> although record1.key.equals(record2.key) is true in Java (by looping in the 
> store)
> reading from the upstream raw topic throughout the testing :
> {code:java}
> /tmp$ kafka-avro-console-consumer --topic topic-test-5 --bootstrap-server 
> localhost:9092 --property schema.registry.url=http://127.0.0.1:8081 
> --property print.key=true --from-beginning 
> {"keyAttribute1":"key-attribute-1"}   {"valueAttribute1":"value-1"}
> {"keyAttribute1":"key-attribute-1"}   {"valueAttribute1":"value-1"}
> {"keyAttribute1":"key-attribute-1"}   {"valueAttribute1":"value-1"}
> {"keyAttribute1":"key-attribute-1","keyAttribute2":null}  
> {"valueAttribute1":"value-1"}{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7662) Avro schema upgrade not supported on globalTable

2018-11-30 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705535#comment-16705535
 ] 

Guozhang Wang commented on KAFKA-7662:
--

[~frederic.tardif] I looked at your source code, and I think the main issue is 
in the Confluent Avro serializers you used:

{code}
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
{code}

Since the serialized form includes a magic byte and schema id, upgrading the 
schema would cause the serialized bytes to be different even with exactly the 
same data, causing logically equal keys to be sent to different partitions of 
the repartition topic, and hence the observed failure.

Your code does not seem to include the actual topology builder part, so I'm not 
sure if the above is exactly the case. To verify it, consider using 
{{Produced#withStreamPartitioner}} to override the partitioner and use a 
customized through topic before the aggregation, i.e. something like 
"selectKey().through(Produced...).groupByKey().aggregate()", and see if it 
actually resolves the problem.
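
For illustration only, a rough sketch of that workaround: repartition through an explicit topic whose partitioner hashes a stable logical field of the key rather than its Avro-serialized bytes. {{EventValue}}, the topic names, the serdes, and the three-argument {{StreamPartitioner}} form (as in Streams 1.0.x) are assumptions, not taken from the attached code; {{reduce}} stands in for the aggregation.

{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.processor.StreamPartitioner;

final class LogicalKeyPartitioning {

    static KTable<EventKey, EventValue> build(StreamsBuilder builder,
                                              Serde<EventKey> keySerde,
                                              Serde<EventValue> valueSerde) {
        // Partition on a stable logical field of the key, not on its Avro-serialized
        // bytes, so keys that are equal across schema versions land in the same partition.
        StreamPartitioner<EventKey, EventValue> byLogicalKey =
            (key, value, numPartitions) ->
                (key.getKeyAttribute1().toString().hashCode() & 0x7fffffff) % numPartitions;

        return builder.stream("topic-test-5", Consumed.with(keySerde, valueSerde))
            .selectKey((k, v) -> k)
            .through("topic-test-5-by-logical-key",           // explicit through/repartition topic
                     Produced.with(keySerde, valueSerde, byLogicalKey))
            .groupByKey(Serialized.with(keySerde, valueSerde))
            .reduce((oldValue, newValue) -> newValue);        // keep the latest value per key
    }
}
{code}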

> Avro schema upgrade not supported on globalTable 
> -
>
> Key: KAFKA-7662
> URL: https://issues.apache.org/jira/browse/KAFKA-7662
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Frederic Tardif
>Priority: Major
>  Labels: user-experience
> Attachments: avro-registry-http.txt, kafka-avro-upgrade-testing.zip
>
>
> I did quite a bit of testing around the Avro upgrades, and it did not behave 
> as I would have expected when the Avro record is used as the key for a global 
> table backed by a RocksDB store.
> setup:
>  * local confluent suite 4.0.2
>  * test with stream app and producer (v 1.0.0)
>  * schemas (key) :
> schemas :
> {code:java}
> schema version @1
> {
> "namespace": "com.bell.cts.livecms.livemedia.topic",
> "type" : "record",
> "name" : "EventKey",
> "fields" : [
> {"name" : "keyAttribute1", "type" : "string"}
> ]
> }
> schema version @2
> {
> "namespace": "com.bell.cts.livecms.livemedia.topic",
> "type" : "record",
> "name" : "EventKey",
> "fields" : [
> {"name" : "keyAttribute1", "type" : "string"},
> {"name" : "keyAttribute2", "type" : ["null", "string"], "default" : null}
> ]
> }{code}
>  
>  * TEST1 (PASS)
>  ** using schema version @1 
>  ** produce record1=[k@1, v@1] 
>  ** stream apps loads record1 in global table and store locally in rocksdb 
>  ** asyncAssert that store.get(k@1)=v@1 : PASS
>  * TEST2 (PASS)
>  ** using schema version @1
>  ** delete local store (and checkpoint)
>  ** stream apps loads record1 in global table and store locally in rocksdb
>  ** asyncAssert that store.get(k@1)=v@1 : PASS
>  * TEST3 (FAIL)
>  ** using schema version @2 
>  ** keep local store
>  ** stream apps does not reload record1 from topic because of local offset
>  ** asyncAssert that store.get(k@1)=v@1 : FAIL
>  ** however store.all().next().key.equals(k@2) , as built using schema 
> version 2
>  ** this could be explained by the fact that the RocksDB store has the record 
> persisted (magic byte included) based on schema version 1
>  ** Not ideal, but I could consider it acceptable to delete the local store in 
> this case.
>  * TEST4 (FAIL)
>  ** using schema version @2
>  ** delete local store (and checkpoint)
>  ** stream apps loads record1 (produced from schema @1) in global table and 
> store locally in rocksdb
>  ** asyncAssert that store.get(k@2)=v@2 : FAIL
>  ** however store.all().next().key.equals(k@2) , as built using schema 
> version 2
>  ** I can't quite understand this one. I would have expected that the RocksDB 
> store should now be provisioned with a serialized version of the record based 
> on the schema v2 (as it went through the stream app underpinning the store 
> materialization)
>  * TEST5 (FAIL)
>  ** using schema version @2 
>  ** produce record2=[k@2, v@2] (meant to be backward compatible and logically 
> equal to record1) 
>  ** the stream app processes record1 (produced from schema @1) and 
> record2 (produced from schema @2) and materializes the global table stored 
> locally in RocksDB
>  ** asyncAssert that store.get(k@2)=v@2 : PASS but the store now has 2 
> entries !!!
>  ** it looks as if the stream.groupBy(key) of the topic underpinning the 
> globaltable materialization did not group the 2 record keys together, 
> although record1.key.equals(record2.key) is true in Java (by looping in the 
> store)
> reading from the upstream raw topic throughout the testing :
> {code:java}
> /tmp$ kafka-avro-console-consumer --topic topic-test-5 --bootstrap-server 
> localhost:9092 --property schema.registry.url=http://127.0.0.1:8081 
> --property print.key=true --from-beginning 
> {"keyAttribute1":"key-attribute-1"}   {"v

[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-30 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7678:
-
Labels: bug  (was: )

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
>  Labels: bug
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> I have checked the code, and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class expects any kind of error to happen, since it is catching 
> `*Throwable*`:
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it to:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2018-11-30 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705560#comment-16705560
 ] 

Guozhang Wang commented on KAFKA-4748:
--

Just a heads up, we are working on KIP-345 which should resolve this issue. cc 
[~bchen225242]


> Need a way to shutdown all workers in a Streams application at the same time
> 
>
> Key: KAFKA-4748
> URL: https://issues.apache.org/jira/browse/KAFKA-4748
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Elias Levy
>Priority: Major
>
> If you have a fleet of Stream workers for an application and attempt to shut 
> them down simultaneously (e.g. via SIGTERM and 
> Runtime.getRuntime().addShutdownHook() and streams.close()), a large number 
> of the workers fail to shut down.
> The problem appears to be a race condition between the shutdown signal and 
> the consumer rebalancing that is triggered by some of the workers exiting 
> before others.  Workers that receive the signal later apparently fail to 
> exit because they are caught in the rebalance.
> Terminating workers in a rolling fashion is not advisable in some situations. 
>  The rolling shutdown will result in many unnecessary rebalances and may 
> fail, as the application may have a large amount of local state that a smaller 
> number of nodes may not be able to store.
> It would appear that there is a need for a protocol change to allow the 
> coordinator to signal a consumer group to shutdown without leading to 
> rebalancing.
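
For reference, a minimal sketch of the shutdown pattern this report describes, with the topology and configuration left out; newer Streams versions also offer a bounded {{close(timeout)}} overload that is preferable inside a shutdown hook:

{code:java}
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;

// Sketch only: a SIGTERM runs the JVM shutdown hook, which closes the Streams
// instance; the main thread parks until that has happened.
final class ShutdownPattern {

    static void runUntilShutdown(KafkaStreams streams) throws InterruptedException {
        CountDownLatch stopped = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();        // on newer versions, close(timeout) bounds the hook
            stopped.countDown();
        }));

        streams.start();
        stopped.await();            // park until the hook has finished closing
    }
}
{code}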



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7673) Upgrade RocksDB to include fix for WinEnvIO::GetSectorSize

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705563#comment-16705563
 ] 

ASF GitHub Bot commented on KAFKA-7673:
---

guozhangwang opened a new pull request #5985: KAFKA-7673: Upgrade rocksdb to 
5.15.10
URL: https://github.com/apache/kafka/pull/5985
 
 
   @bbejeck 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Upgrade RocksDB to include fix for WinEnvIO::GetSectorSize
> --
>
> Key: KAFKA-7673
> URL: https://issues.apache.org/jira/browse/KAFKA-7673
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Yanick Salzmann
>Priority: Major
> Fix For: 2.2.0
>
>
> The following fix would help making it possible to work with kafka streams in 
> Windows 7 (right now it is not possible to start an application using Kafka 
> Streams):
> [https://github.com/facebook/rocksdb/commit/9c7da963bc8b3df8f3ed3865f00dd7c483267ac0]
> According to the tags it would require an upgrade to one of the below 
> versions:
>  * [v5.17.2|https://github.com/facebook/rocksdb/releases/tag/v5.17.2]
>  * [v5.16.6|https://github.com/facebook/rocksdb/releases/tag/v5.16.6]
>  * [v5.15.10|https://github.com/facebook/rocksdb/releases/tag/v5.15.10]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4219) Permit setting of event time in stream processor

2018-11-30 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705564#comment-16705564
 ] 

Guozhang Wang commented on KAFKA-4219:
--

[~elevy] This issue is addressed in KIP-251: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API
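
For illustration, a small sketch of the KIP-251 API in a low-level processor: records are forwarded with an explicitly chosen timestamp (wall-clock time here, purely as an example) instead of inheriting the extractor-assigned event time.

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

// Forwards each record downstream with an explicitly set timestamp.
public class RestampProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        long newTimestamp = System.currentTimeMillis();   // e.g. current processing time
        context().forward(key, value, To.all().withTimestamp(newTimestamp));
    }
}
{code}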



> Permit setting of event time in stream processor
> 
>
> Key: KAFKA-4219
> URL: https://issues.apache.org/jira/browse/KAFKA-4219
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
> Fix For: 2.0.0
>
>
> Event time is assigned in stream sources via {{TimestampExtractor}}.  Once 
> the event time has been assigned, it remains the same, regardless of any 
> downstream processing in the topology.  This is insufficient for many 
> processing jobs, particularly when the output of the job is written back into 
> a Kafka topic, where the record's time is encoded outside of the record's 
> value.
> For instance:
> * When performing windowed aggregations it may be desirable for the timestamp 
> of the emitted record to be the lower or upper limit of the time window, rather 
> than the timestamp of the last processed element, which may be anywhere 
> within the time window.
> * When joining two streams, it is non-deterministic which of the two records' 
> timestamps will be the timestamp of the emitted record.  It would be either 
> one depending on the order in which the records are processed.  Even if this 
> were deterministic, it may be desirable for the emitted timestamp to be altogether 
> different from the timestamps of the joined records.  For instance, setting 
> the timestamp to the current processing time may be desirable.
> * In general, lower level processors may wish to set the timestamp of emitted 
> records to an arbitrary value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4219) Permit setting of event time in stream processor

2018-11-30 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4219.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

> Permit setting of event time in stream processor
> 
>
> Key: KAFKA-4219
> URL: https://issues.apache.org/jira/browse/KAFKA-4219
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
> Fix For: 2.0.0
>
>
> Event time is assigned in stream sources via {{TimestampExtractor}}.  Once 
> the event time has been assigned, it remains the same, regardless of any 
> downstream processing in the topology.  This is insufficient for many 
> processing jobs, particularly when the output of the job is written back into 
> a Kafka topic, where the record's time is encoded outside of the record's 
> value.
> For instance:
> * When performing windowed aggregations it may be desirable for the timestamp 
> of the emitted record to be the lower or upper limit of the time window, rather 
> than the timestamp of the last processed element, which may be anywhere 
> within the time window.
> * When joining two streams, it is non-deterministic which of the two records' 
> timestamps will be the timestamp of the emitted record.  It would be either 
> one depending on the order in which the records are processed.  Even if this 
> were deterministic, it may be desirable for the emitted timestamp to be altogether 
> different from the timestamps of the joined records.  For instance, setting 
> the timestamp to the current processing time may be desirable.
> * In general, lower level processors may wish to set the timestamp of emitted 
> records to an arbitrary value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss

2018-11-30 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7672:
-
Labels: bug  (was: )

> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Major
>  Labels: bug
> Fix For: 2.2.0, 2.1.1
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file is 
> found under its task folder, Kafka Streams needs to restore the local state 
> from the remote changelog topic completely and then resume running. However, in 
> some scenarios we found that Kafka Streams does *NOT* restore this state even 
> though no checkpoint was found, but just cleans the state folder and transitions 
> to the running state directly, resulting in loss of the historic data. 
> To be specific, I will give the detailed logs from Kafka Streams in our project 
> to show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can get the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # the previous running task assigned to thread 1 is task 0_5 (the 
> corresponding partition is AuditTrailBatch-0-5)
>  # group begins to rebalance, the new task 1_1 is assigned to thread 1.
>  # no checkpoint was found under 1_1 state folder, so reset the offset to 0 
> and clean the local state folder.
>  # thread 1 transitions to RUNNING state directly without the restoration for 
> task 1_1, so the historic data for state 1_1 is lost for thread 1. 
> *Troubleshooting*
> To investigate the cause of this issue, we analyzed the source code in 
> Kafka Streams and found that the key is the variable named "completedRestorers".
> This is the definition of the variable:
> {code:java}
> private final Set completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers, which is created during 
> thread initialization and is not accessed by other threads. The 
> completedRestorers set is used to record the partitions 

[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705603#comment-16705603
 ] 

ASF GitHub Bot commented on KAFKA-6388:
---

apovzner opened a new pull request #5986: KAFKA-6388: Recover from rolling an 
empty segment that already exists
URL: https://github.com/apache/kafka/pull/5986
 
 
   There were several reported incidents where the log is rolled to a new 
segment with the same base offset as an active segment, causing KafkaException: 
Trying to roll a new log segment for topic partition X-N with start offset M 
while it already exists. From what I have investigated so far, this happens to 
an empty log segment where there is a long idle time before the next append and 
somehow we get to a state where offsetIndex.isFull() returns true due to 
_maxEntries == 0. This PR recovers from the state where the active segment is 
empty and we try to roll to a new segment with the same offset: we delete the 
segment and recreate it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork

[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-11-30 Thread Anna Povzner (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705605#comment-16705605
 ] 

Anna Povzner commented on KAFKA-6388:
-

From what I have investigated so far, the issue happens when we roll an empty 
log segment with the same base offset. This happens because offsetIndex 
isFull() returns true as a result of _maxEntries == 0. I haven't found the 
cause of getting to this state. However, every time this happened, there was a 
long idle time before an append. I opened a PR to recover from this state – if 
we roll an empty segment, we delete and recreate it: 
[https://github.com/apache/kafka/pull/5986]

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
>

[jira] [Commented] (KAFKA-7160) Add check for group ID length

2018-11-30 Thread Tim Nguyen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705641#comment-16705641
 ] 

Tim Nguyen commented on KAFKA-7160:
---

Hi [~manme...@gmail.com], are you still working on this task? If not, I can 
work on it.

> Add check for group ID length
> -
>
> Key: KAFKA-7160
> URL: https://issues.apache.org/jira/browse/KAFKA-7160
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: lambdaliu
>Priority: Minor
>  Labels: newbie
>
> We should limit the length of the group ID, because other systems (such as 
> monitoring systems) would use the group ID when we run Kafka in a production 
> environment.
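
Purely as an illustration of the kind of check being proposed; the 249-character limit mirrors the topic-name limit and is only an assumption, as is where such a check would live (client config validation vs. the broker/GroupCoordinator):

{code:java}
final class GroupIdValidator {

    private static final int MAX_GROUP_ID_LENGTH = 249;   // assumed limit, not decided yet

    static void validate(String groupId) {
        if (groupId != null && groupId.length() > MAX_GROUP_ID_LENGTH) {
            throw new IllegalArgumentException("group.id is longer than "
                    + MAX_GROUP_ID_LENGTH + " characters: " + groupId.length());
        }
    }
}
{code}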



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7390) Enable the find-sec-bugs spotBugs plugin for Gradle

2018-11-30 Thread Tim Nguyen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705640#comment-16705640
 ] 

Tim Nguyen commented on KAFKA-7390:
---

[~omkreddy] just a quick update: it seems like the find-sec-bugs plugin 
consumes a lot of memory to do the static analysis on my local machine. I 
tried to give it 8GB of free memory and still hit either GC overhead issues 
or Java heap space issues.

Here is my fork commit: 
[https://github.com/timmy2702/kafka/commit/2b3db41688738db1eadc14a6542fb5e3d796361e]

I'm not sure whether we should move ahead with this plugin as described 
in the task, since Jenkins will probably fail due to memory issues.

> Enable the find-sec-bugs spotBugs plugin for Gradle
> ---
>
> Key: KAFKA-7390
> URL: https://issues.apache.org/jira/browse/KAFKA-7390
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Tim Nguyen
>Priority: Major
>  Labels: newbie
>
> Once we switch to spotBugs (KAFKA-5887), we should try the find-sec-bugs 
> plugin that helps find security issues:
>  
> https://spotbugs.readthedocs.io/en/latest/gradle.html#introduce-spotbugs-plugin
> http://find-sec-bugs.github.io/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)