[jira] [Resolved] (KAFKA-6181) Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-20 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-6181.
---
Fix Version/s: 2.8.0
   Resolution: Fixed

> Examining log messages with {{--deep-iteration}} should show superset of 
> fields
> ---
>
> Key: KAFKA-6181
> URL: https://issues.apache.org/jira/browse/KAFKA-6181
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Assignee: Prithvi
>Priority: Minor
>  Labels: newbie
> Fix For: 2.8.0
>
>
> Printing log data on Kafka brokers using {{kafka.tools.DumpLogSegments}}:
>  {{--deep-iteration}} should show a superset of fields in each message, as 
> compared to without this parameter, however some fields are missing. Impact: 
> users need to execute both commands to get the full set of fields.
> {noformat}
> kafka-run-class kafka.tools.DumpLogSegments \
> --print-data-log \
> --files .log
> Dumping .log
> Starting offset: 0
> baseOffset: 0 lastOffset: 35 baseSequence: -1 lastSequence: -1 producerId: -1 
> producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 
> CreateTime: 1509987569448 isvalid: true size: 3985 magic: 2 compresscodec: 
> NONE crc: 4227905507
> {noformat}
> {noformat}
> kafka-run-class kafka.tools.DumpLogSegments \
> --print-data-log \
> --files .log \
> --deep-iteration
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1509987569420 isvalid: true keysize: -1 
> valuesize: 100
> magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: 
> false headerKeys: [] payload: 
> SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHW
> {noformat}
> Notice, for example, that {{partitionLeaderEpoch}} and {{crc}} are missing. 
> Print these and all missing fields.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-20 Thread GitBox


chia7712 commented on pull request #9204:
URL: https://github.com/apache/kafka/pull/9204#issuecomment-731525092


   @iprithv Thanks for you patch. I have assigned 
https://issues.apache.org/jira/browse/KAFKA-6181 to you :) 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-6181) Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-20 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-6181:
-

Assignee: Prithvi

> Examining log messages with {{--deep-iteration}} should show superset of 
> fields
> ---
>
> Key: KAFKA-6181
> URL: https://issues.apache.org/jira/browse/KAFKA-6181
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Assignee: Prithvi
>Priority: Minor
>  Labels: newbie
>
> Printing log data on Kafka brokers using {{kafka.tools.DumpLogSegments}}:
>  {{--deep-iteration}} should show a superset of fields in each message, as 
> compared to without this parameter, however some fields are missing. Impact: 
> users need to execute both commands to get the full set of fields.
> {noformat}
> kafka-run-class kafka.tools.DumpLogSegments \
> --print-data-log \
> --files .log
> Dumping .log
> Starting offset: 0
> baseOffset: 0 lastOffset: 35 baseSequence: -1 lastSequence: -1 producerId: -1 
> producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 
> CreateTime: 1509987569448 isvalid: true size: 3985 magic: 2 compresscodec: 
> NONE crc: 4227905507
> {noformat}
> {noformat}
> kafka-run-class kafka.tools.DumpLogSegments \
> --print-data-log \
> --files .log \
> --deep-iteration
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1509987569420 isvalid: true keysize: -1 
> valuesize: 100
> magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: 
> false headerKeys: [] payload: 
> SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHW
> {noformat}
> Notice, for example, that {{partitionLeaderEpoch}} and {{crc}} are missing. 
> Print these and all missing fields.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-20 Thread GitBox


chia7712 merged pull request #9204:
URL: https://github.com/apache/kafka/pull/9204


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


showuon edited a comment on pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#issuecomment-731520316


   @ableegoldman , thanks for pointing it out. After investigation, I found the 
test 
`StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownThreadUsingOldHandler`
 failed is because we set 2 stream threads for this test. So when we got the 
`uncaughtException`, we shutdown the thread, and **rebalancing** to the other 
thread. And we have to wait for rebalancing completes, and later another 
exception thrown in the thread, then the stream will turn into `ERROR` state, 
which is why it is so flaky. 
   
   I default set to 1 stream thread in this test since other tests will set to 
the expected thread number before testing.
   
   The fix is in this commit: 
https://github.com/apache/kafka/pull/9629/commits/e6d39f6dc15a198a5a58d34d239a1021eeaf43b7.
 Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


showuon edited a comment on pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#issuecomment-731520316


   @ableegoldman , thanks for pointing it out. After investigation, I found the 
test 
`StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownThreadUsingOldHandler`
 failed is because we set 2 stream threads for this test. So when we got the 
`uncaughtException`, we shutdown the thread, and **rebalancing** to the other 
thread. And we have to wait for rebalancing completes, and later another 
exception thrown in the thread, then the stream will turn into `ERROR` state, 
which is why it is so flaky. 
   
   I default set to 1 stream thread in this test since other tests will set to 
the expected thread number before testing.
   
   The fix is in this commit: 
https://github.com/apache/kafka/pull/9629/commits/75e2d261aff819ed8c7a1ec154d64a8e2e1c626e.
 Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


showuon edited a comment on pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#issuecomment-731520316


   @ableegoldman , thanks for pointing it out. After investigation, I found the 
test 
`StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownThreadUsingOldHandler`
 failed is because we set 2 stream threads for this test. So when we got the 
`uncaughtException`, we shutdown the thread, and **rebalancing** to the other 
thread. And we have to wait for rebalancing completes, and later another 
exception thrown in the other thread, then the stream will turn into `ERROR` 
state, which is why it is so flaky. 
   
   I default set to 1 stream thread in this test since other tests will set to 
the expected thread number before testing.
   
   The fix is in this commit: 
https://github.com/apache/kafka/pull/9629/commits/75e2d261aff819ed8c7a1ec154d64a8e2e1c626e.
 Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


showuon commented on pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#issuecomment-731520316


   @ableegoldman , thanks for pointing it out. After investigation, I found the 
test 
`StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownThreadUsingOldHandler`
 failed is because we set 2 stream threads for this test. So when we got the 
`uncaughtException`, we shutdown the thread, and **rebalancing** to the other 
thread. And we have to wait for another exception thrown in the other thread, 
then the stream will turn into `ERROR` state, which is why it is so flaky. 
   
   I default set to 1 stream thread in this test since other tests will set to 
the expected thread number before testing.
   
   The fix is in this commit: 
https://github.com/apache/kafka/pull/9629/commits/75e2d261aff819ed8c7a1ec154d64a8e2e1c626e.
 Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9636: KAFKA-10757; resolve compile problems brought by KAFKA-10755

2020-11-20 Thread GitBox


ijuma commented on pull request #9636:
URL: https://github.com/apache/kafka/pull/9636#issuecomment-731513931


   I verified that the test passed locally and the Jenkins build passed the 
compilation part.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9636: KAFKA-10757; resolve compile problems brought by KAFKA-10755

2020-11-20 Thread GitBox


ijuma merged pull request #9636:
URL: https://github.com/apache/kafka/pull/9636


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9634: KAFKA-10755: Should consider commit latency when computing next commit timestamp

2020-11-20 Thread GitBox


ijuma commented on pull request #9634:
URL: https://github.com/apache/kafka/pull/9634#issuecomment-731513666


   Looks this broke the build. Did we check that the PR passed?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #9636: KAFKA-10757; resolve compile problems brought by KAFKA-10755

2020-11-20 Thread GitBox


dengziming commented on pull request #9636:
URL: https://github.com/apache/kafka/pull/9636#issuecomment-731506777


   @guozhangwang @mjsax Hi, PTAL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #9636: KAFKA-10757; resolve compile problems brought by KAFKA-10755

2020-11-20 Thread GitBox


