[GitHub] [kafka] dongjoon-hyun commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch

2022-04-05 Thread GitBox


dongjoon-hyun commented on PR #11995:
URL: https://github.com/apache/kafka/pull/11995#issuecomment-1089831593

   Thank you so much all!





[GitHub] [kafka] sciclon2 commented on pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


sciclon2 commented on PR #11842:
URL: https://github.com/apache/kafka/pull/11842#issuecomment-1089818537

   @junrao I committed the changes and ran the test, all good, thanks!
   ![Screenshot 2022-04-06 at 06 51 59](https://user-images.githubusercontent.com/74413315/161898272-80c31461-3a80-41df-84aa-beca8ee022ed.png)
   
   





[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


sciclon2 commented on code in PR #11842:
URL: https://github.com/apache/kafka/pull/11842#discussion_r843459090


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
     None
   }
 
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = {
+    val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+    var batchesBytes = 0
+    var batchesCounter = 0
+    while (lines.hasNext) {
+      if (batchesCounter >= limit){
+        return batchesBytes
+      }
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        line match {
+          case sizePattern(size) => batchesBytes += size.toInt
+          case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line")
+        }
+        batchesCounter += 1
+      }
+    }
+    return batchesBytes
+  }
+
+  private def countBatches(lines: util.ListIterator[String]): Int = {
+    var countBatches = 0
+    while (lines.hasNext) {
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        countBatches += 1
+      }
+    }
+    return countBatches

Review Comment:
   cool, thanks!






[jira] [Resolved] (KAFKA-6204) Interceptor and MetricsReporter should implement java.io.Closeable

2022-04-05 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté resolved KAFKA-6204.
--
Fix Version/s: 3.2.0
 Assignee: Xavier Léauté
   Resolution: Fixed

> Interceptor and MetricsReporter should implement java.io.Closeable
> --
>
> Key: KAFKA-6204
> URL: https://issues.apache.org/jira/browse/KAFKA-6204
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Charly Molter
>Assignee: Xavier Léauté
>Priority: Minor
> Fix For: 3.2.0
>
>
> The serializers and deserializers extend the Closeable interface, and even 
> ConsumerInterceptors and ProducerInterceptors implement it.
> ConsumerInterceptor, ProducerInterceptor and MetricsReporter do not extend 
> the Closeable interface.
> Maybe they should, for coherence with the rest of the APIs.





[GitHub] [kafka] vvcephei commented on pull request #11997: KAFKA-6204, KAFKA-7402: ProducerInterceptor should implement AutoCloseable

2022-04-05 Thread GitBox


vvcephei commented on PR #11997:
URL: https://github.com/apache/kafka/pull/11997#issuecomment-1089682577

   Merged! Thanks again, @xvrl . 
   
   About the non-green checks, Jenkins for some reason started a second test 
run with changes from another PR. I'm not sure what that's about, but all the 
tests passed on the change from _this_ PR (run 1) 
(https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11997/1/)





[GitHub] [kafka] vvcephei merged pull request #11997: KAFKA-6204, KAFKA-7402: ProducerInterceptor should implement AutoCloseable

2022-04-05 Thread GitBox


vvcephei merged PR #11997:
URL: https://github.com/apache/kafka/pull/11997





[GitHub] [kafka] ddrid commented on a diff in pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager

2022-04-05 Thread GitBox


ddrid commented on code in PR #11991:
URL: https://github.com/apache/kafka/pull/11991#discussion_r843398778


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -184,16 +184,22 @@ private void startSequencesAtBeginning(TopicPartition topicPartition, ProducerId
     // responses which are due to the retention period elapsing, and those which are due to actual lost data.
     private long lastAckedOffset;
 
+    private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
+        if (b1.baseSequence() < b2.baseSequence()) return -1;
+        else if (b1.baseSequence() > b2.baseSequence()) return 1;
+        else return b1.equals(b2) ? 0 : 1;

Review Comment:
   Hi @artemlivshits, thanks for your comment. I don't think it violates the 
requirements for the `compare` method, since we are comparing two batches using 
an integer.
   As for the stable order, I think it doesn't affect the current code, but I 
can fix this in another PR if you consider it necessary. What do you think?






[jira] [Commented] (KAFKA-13799) Improve documentation for Kafka zero-copy

2022-04-05 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517772#comment-17517772
 ] 

RivenSun commented on KAFKA-13799:
--

Hi [~guozhang]  [~dajac] , [~showuon]
Could you give some suggestions for this issue?
Thanks.

> Improve documentation for Kafka zero-copy
> -
>
> Key: KAFKA-13799
> URL: https://issues.apache.org/jira/browse/KAFKA-13799
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: RivenSun
>Priority: Major
>
> From the documentation (https://kafka.apache.org/documentation/#maximizingefficiency
> and [https://kafka.apache.org/documentation/#networklayer]) we can see that Kafka 
> combines the pagecache and zero-copy when reading messages from files on disk, 
> which greatly improves the consumption rate of messages.
> However, browsing the source code (start from the *FileRecords.writeTo(...)* method) shows:
> 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), where 
> the underlying sendfile call implements the zero-copy data transfer.
> 2. The logic of the SslTransportLayer.transferFrom() method is: 
> {code:java}
> fileChannel.read(fileChannelBuffer, pos) 
> -> 
> sslEngine.wrap(src, netWriteBuffer) 
> -> 
> flush(ByteBuffer buf) && socketChannel.write(buf){code}
> That is, first read the data on the disk or directly from the page cache, 
> then encrypt the data, and finally send the encrypted data to the network. 
> {*}FileChannel.transferTo() is not used in the whole process{*}.
>  
> Conclusion: 
> PlaintextTransportLayer and SslTransportLayer both use pagecache, but 
> SslTransportLayer does not implement zero-copy.
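For illustration only (this is not Kafka's FileRecords/TransportLayer code), the two transfer styles described above look roughly like this in plain NIO:
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

final class TransferStyles {
    // Plaintext-style path: transferTo lets the kernel copy file bytes straight
    // to the socket (sendfile), so the data never enters user space.
    static long zeroCopy(FileChannel file, SocketChannel socket,
                         long position, long count) throws IOException {
        return file.transferTo(position, count, socket);
    }

    // TLS-style path: read into a user-space buffer, transform (encrypt) it,
    // then write it out; sendfile cannot be used because the bytes change.
    static long readThenWrite(FileChannel file, SocketChannel socket,
                              long position, int count) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(count);
        file.read(buffer, position);
        buffer.flip();
        // sslEngine.wrap(...) would sit here in the real SslTransportLayer path.
        return socket.write(buffer);
    }
}
{code}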





[jira] [Commented] (KAFKA-13793) Add validators for serialization and deserialization related configuration

2022-04-05 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517770#comment-17517770
 ] 

RivenSun commented on KAFKA-13793:
--

Hi [~guozhang]  [~dajac] , [~showuon]
Could you give some suggestions for this issue?
Thanks.

> Add validators for serialization and deserialization related configuration
> --
>
> Key: KAFKA-13793
> URL: https://issues.apache.org/jira/browse/KAFKA-13793
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, config
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>
> These configurations of producer and consumer have the same problem.
> {code:java}
> key.serializer, value.serializer, key.deserializer, value.deserializer{code}
>  
> Take the `key.serializer` configuration as an example:
> {code:java}
> Map<String, Object> props = new HashMap<>(); 
> props.put("key.serializer", null);{code}
> One would expect this invalid configuration to be caught during the 
> startup of the KafkaProducer, but the actual startup result is:
> {code:java}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to 
> construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
>     at us.zoom.mq.server.adapter.kafka.ProducerTest.main(ProducerTest.java:139)
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
>     ... 3 more {code}
> A line of code threw a NullPointerException, causing KafkaProducer 
> initialization to fail.
> I think we should be able to catch this bad configuration during the 
> validation of all the configuration, i.e. while executing the 
> *ConfigDef.parseValue(ConfigKey key, Object value, boolean isSet) method*, and 
> throw a *ConfigException* instead of a NullPointerException.
> Solution:
> Add *NonNullValidator* to these configurations. 
> For example, when ProducerConfig defines `key.serializer` configuration, add 
> Validator:
> {code:java}
> .define(KEY_SERIALIZER_CLASS_CONFIG,
> Type.CLASS,
> ConfigDef.NO_DEFAULT_VALUE,
> new ConfigDef.NonNullValidator(),
> Importance.HIGH,
> KEY_SERIALIZER_CLASS_DOC) {code}
>  





[GitHub] [kafka] RivenSun2 commented on pull request #11985: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type`

2022-04-05 Thread GitBox


RivenSun2 commented on PR #11985:
URL: https://github.com/apache/kafka/pull/11985#issuecomment-1089641523

   Hi @showuon @mimaison
   could you help to review the PR?
   Thanks.





[GitHub] [kafka] samarthd commented on pull request #9794: Add a job to build on ARM64 at Amazon Graviton2 nodes

2022-04-05 Thread GitBox


samarthd commented on PR #9794:
URL: https://github.com/apache/kafka/pull/9794#issuecomment-1089560163

   @martin-g Thank you for this PR! I'd love to see this get merged. 
   
   @mimaison You recently closed the parent ticket 
(https://issues.apache.org/jira/browse/KAFKA-10759) referencing a closed PR but 
this PR is still open. Can you please reopen the ticket (or merge this PR)?
   





[GitHub] [kafka] mjsax closed pull request #10391: MINOR: disable flaky system test

2022-04-05 Thread GitBox


mjsax closed pull request #10391: MINOR: disable flaky system test
URL: https://github.com/apache/kafka/pull/10391





[jira] [Updated] (KAFKA-13130) Deprecate long based range queries in SessionStore

2022-04-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13130:
--
Fix Version/s: (was: 3.2.0)

> Deprecate long based range queries in SessionStore
> --
>
> Key: KAFKA-13130
> URL: https://issues.apache.org/jira/browse/KAFKA-13130
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>  Labels: needs-kip, newbie, newbie++
>
> Migrate long based queries in ReadOnlySessionStore (fetchSession*, 
> findSession*, etc.) to object based interfaces. Deprecate old long based 
> interface, similar to how it was done for the ReadOnlyWindowStore.
> Related KIPs:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times]
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-617%3A+Allow+Kafka+Streams+State+Stores+to+be+iterated+backwards]
>  
> Related Jiras:
> https://issues.apache.org/jira/browse/KAFKA-12419
> https://issues.apache.org/jira/browse/KAFKA-12526
> https://issues.apache.org/jira/browse/KAFKA-12451
>  
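For comparison, the ReadOnlyWindowStore precedent from KIP-358 that this ticket wants SessionStore to follow, sketched with an assumed ReadOnlyWindowStore<String, Long> store handle:
{code:java}
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

final class WindowQuerySketch {
    static void query(ReadOnlyWindowStore<String, Long> store) {
        Instant to = Instant.now();
        Instant from = to.minus(Duration.ofHours(1));

        // Old style (deprecated by KIP-358): raw epoch-millis longs.
        // store.fetch("key", from.toEpochMilli(), to.toEpochMilli());

        // New style: Instant-based overloads.
        try (WindowStoreIterator<Long> iter = store.fetch("key", from, to)) {
            iter.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
        }
    }
}
{code}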





[jira] [Commented] (KAFKA-13130) Deprecate long based range queries in SessionStore

2022-04-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517724#comment-17517724
 ] 

Bruno Cadonna commented on KAFKA-13130:
---

Removing from the 3.2.0 release since feature freeze has passed.

> Deprecate long based range queries in SessionStore
> --
>
> Key: KAFKA-13130
> URL: https://issues.apache.org/jira/browse/KAFKA-13130
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>  Labels: needs-kip, newbie, newbie++
> Fix For: 3.2.0
>
>
> Migrate long based queries in ReadOnlySessionStore (fetchSession*, 
> findSession*, etc.) to object based interfaces. Deprecate old long based 
> interface, similar to how it was done for the ReadOnlyWindowStore.
> Related KIPs:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times]
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-617%3A+Allow+Kafka+Streams+State+Stores+to+be+iterated+backwards]
>  
> Related Jiras:
> https://issues.apache.org/jira/browse/KAFKA-12419
> https://issues.apache.org/jira/browse/KAFKA-12526
> https://issues.apache.org/jira/browse/KAFKA-12451
>  





[jira] [Commented] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata

2022-04-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517723#comment-17517723
 ] 

Bruno Cadonna commented on KAFKA-12781:
---

Removing from the 3.2.0 release since feature freeze has passed.

> Improve the endOffsets accuracy in TaskMetadata 
> 
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Priority: Minor
> Fix For: 3.2.0
>
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the 
> main consumer in streams. It should be possible to get the highest offset in 
> the topic via the consumer instead.





[jira] [Updated] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata

2022-04-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-12781:
--
Fix Version/s: (was: 3.2.0)

> Improve the endOffsets accuracy in TaskMetadata 
> 
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Priority: Minor
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the 
> main consumer in streams. It should be possible to get the highest offset in 
> the topic via the consumer instead.





[GitHub] [kafka] artemlivshits commented on a diff in pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager

2022-04-05 Thread GitBox


artemlivshits commented on code in PR #11991:
URL: https://github.com/apache/kafka/pull/11991#discussion_r843285049


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -184,16 +184,22 @@ private void startSequencesAtBeginning(TopicPartition topicPartition, ProducerId
     // responses which are due to the retention period elapsing, and those which are due to actual lost data.
     private long lastAckedOffset;
 
+    private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
+        if (b1.baseSequence() < b2.baseSequence()) return -1;
+        else if (b1.baseSequence() > b2.baseSequence()) return 1;
+        else return b1.equals(b2) ? 0 : 1;

Review Comment:
   Wouldn't this violate the requirements for the `compare` method?
   
   >The implementor must ensure that sgn(compare(x, y)) == -sgn(compare(y, x)) 
for all x and y. (This implies that compare(x, y) must throw an exception if 
and only if compare(y, x) throws an exception.)
   
   > The implementor must also ensure that the relation is transitive: 
((compare(x, y)>0) && (compare(y, z)>0)) implies compare(x, z)>0.
   
   Objects that are not equal need to have a stable order; otherwise, binary 
search may not find the objects.
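   As an illustration only (not the eventual fix in the PR), a comparator that orders by base sequence and breaks ties on a unique, stable field keeps the contract intact; the `Batch` class and its `id` field here are hypothetical stand-ins, not Kafka's ProducerBatch:
{code:java}
import java.util.Comparator;

// Hypothetical stand-in for a producer batch; `id` is unique and never changes.
final class Batch {
    final int baseSequence;
    final long id;

    Batch(int baseSequence, long id) {
        this.baseSequence = baseSequence;
        this.id = id;
    }
}

final class BatchOrder {
    // Orders by base sequence, breaking ties on the unique id, so that
    // sgn(compare(x, y)) == -sgn(compare(y, x)) holds, the relation is
    // transitive, and distinct batches never compare as equal.
    static final Comparator<Batch> BY_SEQUENCE =
        Comparator.comparingInt((Batch b) -> b.baseSequence)
                  .thenComparingLong(b -> b.id);
}
{code}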






[GitHub] [kafka] jeqo commented on pull request #10390: KAFKA-12536: Add Instant-based methods to ReadOnlySessionStore

2022-04-05 Thread GitBox


jeqo commented on PR #10390:
URL: https://github.com/apache/kafka/pull/10390#issuecomment-1089370471

   Thanks @dotjdk ! 
   Do you mean 
https://github.com/apache/kafka/blob/8f8f914efc2dfb2a6638553038dcb17393d7de96/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java#L92
 specifically? If so, I agree. Check whether this PR, which I've opened for 
review, fixes the issue: https://github.com/apache/kafka/pull/11999. 





[GitHub] [kafka] jeqo opened a new pull request, #11999: [MINOR] fix(streams): align variable names

2022-04-05 Thread GitBox


jeqo opened a new pull request, #11999:
URL: https://github.com/apache/kafka/pull/11999

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] junrao commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


junrao commented on code in PR #11842:
URL: https://github.com/apache/kafka/pull/11842#discussion_r843226602


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
     None
   }
 
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = {
+    val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+    var batchesBytes = 0
+    var batchesCounter = 0
+    while (lines.hasNext) {
+      if (batchesCounter >= limit){
+        return batchesBytes
+      }
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        line match {
+          case sizePattern(size) => batchesBytes += size.toInt
+          case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line")
+        }
+        batchesCounter += 1
+      }
+    }
+    return batchesBytes

Review Comment:
   I just meant that in Scala you can just write `batchesBytes` instead of 
`return batchesBytes` in the last statement.
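   For readers less familiar with Scala, here is a minimal sketch (not the PR's code) of the idiom being pointed out: the last expression of a method body is its result, so an explicit `return` is redundant.
{code:scala}
// The last expression of the block is returned implicitly.
def totalSize(sizes: Seq[Int]): Int = {
  var total = 0
  for (s <- sizes) total += s
  total // equivalent to `return total`, which Scala style avoids
}
{code}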






[GitHub] [kafka] junrao commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


junrao commented on code in PR #11842:
URL: https://github.com/apache/kafka/pull/11842#discussion_r843225969


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
     None
   }
 
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = {
+    val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+    var batchesBytes = 0
+    var batchesCounter = 0
+    while (lines.hasNext) {
+      if (batchesCounter >= limit){
+        return batchesBytes
+      }
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        line match {
+          case sizePattern(size) => batchesBytes += size.toInt
+          case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line")
+        }
+        batchesCounter += 1
+      }
+    }
+    return batchesBytes
+  }
+
+  private def countBatches(lines: util.ListIterator[String]): Int = {
+    var countBatches = 0
+    while (lines.hasNext) {
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        countBatches += 1
+      }
+    }
+    return countBatches

Review Comment:
   I just meant that in Scala you can just write `countBatches` instead of 
`return countBatches` in the last statement.






[GitHub] [kafka] jlprat commented on pull request #11432: KAFKA-13399 towards scala3

2022-04-05 Thread GitBox


jlprat commented on PR #11432:
URL: https://github.com/apache/kafka/pull/11432#issuecomment-1089271503

   I'll resolve the conflicts tomorrow and ping people for reviews





[GitHub] [kafka] jlprat commented on pull request #11350: Scala3 migration

2022-04-05 Thread GitBox


jlprat commented on PR #11350:
URL: https://github.com/apache/kafka/pull/11350#issuecomment-1089268807

   @jvican some intermediate work is here: 
https://github.com/apache/kafka/pull/11432
   
   I'd need to resolve a couple of conflicts as the PR got no traction for a 
while. But I'm happy to do it!
   
   I think the migration to Scala 3 might happen when Kafka 4.0.0 is closer 
in time.





[GitHub] [kafka] jvican commented on pull request #11350: Scala3 migration

2022-04-05 Thread GitBox


jvican commented on PR #11350:
URL: https://github.com/apache/kafka/pull/11350#issuecomment-1089258871

   Any plans to prioritize the Scala 3 migration? Would be nice to see this 
happening.





[GitHub] [kafka] mimaison merged pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-04-05 Thread GitBox


mimaison merged PR #11974:
URL: https://github.com/apache/kafka/pull/11974





[jira] [Updated] (KAFKA-13801) Kafka server does not respect MetricsReporter interface contract for dynamically configured reporters

2022-04-05 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté updated KAFKA-13801:
--
Component/s: metrics

> Kafka server does not respect MetricsReporter interface contract for 
> dynamically configured reporters
> -
>
> Key: KAFKA-13801
> URL: https://issues.apache.org/jira/browse/KAFKA-13801
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Reporter: Xavier Léauté
>Priority: Minor
>
> MetricsReporter.contextChange contract states the method should always
> be called first before MetricsReporter.init is called. This is done
> correctly for reporters enabled by default (e.g. JmxReporter) but not
> for metrics reporters configured dynamically





[GitHub] [kafka] xvrl opened a new pull request, #11998: KAFKA-13801 Kafka server does not respect MetricsReporter contract for dynamically configured reporters

2022-04-05 Thread GitBox


xvrl opened a new pull request, #11998:
URL: https://github.com/apache/kafka/pull/11998

   MetricsReporter.contextChange contract states the method should always
   be called first before MetricsReporter.init is called. This is done
   correctly for reporters enabled by default (e.g. JmxReporter) but not
   for metrics reporters configured dynamically.
   
   This fixes the call ordering for dynamically configured metrics
   reporter and updates tests to enforce ordering.
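   A minimal sketch of the call order the contract requires when a reporter is wired in, whether it was configured statically or dynamically; the surrounding class, method and variable names are illustrative, not the patch itself:
{code:java}
import java.util.List;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;

// Illustrative wiring helper, not the actual broker code.
final class ReporterWiring {
    static void register(MetricsReporter reporter,
                         MetricsContext context,
                         List<KafkaMetric> currentMetrics) {
        reporter.contextChange(context); // must happen first
        reporter.init(currentMetrics);   // only after the context is set
    }
}
{code}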





[jira] [Created] (KAFKA-13801) Kafka server does not respect MetricsReporter interface contract for dynamically configured reporters

2022-04-05 Thread Jira
Xavier Léauté created KAFKA-13801:
-

 Summary: Kafka server does not respect MetricsReporter interface 
contract for dynamically configured reporters
 Key: KAFKA-13801
 URL: https://issues.apache.org/jira/browse/KAFKA-13801
 Project: Kafka
  Issue Type: Bug
Reporter: Xavier Léauté


MetricsReporter.contextChange contract states the method should always
be called first before MetricsReporter.init is called. This is done
correctly for reporters enabled by default (e.g. JmxReporter) but not
for metrics reporters configured dynamically





[GitHub] [kafka] xvrl opened a new pull request, #11997: KAFKA-6204 KAFKA-7402 ProducerInterceptor should implement AutoCloseable

2022-04-05 Thread GitBox


xvrl opened a new pull request, #11997:
URL: https://github.com/apache/kafka/pull/11997

   As part of KIP-376 we had ConsumerInterceptor implement AutoCloseable
   but forgot to do the same for ProducerInterceptor. This fixes the
   inconsistency and also fixes KAFKA-6204 at the same time.





[jira] [Commented] (KAFKA-6204) Interceptor and MetricsReporter should implement java.io.Closeable

2022-04-05 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517651#comment-17517651
 ] 

Xavier Léauté commented on KAFKA-6204:
--

this was mostly fixed by KIP-376 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-376%3A+Implement+AutoClosable+on+appropriate+classes+that+want+to+be+used+in+a+try-with-resource+statement

it looks like ProducerInterceptor might have been missed, so it should be 
fairly uncontroversial to add it.

> Interceptor and MetricsReporter should implement java.io.Closeable
> --
>
> Key: KAFKA-6204
> URL: https://issues.apache.org/jira/browse/KAFKA-6204
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Charly Molter
>Priority: Minor
>
> The serializers and deserializers extend the Closeable interface, and even 
> ConsumerInterceptors and ProducerInterceptors implement it.
> ConsumerInterceptor, ProducerInterceptor and MetricsReporter do not extend 
> the Closeable interface.
> Maybe they should, for coherence with the rest of the APIs.





[GitHub] [kafka] lihaosky commented on a diff in pull request #11896: KAFKA-13785: [Emit final][5/N] emit final for TimeWindowedKStreamImpl

2022-04-05 Thread GitBox


lihaosky commented on code in PR #11896:
URL: https://github.com/apache/kafka/pull/11896#discussion_r843123337


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java:
##
@@ -232,11 +247,19 @@
                 );
                 break;
             case ROCKS_DB:
-                supplier = Stores.persistentTimestampedWindowStore(
-                    materialized.storeName(),
-                    Duration.ofMillis(retentionPeriod),
-                    Duration.ofMillis(windows.size()),
-                    false
+                supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ?
+                    RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                        materialized.storeName(),

Review Comment:
   @guozhangwang, should we give this another name to avoid overriding the 
existing emit-eager store?






[jira] [Assigned] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896

2022-04-05 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li reassigned KAFKA-13800:
--

Assignee: Hao Li

> Remove force cast of TimeWindowKStreamImpl in tests of 
> https://github.com/apache/kafka/pull/11896
> -
>
> Key: KAFKA-13800
> URL: https://issues.apache.org/jira/browse/KAFKA-13800
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> We can remove the cast after `emitStrategy` is added to public api





[jira] [Updated] (KAFKA-13797) Adding metric to indicate metadata response outgoing bytes rate

2022-04-05 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-13797:
---
Description: 
It's not a common case, but we experienced the following problem in one of our 
clusters.

The use case involves dynamically creating and deleting topics in the cluster, 
and the clients were constantly using the special type of Metadata requests 
whose topics field is null in order to retrieve all topics before checking a 
topic's existence.

A high rate of such Metadata requests generated a heavy load on brokers in the 
cluster. 

Yet, currently, there is no metric to indicate the metadata response outgoing 
bytes rate.

We propose to add such a metric in order to make the troubleshooting of such 
cases easier.

  was:
It's not a common case, but we experienced the following problem in one of our 
clusters.

The use case involves dynamically creating and deleting topics in the cluster, 
and the clients were constantly checking if a topic exists in a cluster using 
the special type of Metadata requests whose topics field is null in order to 
retrieve all topics before checking a topic's existence.

A high rate of such Metadata requests generated a heavy load on brokers in the 
cluster. 

Yet, currently, there is no metric to indicate the metadata response outgoing 
bytes rate.

We propose to add such a metric in order to make the troubleshooting of such 
cases easier.


> Adding metric to indicate metadata response outgoing bytes rate
> ---
>
> Key: KAFKA-13797
> URL: https://issues.apache.org/jira/browse/KAFKA-13797
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Priority: Minor
>
> It's not a common case, but we experienced the following problem in one of 
> our clusters.
> The use case involves dynamically creating and deleting topics in the 
> cluster, and the clients were constantly using the special type of Metadata 
> requests whose topics field is null in order to retrieve all topics before 
> checking a topic's existence.
> A high rate of such Metadata requests generated a heavy load on brokers in 
> the cluster. 
> Yet, currently, there is no metric to indicate the metadata response outgoing 
> bytes rate.
> We propose to add such a metric in order to make the troubleshooting of such 
> cases easier.





[jira] [Created] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896

2022-04-05 Thread Hao Li (Jira)
Hao Li created KAFKA-13800:
--

 Summary: Remove force cast of TimeWindowKStreamImpl in tests of 
https://github.com/apache/kafka/pull/11896
 Key: KAFKA-13800
 URL: https://issues.apache.org/jira/browse/KAFKA-13800
 Project: Kafka
  Issue Type: Improvement
Reporter: Hao Li


We can remove the cast after `emitStrategy` is added to public api





[jira] [Updated] (KAFKA-13794) Producer batch lost silently in TransactionManager

2022-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13794:

Fix Version/s: 3.2.0

> Producer batch lost silently in TransactionManager
> --
>
> Key: KAFKA-13794
> URL: https://issues.apache.org/jira/browse/KAFKA-13794
> Project: Kafka
>  Issue Type: Bug
>Reporter: xuexiaoyue
>Priority: Major
> Fix For: 3.2.0, 3.1.1, 3.0.2
>
>
> When idempotence is enabled, if a batch reaches its request.timeout.ms but 
> has not yet reached delivery.timeout.ms, it will be retried and wait for 
> another request.timeout.ms. During this interval, delivery.timeout.ms may be 
> reached, and the Sender will remove this in-flight batch and bump the 
> producer epoch because of the unresolved sequence; the sequence of this 
> partition will then be reset to 0.
> At this point, if a new batch is sent to the same partition and the former 
> batch reaches request.timeout.ms again, we will see an exception thrown by 
> NetworkClient:
> {code:java}
> [ERROR] [kafka-producer-network-thread | producer-1] 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Uncaught error in request completion:
>  java.lang.IllegalStateException: We are re-enqueueing a batch which is not 
> tracked as part of the in flight requests. batch.topicPartition: 
> txn_test_1648891362900-2; batch.baseSequence: 0
>    at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102] {code}
> The cause of this is that entries in inflightBatchesBySequence in 
> TransactionManager are not removed correctly: one batch may be removed by 
> another batch with the same sequence number.
> The potential consequence I can think of is that send progress will be 
> blocked until the latter batch is expired by delivery.timeout.ms
>  
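To see why a structure keyed only on the base sequence can drop the wrong entry, here is a toy demonstration (a hypothetical Batch class, not Kafka's ProducerBatch or TransactionManager internals):
{code:java}
import java.util.Comparator;
import java.util.TreeSet;

public class DuplicateSequenceDemo {
    static final class Batch {
        final String name;
        final int baseSequence;
        Batch(String name, int baseSequence) { this.name = name; this.baseSequence = baseSequence; }
        @Override public String toString() { return name; }
    }

    public static void main(String[] args) {
        // Ordered *only* by baseSequence: two distinct batches with the same
        // sequence are indistinguishable to the set.
        TreeSet<Batch> inflight =
            new TreeSet<>(Comparator.comparingInt((Batch b) -> b.baseSequence));

        Batch first = new Batch("first", 0);   // e.g. old batch after the sequence was reset to 0
        Batch second = new Batch("second", 0); // new batch that also starts at sequence 0

        inflight.add(first);
        inflight.add(second);    // returns false: compares equal to `first`, so it is never tracked
        inflight.remove(second); // removes `first`, the element that compares equal

        System.out.println(inflight); // prints []; both batches are effectively lost
    }
}
{code}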





[GitHub] [kafka] hachikuji commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch

2022-04-05 Thread GitBox


hachikuji commented on PR #11995:
URL: https://github.com/apache/kafka/pull/11995#issuecomment-1089065258

   @tombentley This is merged to 3.1 along with #11991. Thanks for managing the 
release by the way!





[jira] [Resolved] (KAFKA-13782) Producer may fail to add the correct partition to transaction

2022-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13782.
-
Resolution: Fixed

> Producer may fail to add the correct partition to transaction
> -
>
> Key: KAFKA-13782
> URL: https://issues.apache.org/jira/browse/KAFKA-13782
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.2.0, 3.1.1
>
>
> In KAFKA-13412, we changed the logic to add partitions to transactions in the 
> producer. The intention was to ensure that the partition is added in 
> `TransactionManager` before the record is appended to the 
> `RecordAccumulator`. However, this does not take into account the possibility 
> that the originally selected partition may be changed if `abortForNewBatch` 
> is set in `RecordAppendResult` in the call to `RecordAccumulator.append`. 
> When this happens, the partitioner can choose a different partition, which 
> means that the `TransactionManager` would be tracking the wrong partition.
> I think the consequence of this is that the batches sent to this partition 
> would get stuck in the `RecordAccumulator` until they timed out because we 
> validate before sending that the partition has been added correctly to the 
> transaction.
> Note that KAFKA-13412 has not been included in any release, so there are no 
> affected versions.
> Thanks to [~alivshits] for identifying the bug.





[GitHub] [kafka] ijuma commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

2022-04-05 Thread GitBox


ijuma commented on PR #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1089061398

   Is it worth backporting this to release branches?





[jira] [Resolved] (KAFKA-13794) Producer batch lost silently in TransactionManager

2022-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13794.
-
Fix Version/s: 3.1.1
   3.0.2
   Resolution: Fixed

> Producer batch lost silently in TransactionManager
> --
>
> Key: KAFKA-13794
> URL: https://issues.apache.org/jira/browse/KAFKA-13794
> Project: Kafka
>  Issue Type: Bug
>Reporter: xuexiaoyue
>Priority: Major
> Fix For: 3.1.1, 3.0.2
>
>
> When idempotence is enabled, if a batch reaches its request.timeout.ms but 
> has not yet reached delivery.timeout.ms, it will be retried and wait for 
> another request.timeout.ms. During this interval, delivery.timeout.ms may be 
> reached, and the Sender will remove this in-flight batch and bump the 
> producer epoch because of the unresolved sequence; the sequence of this 
> partition will then be reset to 0.
> At this point, if a new batch is sent to the same partition and the former 
> batch reaches request.timeout.ms again, we will see an exception thrown by 
> NetworkClient:
> {code:java}
> [ERROR] [kafka-producer-network-thread | producer-1] 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Uncaught error in request completion:
>  java.lang.IllegalStateException: We are re-enqueueing a batch which is not 
> tracked as part of the in flight requests. batch.topicPartition: 
> txn_test_1648891362900-2; batch.baseSequence: 0
>    at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102] {code}
> The cause of this is that entries in inflightBatchesBySequence in 
> TransactionManager are not removed correctly: one batch may be removed by 
> another batch with the same sequence number.
> The potential consequence I can think of is that send progress will be 
> blocked until the latter batch is expired by delivery.timeout.ms
>  





[GitHub] [kafka] hachikuji merged pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager

2022-04-05 Thread GitBox


hachikuji merged PR #11991:
URL: https://github.com/apache/kafka/pull/11991





[jira] [Updated] (KAFKA-13778) Fetch from follower should never run the preferred read replica selection

2022-04-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13778:

Affects Version/s: 2.6.0
   (was: 3.2.0)

> Fetch from follower should never run the preferred read replica selection
> -
>
> Key: KAFKA-13778
> URL: https://issues.apache.org/jira/browse/KAFKA-13778
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: zhaobo
>Assignee: zhaobo
>Priority: Minor
> Fix For: 3.3.0
>
>
> The design purpose of the code is that only the leader broker can determine 
> the preferred read-replica.
>  
> {code:java}
> readFromLocalLog()
> 
> // If we are the leader, determine the preferred read-replica
> val preferredReadReplica = clientMetadata.flatMap(
>   metadata => findPreferredReadReplica(partition, metadata, replicaId, 
> fetchInfo.fetchOffset, fetchTimeMs)) {code}
>  
> But in fact, since the broker does not check whether it is the leader, a 
> follower will also execute the preferred read-replica selection.
> {code:java}
> partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
>   // Don't look up preferred for follower fetches via normal replication and
>   if (Request.isValidBrokerId(replicaId))
> None
>   else { {code}





[jira] [Resolved] (KAFKA-13778) Fetch from follower should never run the preferred read replica selection

2022-04-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13778.
-
Fix Version/s: 3.3.0
 Reviewer: David Jacot
   Resolution: Fixed

> Fetch from follower should never run the preferred read replica selection
> -
>
> Key: KAFKA-13778
> URL: https://issues.apache.org/jira/browse/KAFKA-13778
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.2.0
>Reporter: zhaobo
>Assignee: zhaobo
>Priority: Minor
> Fix For: 3.3.0
>
>
> The design purpose of the code is that only the leader broker can determine 
> the preferred read-replica.
>  
> {code:java}
> readFromLocalLog()
> 
> // If we are the leader, determine the preferred read-replica
> val preferredReadReplica = clientMetadata.flatMap(
>   metadata => findPreferredReadReplica(partition, metadata, replicaId, 
> fetchInfo.fetchOffset, fetchTimeMs)) {code}
>  
> But in fact, since the broker does not check whether it is the leader, a 
> follower will also execute the preferred read-replica selection.
> {code:java}
> partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
>   // Don't look up preferred for follower fetches via normal replication and
>   if (Request.isValidBrokerId(replicaId))
> None
>   else { {code}





[GitHub] [kafka] dajac merged pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

2022-04-05 Thread GitBox


dajac merged PR #11965:
URL: https://github.com/apache/kafka/pull/11965





[GitHub] [kafka] hachikuji merged pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch

2022-04-05 Thread GitBox


hachikuji merged PR #11995:
URL: https://github.com/apache/kafka/pull/11995





[GitHub] [kafka] ijuma commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch

2022-04-05 Thread GitBox


ijuma commented on PR #11995:
URL: https://github.com/apache/kafka/pull/11995#issuecomment-1088982164

   In addition to this, we will also include 
https://github.com/apache/kafka/pull/11991. @hachikuji should be able to merge 
both this morning, I believe.





[GitHub] [kafka] dongjoon-hyun commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch

2022-04-05 Thread GitBox


dongjoon-hyun commented on PR #11995:
URL: https://github.com/apache/kafka/pull/11995#issuecomment-1088909166

   Hi, @ijuma . According to your dev mailing list email, is this the last 
blocker issue before Apache Kafka 3.1.1 RC?





[jira] [Updated] (KAFKA-12699) Streams no longer overrides the java default uncaught exception handler

2022-04-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-12699:
--
Fix Version/s: (was: 3.2.0)

> Streams no longer overrides the java default uncaught exception handler  
> -
>
> Key: KAFKA-12699
> URL: https://issues.apache.org/jira/browse/KAFKA-12699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Minor
>
> If a user used `Thread.setUncaughtExceptionHandler()` to set the handler for 
> all threads in the runtime, streams would override that with its own handler. 
> However, since streams does not use the `Thread` handler anymore, it will no 
> longer do so. This can cause problems if the user does something like 
> `System.exit(1)` in the handler. 
>  
> If the old handler is used in streams, it will still work as it used to.
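For reference, a minimal sketch of the Streams-level handler that replaced reliance on the Thread-level handler; the topic names and properties below are placeholders, not a working application:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class HandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "handler-sketch");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("default.key.serde", Serdes.String().getClass());
        props.put("default.value.serde", Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.setUncaughtExceptionHandler(exception ->
            // log/handle the exception here instead of relying on a Thread handler
            StreamThreadExceptionResponse.REPLACE_THREAD);
        streams.start();
    }
}
{code}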





[jira] [Commented] (KAFKA-12699) Streams no longer overrides the java default uncaught exception handler

2022-04-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517525#comment-17517525
 ] 

Bruno Cadonna commented on KAFKA-12699:
---

Removing from the 3.2.0 release since code freeze has passed.

> Streams no longer overrides the java default uncaught exception handler  
> -
>
> Key: KAFKA-12699
> URL: https://issues.apache.org/jira/browse/KAFKA-12699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Minor
> Fix For: 3.2.0
>
>
> If a user used `Thread.setUncaughtExceptionHandler()` to set the handler for 
> all threads in the runtime, streams would override that with its own handler. 
> However, since streams does not use the `Thread` handler anymore, it will no 
> longer do so. This can cause problems if the user does something like 
> `System.exit(1)` in the handler. 
>  
> If the old handler is used in streams, it will still work as it used to.





[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2022-04-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517524#comment-17517524
 ] 

Bruno Cadonna commented on KAFKA-12774:
---

Removing from the 3.2.0 release since code freeze has passed.

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
> Fix For: 3.2.0
>
>
> When exceptions are handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> 
> 
>  stacktraceAsString="true" properties="true">
>  value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> 
> 
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2022-04-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-12774:
--
Fix Version/s: (was: 3.2.0)

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
>
> When exceptions are handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> 
> 
>  stacktraceAsString="true" properties="true">
>  value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> 
> 
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2022-04-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517523#comment-17517523
 ] 

Bruno Cadonna commented on KAFKA-12774:
---

[~wcarlson5] [~ableegoldman] [~jorgenringen] since we cannot reproduce the 
issue and nobody else has reported this issue since the ticket was opened, 
should we close it as "not reproducible"?

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
> Fix For: 3.2.0
>
>
> When exceptions are handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> 
> 
>  stacktraceAsString="true" properties="true">
>  value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> 
> 
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] C0urante commented on pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-04-05 Thread GitBox


C0urante commented on PR #11974:
URL: https://github.com/apache/kafka/pull/11974#issuecomment-105947

   Thanks @mimaison. I hope you don't mind but I've pushed one more change that 
simplifies how the configured set of connectors and tasks is simulated (this 
came in handy when designing tests for one of the scenarios described in 
[KAFKA-13764](https://issues.apache.org/jira/browse/KAFKA-13764)).
   
   It should improve readability by removing the existing `clusterConfigState` 
methods (and the `fillMap` method that was introduced in this PR to clean those 
up) and making changes to the set of configured connectors and tasks more 
explicit. However, if this is too much then let me know and I can revert the 
commit from this PR and add it in a follow-up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-04-05 Thread GitBox


C0urante commented on code in PR #11974:
URL: https://github.com/apache/kafka/pull/11974#discussion_r842931708


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -76,23 +83,21 @@
 
 @Parameters
 public static Iterable mode() {
-return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, 
CONNECT_PROTOCOL_V2}});
+return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, 
{CONNECT_PROTOCOL_V2}});

Review Comment:
   Blegh, thanks. I'll simplify it anyway; I'd rather leave `trunk` in a healthy 
place and not risk FUD from someone else copy+pasting this style.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -1170,78 +983,128 @@ public void 
testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted()
 
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
 // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(2, 8, 0, 0, "worker1");
-
-//delete connector1
+performStandardRebalance();
+assertDelay(0);
+assertWorkers("worker1");
+assertConnectorAllocations(2);
+assertTaskAllocations(8);
+assertBalancedAndCompleteAllocation();
+
+// Delete connector1
 configState = clusterConfigState(offset, 2, 1, 4);
 when(coordinator.configSnapshot()).thenReturn(configState);
 
 // Second assignment with a second worker with duplicate assignment 
joining and the duplicated assignment is deleted at the same time
-applyAssignments(returnedAssignments);
-memberConfigs = memberConfigs(leader, offset, assignments);
-ExtendedAssignment duplicatedWorkerAssignment = 
newExpandableAssignment();
-duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2));
-duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 
4));
-memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, duplicatedWorkerAssignment));
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(0, 0, 2, 8, "worker1", "worker2");
+addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 
4));
+performStandardRebalance();
+assertDelay(0);
+assertWorkers("worker1", "worker2");
+assertConnectorAllocations(0, 1);
+assertTaskAllocations(0, 4);
 
 // Third assignment after revocations
-applyAssignments(returnedAssignments);
-memberConfigs = memberConfigs(leader, offset, assignments);
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(0, 0, 0, 2, "worker1", "worker2");
+performStandardRebalance();
+assertDelay(0);
+assertConnectorAllocations(0, 1);
+assertTaskAllocations(0, 2);
 
 // fourth rebalance after revocations
-applyAssignments(returnedAssignments);
-memberConfigs = memberConfigs(leader, offset, assignments);
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(0, 2, 0, 0, "worker1", "worker2");
+performStandardRebalance();
+assertDelay(0);
+assertConnectorAllocations(0, 1);
+assertTaskAllocations(2, 2);
+assertBalancedAndCompleteAllocation();
 
 // Fifth rebalance should not change assignments
-

[GitHub] [kafka] mimaison commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-04-05 Thread GitBox


mimaison commented on PR #11748:
URL: https://github.com/apache/kafka/pull/11748#issuecomment-1088831443

   @tombentley @showuon Can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-04-05 Thread GitBox


mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r842896247


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review Comment:
   Thanks @viktorsomogyi that's useful to know!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-04-05 Thread GitBox


viktorsomogyi commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r842854543


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review Comment:
   @mimaison our fix is pretty much the same but a bit simpler (it uses -1 
instead of Optional). I also think no one would rely on that weird behavior, so 
I'm in favor of removing that functionality.
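
   For reference, a self-contained sketch of the Optional-flattening idiom used in the diff above; the String payloads stand in for the actual MirrorMaker checkpoint type:

   ```java
   import java.util.Optional;
   import java.util.stream.Stream;

   public class OptionalFlattenExample {
       public static void main(String[] args) {
           // One entry per consumer-group partition; empty means no offset-sync is known for it.
           Stream<Optional<String>> maybeCheckpoints = Stream.of(
                   Optional.of("topic-0 -> checkpoint@42"),
                   Optional.<String>empty(),
                   Optional.of("topic-1 -> checkpoint@7"));

           maybeCheckpoints
                   // pre-Java 9 equivalent of .flatMap(Optional::stream): drops the empty entries
                   .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
                   .forEach(System.out::println); // only partitions with offset-syncs are emitted
       }
   }
   ```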



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager

2022-04-05 Thread GitBox


ijuma commented on PR #11991:
URL: https://github.com/apache/kafka/pull/11991#issuecomment-1088659695

   Thanks, looks like it's been there since 2.4.0 then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-13799) Improve documentation for Kafka zero-copy

2022-04-05 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517382#comment-17517382
 ] 

RivenSun edited comment on KAFKA-13799 at 4/5/22 12:37 PM:
---

Hi [~showuon] [~dengziming] and [~guozhang] 
I think we can state in Kafka's documentation that when Kafka reads messages 
from files on disk, it combines the pagecache and zero-copy to greatly improve 
message consumption efficiency, but that zero-copy only works in 
PlaintextTransportLayer. Maybe we can add this statement in the sendfile section 
of the documentation below.
[https://kafka.apache.org/documentation/#maximizingefficiency]


This combination of pagecache and sendfile means that on a Kafka cluster where 
the consumers are mostly caught up you will see no read activity on the disks 
whatsoever as they will be serving data entirely from cache. *When the 
transport layer uses the SSL protocol, sendfile will not be used due to the 
need to encrypt the data read.*

WDYT?
Thanks.


was (Author: rivensun):
Hi [~showuon] [~dengziming] and [~guozhang] 
I think we can state in Kafka's documentation that when Kafka reads disk file 
messages, Kafka combines pagecache and zero-copy to greatly improve message 
consumption efficiency. But zero-copy only works in PlaintextTransportLayer. 
Maybe add this declaration in the sendfile module section of the documentation 
below.
[https://kafka.apache.org/documentation/#maximizingefficiency]

WDYT?
Thanks.

> Improve documentation for Kafka zero-copy
> -
>
> Key: KAFKA-13799
> URL: https://issues.apache.org/jira/browse/KAFKA-13799
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: RivenSun
>Priority: Major
>
> Via the documentation https://kafka.apache.org/documentation/#maximizingefficiency
> and [https://kafka.apache.org/documentation/#networklayer],
> we can see that Kafka combines the pagecache and zero-copy when reading messages 
> from files on disk, which greatly improves the consumption rate of messages.
> But after browsing the source code, looking directly at the 
> *FileRecords.writeTo(...)* method:
> 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), 
> and the bottom layer calls the sendfile method to implement zero-copy data 
> transfer.
> 2. The logic of the SslTransportLayer.transferFrom() method: 
> {code:java}
> fileChannel.read(fileChannelBuffer, pos) 
> -> 
> sslEngine.wrap(src, netWriteBuffer) 
> -> 
> flush(ByteBuffer buf) && socketChannel.write(buf){code}
> That is, first read the data on the disk or directly from the page cache, 
> then encrypt the data, and finally send the encrypted data to the network. 
> {*}FileChannel.transferTo() is not used in the whole process{*}.
>  
> Conclusion: 
> PlaintextTransportLayer and SslTransportLayer both use pagecache, but 
> SslTransportLayer does not implement zero-copy.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13799) Improve documentation for Kafka zero-copy

2022-04-05 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517382#comment-17517382
 ] 

RivenSun commented on KAFKA-13799:
--

Hi [~showuon] [~dengziming] and [~guozhang] 
I think we can state in Kafka's documentation that when Kafka reads messages 
from files on disk, it combines the pagecache and zero-copy to greatly improve 
message consumption efficiency, but that zero-copy only works in 
PlaintextTransportLayer. Maybe we can add this statement in the sendfile section 
of the documentation below.
[https://kafka.apache.org/documentation/#maximizingefficiency]

WDYT?
Thanks.

> Improve documentation for Kafka zero-copy
> -
>
> Key: KAFKA-13799
> URL: https://issues.apache.org/jira/browse/KAFKA-13799
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: RivenSun
>Priority: Major
>
> Via the documentation https://kafka.apache.org/documentation/#maximizingefficiency
> and [https://kafka.apache.org/documentation/#networklayer],
> we can see that Kafka combines the pagecache and zero-copy when reading messages 
> from files on disk, which greatly improves the consumption rate of messages.
> But after browsing the source code, looking directly at the 
> *FileRecords.writeTo(...)* method:
> 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), 
> and the bottom layer calls the sendfile method to implement zero-copy data 
> transfer.
> 2. The logic of the SslTransportLayer.transferFrom() method: 
> {code:java}
> fileChannel.read(fileChannelBuffer, pos) 
> -> 
> sslEngine.wrap(src, netWriteBuffer) 
> -> 
> flush(ByteBuffer buf) && socketChannel.write(buf){code}
> That is, first read the data on the disk or directly from the page cache, 
> then encrypt the data, and finally send the encrypted data to the network. 
> {*}FileChannel.transferTo() is not used in the whole process{*}.
>  
> Conclusion: 
> PlaintextTransportLayer and SslTransportLayer both use pagecache, but 
> SslTransportLayer does not implement zero-copy.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13799) Improve documentation for Kafka zero-copy

2022-04-05 Thread RivenSun (Jira)
RivenSun created KAFKA-13799:


 Summary: Improve documentation for Kafka zero-copy
 Key: KAFKA-13799
 URL: https://issues.apache.org/jira/browse/KAFKA-13799
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: RivenSun


Via the documentation https://kafka.apache.org/documentation/#maximizingefficiency
and [https://kafka.apache.org/documentation/#networklayer],
we can see that Kafka combines the pagecache and zero-copy when reading messages 
from files on disk, which greatly improves the consumption rate of messages.
But after browsing the source code, looking directly at the 
*FileRecords.writeTo(...)* method:
1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), 
and the bottom layer calls the sendfile method to implement zero-copy data 
transfer.
2. The logic of the SslTransportLayer.transferFrom() method: 
{code:java}
fileChannel.read(fileChannelBuffer, pos) 
-> 
sslEngine.wrap(src, netWriteBuffer) 
-> 
flush(ByteBuffer buf) && socketChannel.write(buf){code}
That is, first read the data on the disk or directly from the page cache, then 
encrypt the data, and finally send the encrypted data to the network. 
{*}FileChannel.transferTo() is not used in the whole process{*}.

 

Conclusion: 

PlaintextTransportLayer and SslTransportLayer both use pagecache, but 
SslTransportLayer does not implement zero-copy.
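
For illustration, a minimal sketch of the two transfer paths described above; method names and buffer handling are illustrative, not the actual Kafka transport-layer classes:

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;

public class TransferPathsSketch {

    // Plaintext path: FileChannel.transferTo() lets the kernel copy file bytes straight
    // to the socket (sendfile), so the data never has to enter user space.
    static long plaintextTransfer(FileChannel file, SocketChannel socket,
                                  long position, long count) throws IOException {
        return file.transferTo(position, count, socket);
    }

    // TLS path: the bytes are read into user space (possibly served from the page cache),
    // encrypted by the SSLEngine, and only then written to the socket, so sendfile cannot be used.
    static void tlsTransfer(FileChannel file, SocketChannel socket, SSLEngine engine,
                            long position, int count) throws IOException {
        ByteBuffer plain = ByteBuffer.allocate(count);
        ByteBuffer encrypted = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
        file.read(plain, position);      // read from disk or page cache into user space
        plain.flip();
        engine.wrap(plain, encrypted);   // encrypt in user space
        encrypted.flip();
        while (encrypted.hasRemaining()) {
            socket.write(encrypted);     // send the encrypted bytes to the network
        }
    }
}
{code}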



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] sciclon2 commented on pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


sciclon2 commented on PR #11842:
URL: https://github.com/apache/kafka/pull/11842#issuecomment-1088562983

   @junrao the tests that I added are working locally, so I guess the errors are 
not related to my tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-04-05 Thread GitBox


mimaison commented on code in PR #11974:
URL: https://github.com/apache/kafka/pull/11974#discussion_r842632626


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -769,16 +609,16 @@ public void testLostAssignmentHandlingWhenWorkerBounces() 
{
 new ArrayList<>(configuredAssignment.values()),
 memberConfigs);
 
-assertThat("Wrong set of workers for reassignments",
+assertEquals("Wrong set of workers for reassignments",

Review Comment:
   What about simplifying these assertions into:
   ```
   assertTrue("Wrong set of workers for reassignments", 
assignor.candidateWorkersForReassignment.isEmpty());
   ```
   



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -1170,78 +983,128 @@ public void 
testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted()
 
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
 // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(2, 8, 0, 0, "worker1");
-
-//delete connector1
+performStandardRebalance();
+assertDelay(0);
+assertWorkers("worker1");
+assertConnectorAllocations(2);
+assertTaskAllocations(8);
+assertBalancedAndCompleteAllocation();
+
+// Delete connector1
 configState = clusterConfigState(offset, 2, 1, 4);
 when(coordinator.configSnapshot()).thenReturn(configState);
 
 // Second assignment with a second worker with duplicate assignment 
joining and the duplicated assignment is deleted at the same time
-applyAssignments(returnedAssignments);
-memberConfigs = memberConfigs(leader, offset, assignments);
-ExtendedAssignment duplicatedWorkerAssignment = 
newExpandableAssignment();
-duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2));
-duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 
4));
-memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, duplicatedWorkerAssignment));
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(0, 0, 2, 8, "worker1", "worker2");
+addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 
4));
+performStandardRebalance();
+assertDelay(0);
+assertWorkers("worker1", "worker2");
+assertConnectorAllocations(0, 1);
+assertTaskAllocations(0, 4);
 
 // Third assignment after revocations
-applyAssignments(returnedAssignments);
-memberConfigs = memberConfigs(leader, offset, assignments);
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(0, 0, 0, 2, "worker1", "worker2");
+performStandardRebalance();
+assertDelay(0);
+assertConnectorAllocations(0, 1);
+assertTaskAllocations(0, 2);
 
 // fourth rebalance after revocations
-applyAssignments(returnedAssignments);
-memberConfigs = memberConfigs(leader, offset, assignments);
-assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-++rebalanceNum;
-returnedAssignments = assignmentsCapture.getValue();
-assertDelay(0, returnedAssignments);
-expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-assertNoReassignments(memberConfigs, expectedMemberConfigs);
-assertAssignment(0, 2, 0, 0, "worker1", "worker2");
+performStandardRebalance();
+assertDelay(0);
+assertConnectorAllocations(0, 1);
+assertTaskAllocations(2, 2);
+assertBalancedAndCompleteAllocation();
 
 // 

[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


sciclon2 commented on code in PR #11842:
URL: https://github.com/apache/kafka/pull/11842#discussion_r842615733


##
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##
@@ -430,6 +431,11 @@ object DumpLogSegments {
   .describedAs("size")
   .ofType(classOf[java.lang.Integer])
   .defaultsTo(5 * 1024 * 1024)
+val maxBytesOpt = parser.accepts("max-bytes", "Limit the amount of total 
batches in bytes avoiding reading the whole file(s).")

Review Comment:
   makes sense, added!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


sciclon2 commented on code in PR #11842:
URL: https://github.com/apache/kafka/pull/11842#discussion_r842615490


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
 None
   }
 
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: 
Int): Int = {
+val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+var batchesBytes = 0
+var batchesCounter = 0
+while (lines.hasNext) {
+  if (batchesCounter >= limit){
+return batchesBytes
+  }
+  val line = lines.next()
+  if (line.startsWith("baseOffset")) {
+line match {
+  case sizePattern(size) => batchesBytes += size.toInt
+  case _ => throw new IllegalStateException(s"Failed to parse and find 
size value for batch line: $line")
+}
+batchesCounter += 1
+  }
+}
+return batchesBytes
+  }
+
+  private def countBatches(lines: util.ListIterator[String]): Int = {
+var countBatches = 0
+while (lines.hasNext) {
+  val line = lines.next()
+  if (line.startsWith("baseOffset")) {
+countBatches += 1
+  }
+}
+return countBatches

Review Comment:
   Sorry, I don't get why the return is not needed here :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs

2022-04-05 Thread GitBox


sciclon2 commented on code in PR #11842:
URL: https://github.com/apache/kafka/pull/11842#discussion_r842614837


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
 None
   }
 
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: 
Int): Int = {
+val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+var batchesBytes = 0
+var batchesCounter = 0
+while (lines.hasNext) {
+  if (batchesCounter >= limit){
+return batchesBytes
+  }
+  val line = lines.next()
+  if (line.startsWith("baseOffset")) {
+line match {
+  case sizePattern(size) => batchesBytes += size.toInt
+  case _ => throw new IllegalStateException(s"Failed to parse and find 
size value for batch line: $line")
+}
+batchesCounter += 1
+  }
+}
+return batchesBytes

Review Comment:
   I agree that in this scenario `limit` will always be lower than (half of) the 
total, but what if tomorrow we use this method with a limit that is bigger than 
the total number of batches? For example, batches = 3 and limit = 5.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-04-05 Thread GitBox


mimaison commented on code in PR #11974:
URL: https://github.com/apache/kafka/pull/11974#discussion_r842589656


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -76,23 +83,21 @@
 
 @Parameters
 public static Iterable mode() {
-return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, 
CONNECT_PROTOCOL_V2}});
+return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, 
{CONNECT_PROTOCOL_V2}});

Review Comment:
   This can be simplified into `return Arrays.asList(CONNECT_PROTOCOL_V1, 
CONNECT_PROTOCOL_V2);` but since you're removing it in 
https://github.com/apache/kafka/pull/11983, we can ignore it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-04-05 Thread GitBox


mimaison commented on code in PR #11974:
URL: https://github.com/apache/kafka/pull/11974#discussion_r842589656


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -76,23 +83,21 @@
 
 @Parameters
 public static Iterable mode() {
-return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, 
CONNECT_PROTOCOL_V2}});
+return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, 
{CONNECT_PROTOCOL_V2}});

Review Comment:
   This can be simplified into `return Arrays.asList(new Object[] 
{CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2});` but since you're removing it in 
https://github.com/apache/kafka/pull/11983, we can ignore it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ddrid commented on pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager

2022-04-05 Thread GitBox


ddrid commented on PR #11991:
URL: https://github.com/apache/kafka/pull/11991#issuecomment-1088446711

   I think it was introduced in #6883, but I'm not sure if it affects all the 
versions since then. What do you think? @hachikuji 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-13791) Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of lazy-initialized members should be the last step with double-checked locking

2022-04-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13791.
---
Fix Version/s: 3.3.0
   Resolution: Fixed

> Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of 
> lazy-initialized members should be the last step with double-checked locking
> ---
>
> Key: KAFKA-13791
> URL: https://issues.apache.org/jira/browse/KAFKA-13791
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: YunKui Lu
>Priority: Trivial
> Fix For: 3.3.0
>
>
> Double-checked locking can be used for lazy initialization of volatile 
> fields, but only if field assignment is the last step in the synchronized 
> block. Otherwise, you run the risk of threads accessing a half-initialized 
> object.
> The problem is consistent with 
> [KAFKA-13777|https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-13777]
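
For illustration, a minimal sketch of the corrected pattern; the field and type names are illustrative, not the actual FetchResponse members:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class LazyFetchDataSketch {
    private volatile Map<String, Long> fetchData; // lazily initialized

    public Map<String, Long> fetchData() {
        Map<String, Long> result = fetchData;
        if (result == null) {
            synchronized (this) {
                result = fetchData;
                if (result == null) {
                    Map<String, Long> tmp = new HashMap<>();
                    tmp.put("topic-0", 0L);   // fully populate the object first...
                    fetchData = tmp;          // ...assign the volatile field as the last step
                    result = tmp;
                }
            }
        }
        return result;
    }
}
{code}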



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon merged pull request #11981: KAFKA-13791: Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of lazy-initialized members should be the last step with double-checke

2022-04-05 Thread GitBox


showuon merged PR #11981:
URL: https://github.com/apache/kafka/pull/11981


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #11981: KAFKA-13791: Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of lazy-initialized members should be the last step with double-

2022-04-05 Thread GitBox


showuon commented on PR #11981:
URL: https://github.com/apache/kafka/pull/11981#issuecomment-1088355283

   Failed tests are unrelated.
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.api.ClientIdQuotaTest.testProducerConsumerOverrideUnthrottled()
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, 
Name=testElectionResultOutput, Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org