dengziming opened a new pull request #9636:
URL: https://github.com/apache/kafka/pull/9636


   The KAFKA-10755 calls new TaskManager with 9 params 
   ```
   final TaskManager taskManager = new TaskManager(
   null,
   null,
   null,
   null,
   null,
   null,
   null,
   null,
   null
   )
   ```
   but `new TaskManager` has 10 params, which brought compile problems
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10757) KAFKA-10755 brings a compile error

2020-11-20 Thread dengziming (Jira)
dengziming created KAFKA-10757:
--

 Summary: KAFKA-10755 brings a compile error 
 Key: KAFKA-10757
 URL: https://issues.apache.org/jira/browse/KAFKA-10757
 Project: Kafka
  Issue Type: Bug
Reporter: dengziming
Assignee: dengziming


The `new TaskManager` has 10 params but StreamThreadTest call a `new 
StreamThreadTest` with 9 params.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #9635: KAFKA-10756; Add missing unit test for `UnattachedState`

2020-11-20 Thread GitBox


dengziming commented on pull request #9635:
URL: https://github.com/apache/kafka/pull/9635#issuecomment-731501943


   @hachikuji @guozhangwang ,Hi, PTAL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #9635: KAFKA-10756; Add missing unit test for `UnattachedState`

2020-11-20 Thread GitBox


dengziming opened a new pull request #9635:
URL: https://github.com/apache/kafka/pull/9635


   Accidentally find that there is no unit test for UnattachedState, We should 
add missing unit tests, code is similar to ResignedStateTest and VotedStateTest.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10756) Add missing unit test for `UnattachedState`

2020-11-20 Thread dengziming (Jira)
dengziming created KAFKA-10756:
--

 Summary: Add missing unit test for `UnattachedState`
 Key: KAFKA-10756
 URL: https://issues.apache.org/jira/browse/KAFKA-10756
 Project: Kafka
  Issue Type: Sub-task
Reporter: dengziming
Assignee: dengziming


Add unit test for UnattachedState, similar to KAFKA-10519



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9634: KAFKA-10755: Should consider commit latency when computing next commit timestamp

2020-11-20 Thread GitBox


mjsax commented on pull request #9634:
URL: https://github.com/apache/kafka/pull/9634#issuecomment-731499093


   Merged to `trunk` and cherry-picked to `2.6`. Will cherry pick to `2.7` 
after release is done.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9634: KAFKA-10755: Should consider commit latency when computing next commit timestamp

2020-11-20 Thread GitBox


mjsax merged pull request #9634:
URL: https://github.com/apache/kafka/pull/9634


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9633: KAFKA-10706; Ensure leader epoch cache is cleaned after truncation to end offset

2020-11-20 Thread GitBox


hachikuji commented on pull request #9633:
URL: https://github.com/apache/kafka/pull/9633#issuecomment-731493312


   @junrao Thanks for reviewing. I pushed a minor tweak. I think we need to 
truncate the epoch cache even if the truncation offset is larger than the end 
offset. This is what would have happened in the example from the JIRA.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-20 Thread GitBox


hachikuji commented on pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#issuecomment-731477573


   Note that this change is causing 
`LogTest.testAppendToTransactionIndexFailure` to fail. I think this has 
surfaced an inconsistency in how we update the transaction state. I'm 
considering the best way to resolve it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2020-11-20 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236547#comment-17236547
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10643:


Hey [~eran-levy]

Thanks for uploading a larger sample of logs. I just had a look through the 
latest and think you may be hitting 
[KAFKA-10455|https://issues.apache.org/jira/browse/KAFKA-10455] , although I 
don't think that's necessarily related to the problem that you're experiencing. 
It only explains why there seem to be so many probing rebalances according to 
the logs, but these probing rebalances aren't actually occuring.

I'll try to dig for more clues but it's difficult to say with only the 
INFO-level logs available, since a long gap between logs may be due to healthy 
processing or due to Streams being stuck somewhere. If it would be possible to 
collect DEBUG logs that would definitely expedite the debugging process.

The only other thing I can think of is that maybe there's something going on 
with rocksdb, eg a very long compaction that's causing write stalls. I'd 
recommend collecting the rocksdb metrics if you aren't already, and looking 
into those for clues

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-20 Thread GitBox


junrao commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r527972552



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map 
originals,  Map
 throw new ConfigException(entry.getKey().toString(), 
entry.getValue(), "Key must be a string.");
 
 this.originals = resolveConfigVariables(configProviderProps, 
(Map) originals);
-this.values = definition.parse(this.originals);
+// pass a copy to definition.parse. Otherwise, the definition.parse 
adds all keys of definitions to "used" group
+// since definition.parse needs to call "RecordingMap#get" when 
checking all definitions.
+this.values = definition.parse(new HashMap<>(this.originals));

Review comment:
   Ok. I guess the issue is in the following, where we pass in a 
RecordingMap to construct ProducerConfig.
   
   
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L384
   
   However, that code seems no longer necessary since we are now setting 
clientId in ProducerConfig.postProcessParsedConfig(). Could we just avoid 
constructing ProducerConfig there?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10754) Fix flaky shouldShutdownSingleThreadApplication test

2020-11-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10754:

Component/s: unit tests
 streams

> Fix flaky shouldShutdownSingleThreadApplication test
> 
>
> Key: KAFKA-10754
> URL: https://issues.apache.org/jira/browse/KAFKA-10754
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication
>  failed, log available in 
> /home/jenkins/jenkins-agent/workspace/Kafka/kafka-trunk-jdk11/streams/build/reports/testOutput/org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication.test.stdout
> org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
>  > shouldShutdownSingleThreadApplication FAILED
>  java.lang.AssertionError: Expected all streams instances in 
> [org.apache.kafka.streams.KafkaStreams@36c1250, 
> org.apache.kafka.streams.KafkaStreams@124268b5] to be ERROR within 3 ms, 
> but the following were not: 
> \{org.apache.kafka.streams.KafkaStreams@124268b5=RUNNING, 
> org.apache.kafka.streams.KafkaStreams@36c1250=RUNNING}
>  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:933)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:450)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:418)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:916)
>  at 
> org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication(StreamsUncaughtExceptionHandlerIntegrationTest.java:186)
>  
>  
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/267/log/?start=0]
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk11/runs/241/log/?start=0]
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/270/log/?start=0]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10752) One topic partition multiple consumer

2020-11-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10752.
-
Resolution: Invalid

[~dnamicro] – we use the Jira board for bug reports, not to answer questions.

If you have questions, please sign up to the user mailing list as described on 
the webpage ([https://kafka.apache.org/contact]) and ask your question there. 
Thanks.

> One topic partition multiple consumer
> -
>
> Key: KAFKA-10752
> URL: https://issues.apache.org/jira/browse/KAFKA-10752
> Project: Kafka
>  Issue Type: Task
>Reporter: AaronTrazona
>Priority: Minor
>
> # Does this means that single partition cannot be consumed by multiple 
> consumers? Cant we have single partition and a consumer group with more than 
> one consumer and make them all consume from single partition?
>  # If single partition can be consumed by only single consumer, I was 
> thinking why is this design decision?
>  # What if I need total order over records and still need it to be consumed 
> parallel? Is it undoable in Kafka? Or such scenario does not make sense?
> I need clarification if this is doable in kafka, that have  1 topic partition 
> with multiple consumer  (round robin strategy)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10745) Please let me know how I check the time which Source connector receive the data from source table.

2020-11-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10745.
-
Resolution: Invalid

[~nayusik] – we use the Jira board for bug reports, not to answer questions.

If you have questions, please sign up to the user mailing list as described on 
the webpage ([https://kafka.apache.org/contact]) and ask your question there. 
Thanks.

> Please let me know how I check the time which Source connector receive the 
> data from source table.
> --
>
> Key: KAFKA-10745
> URL: https://issues.apache.org/jira/browse/KAFKA-10745
> Project: Kafka
>  Issue Type: Improvement
>Reporter: NAYUSIK
>Priority: Major
>
> Please let me know how I check the time which Source connector receive the 
> data from source table.
> I want to check the time by section.
> We are currently using JDBC Connector.
> The time we can see is the time when the data is created on the source table, 
> the time when the data is entered into Kafka, and the time when the data is 
> generated on the target table.
> But I also want to know the time when Source connector receive the data from 
> source table.
> Please tell me what settings I need to set up on the Source connector.
> Thank you for your support.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on pull request #9497: KAFKA-10619: Producer will enable idempotence and acks all by default

2020-11-20 Thread GitBox


d8tltanc commented on pull request #9497:
URL: https://github.com/apache/kafka/pull/9497#issuecomment-731462819


   Hi, @warrenzhu25. We'll wait at least until 3.0 for merging as we are making 
the ACL changes. Please see KIP-679 for more details and join our discussion. 
Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10755:

Priority: Critical  (was: Major)

> Should consider commit latency when computing next commit timestamp
> ---
>
> Key: KAFKA-10755
> URL: https://issues.apache.org/jira/browse/KAFKA-10755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
>
> In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
> introduced a regression, by _not_ updating the current time after committing. 
> This implies that we compute the next commit timestamp too low (ie, too 
> early).
> For small commit intervals and high commit latency (like in EOS), this big 
> may lead to an increased commit frequency and fewer processed records between 
> two commits, and thus to reduced throughput.
> For example, assume that the commit interval is 100ms and the commit latency 
> is 50ms, and we start the commit at timestamp 1. The commit finishes at 
> 10050, and the next commit should happen at 10150. However, if we don't 
> update the current timestamp, we incorrectly compute the next commit time as 
> 10100, ie, 50ms too early, and we have only 50ms to process data instead of 
> the intended 100ms.
> In the worst case, if the commit latency is larger than the commit interval, 
> it would imply that we commit after processing a single record per task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #9634: KAFKA-10755: Should consider commit latency when computing next commit timestamp

2020-11-20 Thread GitBox


guozhangwang commented on pull request #9634:
URL: https://github.com/apache/kafka/pull/9634#issuecomment-731450981


   LGTM! Please feel free to merge if local unit test passes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10755:
---

Assignee: Matthias J. Sax

> Should consider commit latency when computing next commit timestamp
> ---
>
> Key: KAFKA-10755
> URL: https://issues.apache.org/jira/browse/KAFKA-10755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
> introduced a regression, by _not_ updating the current time after committing. 
> This implies that we compute the next commit timestamp too low (ie, too 
> early).
> For small commit intervals and high commit latency (like in EOS), this big 
> may lead to an increased commit frequency and fewer processed records between 
> two commits, and thus to reduced throughput.
> For example, assume that the commit interval is 100ms and the commit latency 
> is 50ms, and we start the commit at timestamp 1. The commit finishes at 
> 10050, and the next commit should happen at 10150. However, if we don't 
> update the current timestamp, we incorrectly compute the next commit time as 
> 10100, ie, 50ms too early, and we have only 50ms to process data instead of 
> the intended 100ms.
> In the worst case, if the commit latency is larger than the commit interval, 
> it would imply that we commit after processing a single record per task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax opened a new pull request #9634: KAFKA-10755: Should consider commit latency when computing next commit timestamp

2020-11-20 Thread GitBox


mjsax opened a new pull request #9634:
URL: https://github.com/apache/kafka/pull/9634


   Call for review @wcarlson5 @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-20 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-10755:
---

 Summary: Should consider commit latency when computing next commit 
timestamp
 Key: KAFKA-10755
 URL: https://issues.apache.org/jira/browse/KAFKA-10755
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Matthias J. Sax


In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
introduced a regression, by _not_ updating the current time after committing. 
This implies that we compute the next commit timestamp too low (ie, too early).

For small commit intervals and high commit latency (like in EOS), this big may 
lead to an increased commit frequency and fewer processed records between two 
commits, and thus to reduced throughput.

For example, assume that the commit interval is 100ms and the commit latency is 
50ms, and we start the commit at timestamp 1. The commit finishes at 10050, 
and the next commit should happen at 10150. However, if we don't update the 
current timestamp, we incorrectly compute the next commit time as 10100, ie, 
50ms too early, and we have only 50ms to process data instead of the intended 
100ms.

In the worst case, if the commit latency is larger than the commit interval, it 
would imply that we commit after processing a single record per task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-20 Thread GitBox


hachikuji commented on a change in pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#discussion_r528007637



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -301,26 +304,28 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
 }
   }
 
-  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
- producerEpoch: Short,
- offset: Long,
- timestamp: Long): CompletedTxn = {
+  def appendEndTxnMarker(
+endTxnMarker: EndTransactionMarker,
+producerEpoch: Short,
+offset: Long,
+timestamp: Long
+  ): Option[CompletedTxn] = {
 checkProducerEpoch(producerEpoch, offset)
 checkCoordinatorEpoch(endTxnMarker, offset)
 
-val firstOffset = updatedEntry.currentTxnFirstOffset match {
-  case Some(txnFirstOffset) => txnFirstOffset
-  case None =>
-transactions += new TxnMetadata(producerId, offset)
-offset
+// Only emit the `CompletedTxn` for non-empty transactions. A transaction 
marker
+// without any associated data will not have any impact on the last stable 
offset
+// and would not need to be reflected in the transaction index.
+val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+  CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType 
== ControlRecordType.ABORT)

Review comment:
   Yes, that is right. Additionally, we are not adding the transaction to 
the list of started transactions which are accumulated in the 
`ProducerAppendInfo`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-20 Thread GitBox


lbradstreet commented on a change in pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#discussion_r528006935



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -301,26 +304,28 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
 }
   }
 
-  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
- producerEpoch: Short,
- offset: Long,
- timestamp: Long): CompletedTxn = {
+  def appendEndTxnMarker(
+endTxnMarker: EndTransactionMarker,
+producerEpoch: Short,
+offset: Long,
+timestamp: Long
+  ): Option[CompletedTxn] = {
 checkProducerEpoch(producerEpoch, offset)
 checkCoordinatorEpoch(endTxnMarker, offset)
 
-val firstOffset = updatedEntry.currentTxnFirstOffset match {
-  case Some(txnFirstOffset) => txnFirstOffset
-  case None =>
-transactions += new TxnMetadata(producerId, offset)
-offset
+// Only emit the `CompletedTxn` for non-empty transactions. A transaction 
marker
+// without any associated data will not have any impact on the last stable 
offset
+// and would not need to be reflected in the transaction index.
+val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+  CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType 
== ControlRecordType.ABORT)

Review comment:
   Could you check my understanding? If we have a a non-empty 
currentTxnFirstOffset value (indicating a non-empty transaction), we'll return 
a valid CompletedTxn, otherwise we will return None. For the empty transactions 
this means that we aren't accumulating completed transactions. This saves us 
from having to call lastStableOffset on every empty completed transaction 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1240?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9633: KAFKA-10706; Ensure leader epoch cache is cleaned after truncation to end offset

2020-11-20 Thread GitBox


hachikuji opened a new pull request #9633:
URL: https://github.com/apache/kafka/pull/9633


   This patch fixes a liveness bug which prevents follower truncation from 
completing after a leader election. If there are consecutive leader elections 
without writing any data entries, then the leader and follower may have 
conflicting epoch entries at the end of the log. The JIRA explains a specific 
scenario in more detail: https://issues.apache.org/jira/browse/KAFKA-10706.
   
   The problem is the shortcut return in `Log.truncateTo` when the truncation 
offset is larger than or equal to the end offset, which prevents the 
conflicting entries from being resolved. Here we change this case to ensure 
`LeaderEpochFileCache.truncateFromEnd` is still called.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-20 Thread GitBox


hachikuji commented on pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#issuecomment-731411122


   Addressing this problem more generally so that we can also handle small 
transactions is difficult because of the need to maintain the index. I believe 
there is still room for improvement by looking only at the aborted 
transactions, but this is more complex and I need to think it through.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-20 Thread GitBox


hachikuji opened a new pull request #9632:
URL: https://github.com/apache/kafka/pull/9632


   Compacted topics can accumulate a large number of empty transaction markers 
as the data from the transactions gets cleaned. For each transaction, there is 
some bookkeeping that leaders and followers much do to keep the transaction 
index up to date. The cost of this overhead can degrade performance when a 
replica needs to catch up if the log has mostly empty or small transactions. 
This patch improves the cost by skipping over empty transactions since these 
will have no effect on the last stable offset and do not need to be reflected 
in the transaction index.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

2020-11-20 Thread GitBox


jsancio opened a new pull request #9631:
URL: https://github.com/apache/kafka/pull/9631


   It is possible for the the controller to send LeaderAndIsr requests with
   an ISR that contains ids not in the replica set. This is used during
   reassignment so that the partition leader doesn't add replicas back to
   the ISR. This is needed because the controller updates ZK and the
   replicas through two rounds:
   
   1. The first round of ZK updates and LeaderAndIsr requests shrinks the ISR.
   
   2. The second round of ZK updates and LeaderAndIsr requests shrinks the 
replica
   set.
   
   This could be avoided by doing 1. and 2. in one round. Unfortunately the
   current controller implementation makes that non-trivial.
   
   This commit changes the leader to allow the state where the ISR contains
   ids that are not in the replica set and to remove such ids from the ISR
   if required.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9672) Dead brokers in ISR cause isr-expiration to fail with exception

2020-11-20 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236420#comment-17236420
 ] 

Jose Armando Garcia Sancio edited comment on KAFKA-9672 at 11/20/20, 8:14 PM:
--

Based on my observations here: 
https://issues.apache.org/jira/browse/KAFKA-9672?focusedCommentId=17236416=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17236416

Solution 1: I think the ideal solution is to never allow the ISR to be a 
superset of the replica set. Unfortunately, this is not easy to with how the 
controller implementation manages writes to ZK.

Solution 2: Another solution is to allow the ISR to be a superset of the 
replica set but also allow the Leader to remove replicas from the ISR if they 
are not in the replica set.


was (Author: jagsancio):
Based on my observations here: 
https://issues.apache.org/jira/browse/KAFKA-9672?focusedCommentId=17236416=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17236416

I think the ideal solution is to never allow the ISR to be a superset of the 
replica set. Unfortunately, this is not easy to with how the controller 
implementation manages writes to ZK.

Another solution is to allow the ISR to be a superset of the replica set but 
also allow the Leader to remove replicas from the ISR if they are not in the 
replica set.

> Dead brokers in ISR cause isr-expiration to fail with exception
> ---
>
> Key: KAFKA-9672
> URL: https://issues.apache.org/jira/browse/KAFKA-9672
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Ivan Yurchenko
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> We're running Kafka 2.4 and facing a pretty strange situation.
>  Let's say there were three brokers in the cluster 0, 1, and 2. Then:
>  1. Broker 3 was added.
>  2. Partitions were reassigned from broker 0 to broker 3.
>  3. Broker 0 was shut down (not gracefully) and removed from the cluster.
>  4. We see the following state in ZooKeeper:
> {code:java}
> ls /brokers/ids
> [1, 2, 3]
> get /brokers/topics/foo
> {"version":2,"partitions":{"0":[2,1,3]},"adding_replicas":{},"removing_replicas":{}}
> get /brokers/topics/foo/partitions/0/state
> {"controller_epoch":123,"leader":1,"version":1,"leader_epoch":42,"isr":[0,2,3,1]}
> {code}
> It means, the dead broker 0 remains in the partitions's ISR. A big share of 
> the partitions in the cluster have this issue.
> This is actually causing an errors:
> {code:java}
> Uncaught exception in scheduled task 'isr-expiration' 
> (kafka.utils.KafkaScheduler)
> org.apache.kafka.common.errors.ReplicaNotAvailableException: Replica with id 
> 12 is not available on broker 17
> {code}
> It means that effectively {{isr-expiration}} task is not working any more.
> I have a suspicion that this was introduced by [this commit (line 
> selected)|https://github.com/apache/kafka/commit/57baa4079d9fc14103411f790b9a025c9f2146a4#diff-5450baca03f57b9f2030f93a480e6969R856]
> Unfortunately, I haven't been able to reproduce this in isolation.
> Any hints about how to reproduce (so I can write a patch) or mitigate the 
> issue on a running cluster are welcome.
> Generally, I assume that not throwing {{ReplicaNotAvailableException}} on a 
> dead (i.e. non-existent) broker, considering them out-of-sync and removing 
> from the ISR should fix the problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9672) Dead brokers in ISR cause isr-expiration to fail with exception

2020-11-20 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236420#comment-17236420
 ] 

Jose Armando Garcia Sancio commented on KAFKA-9672:
---

Based on my observations here: 
https://issues.apache.org/jira/browse/KAFKA-9672?focusedCommentId=17236416=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17236416

I think the ideal solution is to never allow the ISR to be a superset of the 
replica set. Unfortunately, this is not easy to with how the controller 
implementation manages writes to ZK.

Another solution is to allow the ISR to be a superset of the replica set but 
also allow the Leader to remove replicas from the ISR if they are not in the 
replica set.

> Dead brokers in ISR cause isr-expiration to fail with exception
> ---
>
> Key: KAFKA-9672
> URL: https://issues.apache.org/jira/browse/KAFKA-9672
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Ivan Yurchenko
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> We're running Kafka 2.4 and facing a pretty strange situation.
>  Let's say there were three brokers in the cluster 0, 1, and 2. Then:
>  1. Broker 3 was added.
>  2. Partitions were reassigned from broker 0 to broker 3.
>  3. Broker 0 was shut down (not gracefully) and removed from the cluster.
>  4. We see the following state in ZooKeeper:
> {code:java}
> ls /brokers/ids
> [1, 2, 3]
> get /brokers/topics/foo
> {"version":2,"partitions":{"0":[2,1,3]},"adding_replicas":{},"removing_replicas":{}}
> get /brokers/topics/foo/partitions/0/state
> {"controller_epoch":123,"leader":1,"version":1,"leader_epoch":42,"isr":[0,2,3,1]}
> {code}
> It means, the dead broker 0 remains in the partitions's ISR. A big share of 
> the partitions in the cluster have this issue.
> This is actually causing an errors:
> {code:java}
> Uncaught exception in scheduled task 'isr-expiration' 
> (kafka.utils.KafkaScheduler)
> org.apache.kafka.common.errors.ReplicaNotAvailableException: Replica with id 
> 12 is not available on broker 17
> {code}
> It means that effectively {{isr-expiration}} task is not working any more.
> I have a suspicion that this was introduced by [this commit (line 
> selected)|https://github.com/apache/kafka/commit/57baa4079d9fc14103411f790b9a025c9f2146a4#diff-5450baca03f57b9f2030f93a480e6969R856]
> Unfortunately, I haven't been able to reproduce this in isolation.
> Any hints about how to reproduce (so I can write a patch) or mitigate the 
> issue on a running cluster are welcome.
> Generally, I assume that not throwing {{ReplicaNotAvailableException}} on a 
> dead (i.e. non-existent) broker, considering them out-of-sync and removing 
> from the ISR should fix the problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9672) Dead brokers in ISR cause isr-expiration to fail with exception

2020-11-20 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236416#comment-17236416
 ] 

Jose Armando Garcia Sancio commented on KAFKA-9672:
---

I was not able to reproduce this issue but looking at the code and the trace of 
messages sent by the controller this is what I think it is happening.

Assuming that the initial partition assignment and state is:
{code:java}
Replicas: 0, 1, 2
ISR: 0, 1, 2
Leader: 0
LeaderEpoch: 1{code}
This state is replicated to all of the replicas (0, 1, 2) using the 
LeaderAndIsr requests.

When the user attempts to perform a reassignment of replacing 0 with 3, the 
controller bumps the epoch and assignment info
{code:java}
Replicas: 0, 1, 2, 3
Adding: 3
Removing: 0
ISR: 0, 1, 2
Leader: 0
LeaderEpoch: 2{code}
This state is replicated to all of the replicas (0, 1, 2, 3) using the 
LeaderAndIsr request.

The system roughly stays in this state until the all of the target replicas 
have join the ISR. When all of the target replicas have join the ISR the 
controller wants to perform the following flow:

1 - The controller moves the leader if necessary (leader is not in the new 
replicas set) and stops the leader from letting "removing" replicas to join the 
ISR.

The second requirement (stopping the leader from adding "removing" replicas to 
the ISR) is accomplished by bumping the leader epoch and only sending the new 
leader epoch to the target replicas (1, 2, 3). Unfortunately, due to how the 
controller is implemented this is accomplished by deleting the "removing" 
replicas from the in memory state without modifying the ISR state. At this 
point we have the ZK state:
{code:java}
Replicas: 0, 1, 2, 3
Adding:
Removing: 0
ISR: 0, 1, 2, 3
Leader: 1
LeaderEpoch: 3{code}
but the following LeaderAndIsr requests are sent to replicas 1, 2, 3
{code:java}
Replicas: 1, 2, 3
Adding:
Removing:
ISR: 0, 1, 2, 3
Leader: 1
LeaderEpoch: 3{code}
This works because replica 0 will have an invalid leader epoch which means that 
it's Fetch request will be ignored by the (new) leader.

2 - The controller removes replica 0 from the ISR by updating ZK and sending 
the appropriate LeaderAndIsr requests.

3 - The controller removes replica 0 from the replica set by updating ZK and 
sending the appropriate LeaderAndIsr requests.

 

Conclusion

If this flow executes to completion, everything is okay. The problem is what 
happens if step 2. and 3. don't get to execute. I am unable to reproduce this 
with tests or by walking the code but if 2. and 3. don't execute but the 
controller stays alive there is a flow where the controller persists the 
following state to ZK
{code:java}
Replicas: 1, 2, 3
Adding:
Removing:
ISR: 0, 1, 2, 3
Leader: 1
LeaderEpoch: 3{code}
Which causes the reassignment flow to terminate with the system staying in this 
state. This state is persistent at this line in the controller code:

https://github.com/apache/kafka/blob/43fd630d80332f2b3b3512a712100825a8417704/core/src/main/scala/kafka/controller/KafkaController.scala#L728

> Dead brokers in ISR cause isr-expiration to fail with exception
> ---
>
> Key: KAFKA-9672
> URL: https://issues.apache.org/jira/browse/KAFKA-9672
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Ivan Yurchenko
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> We're running Kafka 2.4 and facing a pretty strange situation.
>  Let's say there were three brokers in the cluster 0, 1, and 2. Then:
>  1. Broker 3 was added.
>  2. Partitions were reassigned from broker 0 to broker 3.
>  3. Broker 0 was shut down (not gracefully) and removed from the cluster.
>  4. We see the following state in ZooKeeper:
> {code:java}
> ls /brokers/ids
> [1, 2, 3]
> get /brokers/topics/foo
> {"version":2,"partitions":{"0":[2,1,3]},"adding_replicas":{},"removing_replicas":{}}
> get /brokers/topics/foo/partitions/0/state
> {"controller_epoch":123,"leader":1,"version":1,"leader_epoch":42,"isr":[0,2,3,1]}
> {code}
> It means, the dead broker 0 remains in the partitions's ISR. A big share of 
> the partitions in the cluster have this issue.
> This is actually causing an errors:
> {code:java}
> Uncaught exception in scheduled task 'isr-expiration' 
> (kafka.utils.KafkaScheduler)
> org.apache.kafka.common.errors.ReplicaNotAvailableException: Replica with id 
> 12 is not available on broker 17
> {code}
> It means that effectively {{isr-expiration}} task is not working any more.
> I have a suspicion that this was introduced by [this commit (line 
> selected)|https://github.com/apache/kafka/commit/57baa4079d9fc14103411f790b9a025c9f2146a4#diff-5450baca03f57b9f2030f93a480e6969R856]
> Unfortunately, I haven't been able to reproduce this in isolation.
> Any hints about 

[GitHub] [kafka] ableegoldman commented on pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


ableegoldman commented on pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#issuecomment-731359086


   Hey @showuon thanks for the quick fix! I notice that 
`StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownThreadUsingOldHandler`
 still failed on the JDK15 PR build, can you look into that?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abc863377 commented on pull request #9603: MINOR: Initialize ConnectorConfig constructor with emptyMap and avoid instantiating a new Map

2020-11-20 Thread GitBox


abc863377 commented on pull request #9603:
URL: https://github.com/apache/kafka/pull/9603#issuecomment-731336490


   @kkonstantine  
   Thanks a lot.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2020-11-20 Thread GitBox


ryannedolan commented on pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#issuecomment-731307334


   > Looking into my configuration, I now understand that I was lucky because I 
have one link going up back up replica_OLS->replica_CENTRAL which is now the 
single emitter of beats (which are then replicated in every other cluster)
   
   Ah, that's lucky indeed. If you sever that link, I suppose your heartbeats 
will stop flowing across the entire topology. Also, if you look at the 
heartbeat message content, you'll probably find that all heartbeats are 
originating from that same cluster.
   
   Sounds like disabling heartbeats altogether isn't what you want either. So 
we may need a way to emit heartbeats to source clusters without spinning up 
N*(N-1) herders just to do so.
   
   What if we created N additional herders just for heartbeats to each cluster? 
Say you had 20 clusters and only one flow enabled (A->B). In that case you 
could have 21 herders total: one for the A->B flow and 20 for per-cluster 
heartbeats. That's at least better than 20*19=380 herders. This raises the 
question of what "source" to associated with heartbeats, but seems like a 
solvable problem.
   
   Another idea is to have herders emit heartbeats _upstream_ within 
MirrorSourceConnector. In that case, the A->B herder would target B but, 
paradoxically, emit heartbeats to A. Hard to say if that would break anything, 
but as @twobeeb  points out, this may be a reasonable interpretation of the 
documentation.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-20 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527857267



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {

Review comment:
   @cadonna added





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-20 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527836277



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {

Review comment:
   Okay we can check then only synchronize around the start of the thread 
to make sure it doesn't shutdown between the check and the starting 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-11-20 Thread GitBox


jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r527812774



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
   replicaFetcherManager.shutdownIdleFetcherThreads()
   replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
   onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-  val responsePartitions = responseMap.iterator.map { case (tp, error) 
=>
-new LeaderAndIsrPartitionError()
-  .setTopicName(tp.topic)
-  .setPartitionIndex(tp.partition)
-  .setErrorCode(error.code)
-  }.toBuffer
-  new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-.setErrorCode(Errors.NONE.code)
-.setPartitionErrors(responsePartitions.asJava))
+  if (leaderAndIsrRequest.version() < 4) {

Review comment:
   Good catch. Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #9630: KAFKA-10739; WIP

2020-11-20 Thread GitBox


dajac opened a new pull request #9630:
URL: https://github.com/apache/kafka/pull/9630


   TBD
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ramesh-muthusamy commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


ramesh-muthusamy commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731236707


   We will get the same results because the task distribution considers all
   the tasks independent of a connector.
   
   On Fri, Nov 20, 2020 at 8:26 PM Basile Deustua 
   wrote:
   
   > @BDeus  it is currently not possible to
   > differentiate the load parameters of a connector, it is assumed that all
   > connectors are equally while ingestion. We do have similar scenarios where
   > we run multiple clusters instead of loading a single cluster with variety
   > of loads.
   >
   > I'm aware that the connector's load cannot be differentiate.
   > But currently, in eager mode, it's assure that each connector is equally
   > balanced on each workers and that worker as the same amount of tasks.
   > I'm asking myself if it will be the same result or if it randomly move
   > tasks from all connectors to workers to assure an equally balanced tasks
   > (and can lead to have a worker with more task from "connector1" than
   > another worker) ?
   > Thank you for your answers
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > , or
   > unsubscribe
   > 

   > .
   >
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] iprithv edited a comment on pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-20 Thread GitBox


iprithv edited a comment on pull request #9204:
URL: https://github.com/apache/kafka/pull/9204#issuecomment-731225730


   @chia7712 Sincere apologies for the delay in response. My jira account is 
ipri...@gmail.com
   Username: iprithv
   Full name: Prithvi
   Also I have commented in Jira.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] iprithv commented on pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-20 Thread GitBox


iprithv commented on pull request #9204:
URL: https://github.com/apache/kafka/pull/9204#issuecomment-731225730


   Sincere apologies for the delay in response. My jira account is 
ipri...@gmail.com
   Username: iprithv
   Full name: Prithvi
   Also I have commented in Jira.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6181) Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-20 Thread Prithvi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236217#comment-17236217
 ] 

Prithvi commented on KAFKA-6181:


[~spright] Hey, please assign it to me.

> Examining log messages with {{--deep-iteration}} should show superset of 
> fields
> ---
>
> Key: KAFKA-6181
> URL: https://issues.apache.org/jira/browse/KAFKA-6181
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: newbie
>
> Printing log data on Kafka brokers using {{kafka.tools.DumpLogSegments}}:
>  {{--deep-iteration}} should show a superset of fields in each message, as 
> compared to without this parameter, however some fields are missing. Impact: 
> users need to execute both commands to get the full set of fields.
> {noformat}
> kafka-run-class kafka.tools.DumpLogSegments \
> --print-data-log \
> --files .log
> Dumping .log
> Starting offset: 0
> baseOffset: 0 lastOffset: 35 baseSequence: -1 lastSequence: -1 producerId: -1 
> producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 
> CreateTime: 1509987569448 isvalid: true size: 3985 magic: 2 compresscodec: 
> NONE crc: 4227905507
> {noformat}
> {noformat}
> kafka-run-class kafka.tools.DumpLogSegments \
> --print-data-log \
> --files .log \
> --deep-iteration
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1509987569420 isvalid: true keysize: -1 
> valuesize: 100
> magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: 
> false headerKeys: [] payload: 
> SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHW
> {noformat}
> Notice, for example, that {{partitionLeaderEpoch}} and {{crc}} are missing. 
> Print these and all missing fields.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] BDeus commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


BDeus commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731217584


   > @BDeus it is currently not possible to differentiate the load parameters 
of a connector, it is assumed that all connectors are equally while ingestion. 
We do have similar scenarios where we run multiple clusters instead of loading 
a single cluster with variety of loads.
   
   I'm aware that the connector's load cannot be differentiate.
   But currently, in eager mode, it's assure that each connector is equally 
balanced on each workers and that worker as the same amount of tasks.
   I'm asking myself if it will be the same result or if it randomly move tasks 
from all connectors to workers to assure an equally balanced tasks (and can 
lead to have a worker with more task from "connector1" than another worker) ?
   Thank you for your answers
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


showuon commented on a change in pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#discussion_r527741948



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -110,83 +110,45 @@ public void teardown() throws IOException {
 }
 
 @Test
-public void shouldShutdownThreadUsingOldHandler() throws Exception {
+public void shouldShutdownThreadUsingOldHandler() throws 
InterruptedException {
 try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-final CountDownLatch latch = new CountDownLatch(1);
 final AtomicBoolean flag = new AtomicBoolean(false);
 kafkaStreams.setUncaughtExceptionHandler((t, e) -> flag.set(true));
 
 
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
 produceMessages(0L, inputTopic, "A");
-waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, Duration.ofSeconds(15));
 
 TestUtils.waitForCondition(flag::get, "Handler was called");
+waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, DEFAULT_DURATION);

Review comment:
   We should wait for the `uncaughtExceptionHandler` got called before 
waiting for the streams state change.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ramesh-muthusamy commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


ramesh-muthusamy commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731210676


   > @BDeus, I'm running @ramesh-muthusamy fix in production as a hotfix and 
can confirm this is now the behavior. Our cluster is processing more than a 
million messages a second and is stable with this fix.
   
   Awesome , thanks @seankumar-tl 
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ramesh-muthusamy commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


ramesh-muthusamy commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731210369


   > > @BDeus, I'm running @ramesh-muthusamy fix in production as a hotfix and 
can confirm this is now the behavior. Our cluster is processing more than a 
million messages a second and is stable with this fix.
   > 
   > Awesome job.
   > Do you run multiple connectors with different load @seankumar-tl ? We had 
to rollback the connect.protocol to EAGER to bypass this issue (with the cons 
we know)
   
   @BDeus  it is currently not possible to differentiate the load parameters of 
a connector, it is  assumed that all connectors are equally while ingestion. We 
do have similar scenarios where we run multiple clusters  instead of loading a 
single cluster with variety of loads. 
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-11-20 Thread GitBox


dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-731208645


   Hi @vvcephei,
   
   I re-based the branch onto the latest trunk, reorganized the changes into 
two commits, and fixed 
`EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore`
 - Sure, its contents do not follow the description of its name. :wink: 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] BDeus edited a comment on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


BDeus edited a comment on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731203761


   > @BDeus, I'm running @ramesh-muthusamy fix in production as a hotfix and 
can confirm this is now the behavior. Our cluster is processing more than a 
million messages a second and is stable with this fix.
   
   Awesome job.
   Do you run multiple connectors with different load @seankumar-tl ? We had to 
rollback the connect.protocol to EAGER to bypass this issue (with the cons we 
know)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] BDeus edited a comment on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


BDeus edited a comment on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731203761


   > @BDeus, I'm running @ramesh-muthusamy fix in production as a hotfix and 
can confirm this is now the behavior. Our cluster is processing more than a 
million messages a second and is stable with this fix.
   
   Awesome job.  Do you run multiple connectors with different load 
@seankumar-tl ? We had to rollback the connect.protocol to EAGER to bypass this 
issue (with the cons we know)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10753) check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0

2020-11-20 Thread lqjacklee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236189#comment-17236189
 ] 

lqjacklee commented on KAFKA-10753:
---


{code:java}
public void reset(long timeoutMs) {
if (timeoutMs < 0)
throw new IllegalArgumentException("Invalid negative timeout " + 
timeoutMs);

this.timeoutMs = timeoutMs;
this.startMs = this.currentTimeMs;

if (currentTimeMs > Long.MAX_VALUE - timeoutMs)
this.deadlineMs = Long.MAX_VALUE;
else
this.deadlineMs = currentTimeMs + timeoutMs;
}
{code}




> check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> ---
>
> Key: KAFKA-10753
> URL: https://issues.apache.org/jira/browse/KAFKA-10753
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: shiqihao
>Assignee: lqjacklee
>Priority: Minor
>
> I accidentally set ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG = 0, CPU 
> running at 100%.
> Could we add a check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> while start consumer?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] BDeus edited a comment on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


BDeus edited a comment on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731203761


   > @BDeus, I'm running @ramesh-muthusamy fix in production as a hotfix and 
can confirm this is now the behavior. Our cluster is processing more than a 
million messages a second and is stable with this fix.
   
   Awesome job. @seankumar-tl Do you run multiple connectors with different 
load ? We had to rollback the connect.protocol to EAGER to bypass this issue 
(with the cons we know)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] BDeus commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


BDeus commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731203761


   > @BDeus, I'm running @ramesh-muthusamy fix in production as a hotfix and 
can confirm this is now the behavior. Our cluster is processing more than a 
million messages a second and is stable with this fix.
   
   Awesome job. Do you run multiple connectors with different load ? We had to 
rollback the connect.protocol to EAGER to bypass this issue (with the cons we 
know)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] seankumar-tl commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


seankumar-tl commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731200868


   @BDeus, I'm running @ramesh-muthusamy fix in production as a hotfix and can 
confirm this is now the behavior. Our cluster is processing more than a million 
messages a second and is stable with this fix. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

2020-11-20 Thread GitBox


lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r527724338



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String 
builtInMetricsVersion) thr
 builtInMetricsVersion
 );
 checkCacheMetrics(builtInMetricsVersion);
-
+verifyFailedStreamThreadsSensor(0.0);

Review comment:
   After looking at both test classes, I think it actually might make the 
most sense to put the test for this metric in 
`StreamsUncaughtExceptionHandlerIntegrationTest`, since the metric is so 
closely aligned with the exception handler anyways and the setup works nicely 
with what we're trying to test with the metric. From the size + complexity of 
the other test classes, I think creating an overloaded processor for one test 
out of 20+ tests seems tricky.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10746) Consumer poll timeout Expiration should be logged as WARNING not INFO.

2020-11-20 Thread Martin Dengler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236181#comment-17236181
 ] 

Martin Dengler commented on KAFKA-10746:


Hi Luke and Benedikt, thanks for the quick response! Very much appreciated !!!

+1

> Consumer poll timeout Expiration should be logged as WARNING not INFO. 
> ---
>
> Key: KAFKA-10746
> URL: https://issues.apache.org/jira/browse/KAFKA-10746
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0, 2.6.0, 2.5.1
>Reporter: Benedikt Linse
>Assignee: Luke Chen
>Priority: Minor
>
> When a consumer does not poll regularly, and the `max.poll.interval.ms` 
> threshold is reached, the consumer leaves the consumer group, and the reason 
> is logged as an INFO message:
> [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356]
> [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016]
> Most Kafka users ignore INFO messages or have the log level set to WARN. 
> Still many users run into this issue, since their applications take too long 
> to process the polled records, and then the consumer fails to commit the 
> offsets, which leads to duplicate message processing. Not seeing the error 
> message in the first place means that users lose a lot of time debugging and 
> searching for the reason for duplicate message processing.
> Therefore it seems like the log level of this message should be increased to 
> WARN. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


showuon commented on a change in pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#discussion_r527720582



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -903,16 +903,19 @@ public static void 
startApplicationAndWaitUntilRunning(final List
 }
 
 /**
- * Waits for the given {@link KafkaStreams} instances to all be in a 
{@link State#RUNNING}
- * state. Prefer {@link #startApplicationAndWaitUntilRunning(List, 
Duration)} when possible
+ * Waits for the given {@link KafkaStreams} instances to all be in a 
specific {@link State}.
+ * Prefer {@link #startApplicationAndWaitUntilRunning(List, Duration)} 
when possible
  * because this method uses polling, which can be more error prone and 
slightly slower.
  *
  * @param streamsList the list of streams instances to run.
- * @param timeout the time to wait for the streams to all be in {@link 
State#RUNNING} state.
+ * @param state the expected state that all the streams to be in within 
timeout
+ * @param timeout the time to wait for the streams to all be in the 
specific state.
+ *
+ * @throws InterruptedException if the streams doesn't change to the 
expected state in time.
  */
 public static void waitForApplicationState(final List 
streamsList,
final State state,
-   final Duration timeout) throws 
Exception {
+   final Duration timeout) throws 
InterruptedException {

Review comment:
   We should throw a specific kind of exception, not an `Exception`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-20 Thread GitBox


showuon commented on a change in pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#discussion_r527719599



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -110,83 +110,45 @@ public void teardown() throws IOException {
 }
 
 @Test
-public void shouldShutdownThreadUsingOldHandler() throws Exception {
+public void shouldShutdownThreadUsingOldHandler() throws 
InterruptedException {
 try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-final CountDownLatch latch = new CountDownLatch(1);

Review comment:
   delete unused `latch`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] BDeus commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-11-20 Thread GitBox


BDeus commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-731189266


   Just a quick question.
   Can we confirmed that when a KafkaConnect Cluster process mutiple 
connectors, the task of each connector as equally balanced on the workers ?
   Here an example:
   With two connectors, one "HeavyLoad" (4 tasks) and another "LowLoad" (4 
tasks) on 2 workers KafkaConnect.
   
   I expect to have:
   - Broker1:HeavyLoad1
   - Broker1:HeavyLoad2
   - Broker1:LowLoad1
   - Broker1:LowLoad2
   - Broker2:HeavyLoad3
   - Broker2:HeavyLoad4
   - Broker2:LowLoad3
   - Broker2:LowLoad4
   
   And not a random assignement that can lead to:
   - Broker1:HeavyLoad1
   - Broker1:HeavyLoad2
   - Broker1:HeavyLoad3
   - Broker1:HeavyLoad4
   - Broker2:LowLoad1
   - Broker2:LowLoad2
   - Broker2:LowLoad3
   - Broker2:LowLoad4
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-11-20 Thread GitBox


dengziming commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r527643380



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
   replicaFetcherManager.shutdownIdleFetcherThreads()
   replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
   onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-  val responsePartitions = responseMap.iterator.map { case (tp, error) 
=>
-new LeaderAndIsrPartitionError()
-  .setTopicName(tp.topic)
-  .setPartitionIndex(tp.partition)
-  .setErrorCode(error.code)
-  }.toBuffer
-  new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-.setErrorCode(Errors.NONE.code)
-.setPartitionErrors(responsePartitions.asJava))
+  if (leaderAndIsrRequest.version() < 4) {

Review comment:
   Should here be version() < 5 ?

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
   replicaFetcherManager.shutdownIdleFetcherThreads()
   replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
   onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-  val responsePartitions = responseMap.iterator.map { case (tp, error) 
=>
-new LeaderAndIsrPartitionError()
-  .setTopicName(tp.topic)
-  .setPartitionIndex(tp.partition)
-  .setErrorCode(error.code)
-  }.toBuffer
-  new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-.setErrorCode(Errors.NONE.code)
-.setPartitionErrors(responsePartitions.asJava))
+  if (leaderAndIsrRequest.version() < 4) {
+val responsePartitions = responseMap.iterator.map { case (tp, 
error) =>
+  new LeaderAndIsrPartitionError()
+.setTopicName(tp.topic)
+.setPartitionIndex(tp.partition)
+.setErrorCode(error.code)
+}.toBuffer
+new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+  .setErrorCode(Errors.NONE.code)
+  .setPartitionErrors(responsePartitions.asJava))
+  } else {
+val topics = new mutable.HashMap[String, 
List[LeaderAndIsrPartitionError]]
+responseMap.asJava.forEach { case (tp, error) =>
+  if (topics.get(tp.topic) == None) {

Review comment:
   can be changed into topics.contains()





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-20 Thread GitBox


cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r527583614



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##
@@ -103,6 +105,19 @@ public void putAll(final List> 
entries) {
 }
 }
 
+@Override
+public , P> KeyValueIterator 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+Objects.requireNonNull(prefix, "prefix cannot be null");
+Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
+
+final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
+final Bytes to = Bytes.increment(from);
+
+return new DelegatingPeekingKeyValueIterator<>(
+name,
+new InMemoryKeyValueIterator(map.subMap(from, true, to, 
false).keySet(), true));

Review comment:
   nit: The last parenthesis should go to a new line. I know that we are 
not consistent throughout the code base (that is why this comment is prefixed 
with "nit") but that is actually the code style, we agreed upon. If you need to 
push another commit you can fix this, otherwise it's fine. 
   ```suggestion
   return new DelegatingPeekingKeyValueIterator<>(
   name,
   new InMemoryKeyValueIterator(map.subMap(from, true, to, 
false).keySet(), true)
   );
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
##
@@ -359,6 +361,31 @@ public void shouldReverseIterateOverRange() {
 ), results);
 }
 
+@Test
+public void shouldGetRecordsWithPrefixKey() {
+final List> entries = new ArrayList<>();
+entries.add(new KeyValue<>(bytesKey("k1"), bytesValue("1")));
+entries.add(new KeyValue<>(bytesKey("k2"), bytesValue("2")));
+entries.add(new KeyValue<>(bytesKey("p2"), bytesValue("2")));
+entries.add(new KeyValue<>(bytesKey("p1"), bytesValue("2")));
+entries.add(new KeyValue<>(bytesKey("p0"), bytesValue("2")));
+store.putAll(entries);
+final KeyValueIterator keysWithPrefix = 
store.prefixScan("p", new StringSerializer());
+final List keys = new ArrayList<>();
+final List values = new ArrayList<>();
+int numberOfKeysReturned = 0;
+
+while (keysWithPrefix.hasNext()) {
+final KeyValue next = keysWithPrefix.next();
+keys.add(next.key.toString());
+values.add(new String(next.value));
+numberOfKeysReturned++;
+}
+assertThat(numberOfKeysReturned, is(3));
+assertThat(keys, is(Arrays.asList("p0", "p1", "p2")));
+assertThat(values, is(Arrays.asList("2", "2", "2")));
+}

Review comment:
   The following request is not a requirement to get the PR approved, but 
rather optional extra work to improve the code base. Could you rewrite this 
test with a mock for the inner state store as in `MeteredKeyValueStoreTest`?  

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -229,6 +230,15 @@ public V delete(final K key) {
 }
 }
 
+@Override
+public , P> KeyValueIterator 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+return new MeteredKeyValueIterator(
+wrapped().prefixScan(prefix, prefixKeySerializer),
+rangeSensor

Review comment:
   I am not sure whether we should use the `rangeSensor` here or introduce 
a new `prefixScanSensor`. I looked into the KIP discussion but could not find 
any reference to metrics. Either I missed it or we missed it in the KIP 
discussion. What do the reviewers of the KIP think @ableegoldman @vvcephei 
@guozhangwang?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
##
@@ -284,6 +284,12 @@ public boolean isEmpty() {
 return keySetIterator(cache.navigableKeySet().subSet(from, true, to, 
true), true);
 }
 
+synchronized Iterator keyRange(final Bytes from, final Bytes to, 
final boolean toInclusive) {
+if (toInclusive)
+keyRange(from, to);
+return keySetIterator(cache.navigableKeySet().subSet(from, true, to, 
false), true);
+}

Review comment:
   Why not simply:
   ```suggestion
   synchronized Iterator keyRange(final Bytes from, final Bytes to, 
final boolean toInclusive) {
   return keySetIterator(cache.navigableKeySet().subSet(from, true, to, 
toInclusive), true);
   }
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##
@@ -360,6 +361,115 @@ public void shouldPutAll() {
 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, 
"3");
 }
 
+@Test
+public void 

[jira] [Commented] (KAFKA-10746) Consumer poll timeout Expiration should be logged as WARNING not INFO.

2020-11-20 Thread Benedikt Linse (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236031#comment-17236031
 ] 

Benedikt Linse commented on KAFKA-10746:


Hi Luke, thanks for creating a Patch so quickly! This is very much appreciated. 
I looked at the commit, and it this would certainly help many users. 

> Consumer poll timeout Expiration should be logged as WARNING not INFO. 
> ---
>
> Key: KAFKA-10746
> URL: https://issues.apache.org/jira/browse/KAFKA-10746
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0, 2.6.0, 2.5.1
>Reporter: Benedikt Linse
>Assignee: Luke Chen
>Priority: Minor
>
> When a consumer does not poll regularly, and the `max.poll.interval.ms` 
> threshold is reached, the consumer leaves the consumer group, and the reason 
> is logged as an INFO message:
> [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356]
> [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016]
> Most Kafka users ignore INFO messages or have the log level set to WARN. 
> Still many users run into this issue, since their applications take too long 
> to process the polled records, and then the consumer fails to commit the 
> offsets, which leads to duplicate message processing. Not seeing the error 
> message in the first place means that users lose a lot of time debugging and 
> searching for the reason for duplicate message processing.
> Therefore it seems like the log level of this message should be increased to 
> WARN. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-20 Thread GitBox


cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527530444



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +885,77 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread makeThread(final long cacheSizePerThread, final int 
threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(threads.size() + 1);

Review comment:
   I think it would be cleaner to pass `cacheSizePerThread` to 
`resizeThreadCache()` instead of the number of stream threads. We would then 
just call `getCacheSizePerThread()` once instead of once in `addStreamThread()` 
and once in `resizeThreadCache()`. We would also just need to compute 
`threads.size() + 1` once.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +885,77 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread makeThread(final long cacheSizePerThread, final int 
threadIdx) {

Review comment:
   IMO, `createStreamThread()` would describe the behavior better.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could 

[GitHub] [kafka] ning2008wisc edited a comment on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-11-20 Thread GitBox


ning2008wisc edited a comment on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-731014605


   Thanks @mimaison  for your suggestive and insightful feedback. 
   
   Regarding to your 2 major concerns, I agree and I believe there exists a 
feasible solution, especially for a very lean SSL test. I will update this PR 
early next week by addressing all your major and minor comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-11-20 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-731014605


   Thanks @mimaison  for your suggestive and insightful feedback. 
   
   Regarding to your 2 major concerns, I agree and I believe there exists a 
feasible solution. I will update this PR early next week by addressing all your 
major and minor comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-11-20 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r527508609



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import static org.apache.kafka.connect.mirror.TestUtils.expectedRecords;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import org.apache.kafka.test.IntegrationTest;
+import kafka.server.KafkaConfig$;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertFalse;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Common Test functions for MM2 integration tests
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationBaseTest {
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
+
+protected static final int NUM_RECORDS_PER_PARTITION = 10;
+public static final int NUM_PARTITIONS = 10;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+protected static final int CHECKPOINT_DURATION_MS = 20_000;
+protected static final int RECORD_CONSUME_DURATION_MS = 20_000;
+protected static final int OFFSET_SYNC_DURATION_MS = 30_000;
+protected static final int NUM_WORKERS = 3;
+protected static final int CONSUMER_POLL_TIMEOUT_MS = 500;
+protected static final int BROKER_RESTART_TIMEOUT_MS = 10_000;
+protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
+protected static final String BACKUP_CLUSTER_ALIAS = "backup";
+
+protected Map mm2Props;
+protected MirrorMakerConfig mm2Config; 
+protected EmbeddedConnectCluster primary;
+protected EmbeddedConnectCluster backup;
+
+private final AtomicBoolean exited = new AtomicBoolean(false);
+private Properties primaryBrokerProps = new Properties();
+protected Properties backupBrokerProps = new Properties();
+private Map primaryWorkerProps = new HashMap<>();
+private Map backupWorkerProps = new HashMap<>();
+private Properties sslProps = new Properties();
+
+private void loadSslPropsFromBrokerConfig() {   

Review comment:
   > Could we move all the SSL bits into the SSL class?
   I believe yes
   
   > We have fields for the configurations. So we could set them accordingly