[GitHub] [kafka] dongjoon-hyun commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch
dongjoon-hyun commented on PR #11995: URL: https://github.com/apache/kafka/pull/11995#issuecomment-1089831593 Thank you so much all! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sciclon2 commented on pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
sciclon2 commented on PR #11842: URL: https://github.com/apache/kafka/pull/11842#issuecomment-1089818537 @junrao I committed the changes and ran the test, all good, thanks! ![Screenshot 2022-04-06 at 06 51 59](https://user-images.githubusercontent.com/74413315/161898272-80c31461-3a80-41df-84aa-beca8ee022ed.png)
[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
sciclon2 commented on code in PR #11842: URL: https://github.com/apache/kafka/pull/11842#discussion_r843459090

## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
     None
   }
+
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = {
+    val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+    var batchesBytes = 0
+    var batchesCounter = 0
+    while (lines.hasNext) {
+      if (batchesCounter >= limit) {
+        return batchesBytes
+      }
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        line match {
+          case sizePattern(size) => batchesBytes += size.toInt
+          case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line")
+        }
+        batchesCounter += 1
+      }
+    }
+    return batchesBytes
+  }
+
+  private def countBatches(lines: util.ListIterator[String]): Int = {
+    var countBatches = 0
+    while (lines.hasNext) {
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        countBatches += 1
+      }
+    }
+    return countBatches

Review Comment: cool, thanks!
[jira] [Resolved] (KAFKA-6204) Interceptor and MetricsReporter should implement java.io.Closeable
[ https://issues.apache.org/jira/browse/KAFKA-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xavier Léauté resolved KAFKA-6204. -- Fix Version/s: 3.2.0 Assignee: Xavier Léauté Resolution: Fixed > Interceptor and MetricsReporter should implement java.io.Closeable > -- > > Key: KAFKA-6204 > URL: https://issues.apache.org/jira/browse/KAFKA-6204 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Charly Molter >Assignee: Xavier Léauté >Priority: Minor > Fix For: 3.2.0 > > > The serializers and deserializers extend the Closeable interface, even > ConsumerInterceptors and ProducerInterceptors implement it. > ConsumerInterceptor, ProducerInterceptor and MetricsReporter do not extend > the Closeable interface. > Maybe they should for coherency with the rest of the apis. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] vvcephei commented on pull request #11997: KAFKA-6204, KAFKA-7402: ProducerInterceptor should implement AutoCloseable
vvcephei commented on PR #11997: URL: https://github.com/apache/kafka/pull/11997#issuecomment-1089682577 Merged! Thanks again, @xvrl . About the non-green checks, Jenkins for some reason started a second test run with changes from another PR. I'm not sure what that's about, but all the tests passed on the change from _this_ PR (run 1) (https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11997/1/)
[GitHub] [kafka] vvcephei merged pull request #11997: KAFKA-6204, KAFKA-7402: ProducerInterceptor should implement AutoCloseable
vvcephei merged PR #11997: URL: https://github.com/apache/kafka/pull/11997
[GitHub] [kafka] ddrid commented on a diff in pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager
ddrid commented on code in PR #11991: URL: https://github.com/apache/kafka/pull/11991#discussion_r843398778

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -184,16 +184,22 @@ private void startSequencesAtBeginning(TopicPartition topicPartition, ProducerId
     // responses which are due to the retention period elapsing, and those which are due to actual lost data.
     private long lastAckedOffset;
+
+    private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
+        if (b1.baseSequence() < b2.baseSequence()) return -1;
+        else if (b1.baseSequence() > b2.baseSequence()) return 1;
+        else return b1.equals(b2) ? 0 : 1;
+    };

Review Comment: Hi @artemlivshits, thanks for your comment. I don't think it violates the requirements for the `compare` method, since we are comparing two batches using an integer. As for the stable order, I think it doesn't affect the current code, but I can fix this in another PR if you regard it as necessary. What do you think?
[jira] [Commented] (KAFKA-13799) Improve documentation for Kafka zero-copy
[ https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517772#comment-17517772 ] RivenSun commented on KAFKA-13799: -- Hi [~guozhang] [~dajac], [~showuon] Could you give some suggestions for this issue? Thanks.

> Improve documentation for Kafka zero-copy
> -
>
> Key: KAFKA-13799
> URL: https://issues.apache.org/jira/browse/KAFKA-13799
> Project: Kafka
> Issue Type: Improvement
> Components: documentation
> Reporter: RivenSun
> Priority: Major
>
> Via the documentation https://kafka.apache.org/documentation/#maximizingefficiency and [https://kafka.apache.org/documentation/#networklayer], we can know that Kafka combines pagecache and zero-copy when reading messages in files on disk, which greatly improves the consumption rate of messages.
> But after browsing the source code, look directly at the *FileRecords.writeTo(...)* method:
> 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), and the bottom layer calls the sendfile method to implement zero-copy data transfer.
> 2. The logic of the SslTransportLayer.transferFrom() method:
> {code:java}
> fileChannel.read(fileChannelBuffer, pos)
> ->
> sslEngine.wrap(src, netWriteBuffer)
> ->
> flush(ByteBuffer buf) && socketChannel.write(buf){code}
> That is, it first reads the data from the disk (or directly from the page cache), then encrypts the data, and finally sends the encrypted data to the network. {*}FileChannel.transferTo() is not used in the whole process{*}.
>
> Conclusion:
> PlaintextTransportLayer and SslTransportLayer both use pagecache, but SslTransportLayer does not implement zero-copy.
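For background on the plaintext path described in the issue above, here is a minimal sketch of `FileChannel.transferTo`, the JDK API whose Linux implementation maps to sendfile. The helper name and parameters are illustrative assumptions, not Kafka code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of the zero-copy call: FileChannel.transferTo hands file bytes to the
// destination channel directly, so for a plaintext socket the data never has
// to be copied into user space.
public class ZeroCopySketch {
    // Hypothetical helper: transfer `count` bytes of a log segment starting
    // at `position` into a socket (or any writable) channel.
    static long transferSegment(Path segment, WritableByteChannel dest,
                                long position, long count) throws IOException {
        try (FileChannel ch = FileChannel.open(segment, StandardOpenOption.READ)) {
            // transferTo may move fewer bytes than requested; real callers loop.
            return ch.transferTo(position, count, dest);
        }
    }
}
```

The TLS path cannot take this shortcut: the bytes must pass through `SSLEngine.wrap` in user space before reaching the socket, which matches the conclusion drawn in the issue.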
[jira] [Commented] (KAFKA-13793) Add validators for serialization and deserialization related configuration
[ https://issues.apache.org/jira/browse/KAFKA-13793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517770#comment-17517770 ] RivenSun commented on KAFKA-13793: -- Hi [~guozhang] [~dajac], [~showuon] Could you give some suggestions for this issue? Thanks.

> Add validators for serialization and deserialization related configuration
> --
>
> Key: KAFKA-13793
> URL: https://issues.apache.org/jira/browse/KAFKA-13793
> Project: Kafka
> Issue Type: Improvement
> Components: clients, config
> Reporter: RivenSun
> Assignee: RivenSun
> Priority: Major
>
> These configurations of producer and consumer have the same problem.
> {code:java}
> key.serializer, value.serializer, key.deserializer, value.deserializer{code}
>
> Take the `key.serializer` configuration as an example:
> {code:java}
> Map<String, Object> props = new HashMap<>();
> props.put("key.serializer", null);{code}
> It is expected that this abnormal configuration can be caught during the startup process of KafkaProducer, but the actual startup result:
> {code:java}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
> at us.zoom.mq.server.adapter.kafka.ProducerTest.main(ProducerTest.java:139)
> Caused by: java.lang.NullPointerException
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
> ... 3 more {code}
> There was a line of code that threw a null pointer, causing KafkaProducer initialization to fail.
> I think we should be able to find this bad configuration during the validation of all the configuration, i.e. execute the *ConfigDef.parseValue(ConfigKey key, Object value, boolean isSet)* method and throw a *ConfigException* instead of a NullPointerException.
> Solution:
> Add *NonNullValidator* to these configurations.
> For example, when ProducerConfig defines the `key.serializer` configuration, add a Validator:
> {code:java}
> .define(KEY_SERIALIZER_CLASS_CONFIG,
>         Type.CLASS,
>         ConfigDef.NO_DEFAULT_VALUE,
>         new ConfigDef.NonNullValidator(),
>         Importance.HIGH,
>         KEY_SERIALIZER_CLASS_DOC) {code}
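The validate-up-front idea in the ticket can be sketched without Kafka's ConfigDef. The `Validator` interface and names below are hypothetical stand-ins, shown only to illustrate why failing validation with a clear configuration error beats a NullPointerException deep inside a constructor:

```java
// Hypothetical mirror of the validator idea (not Kafka's ConfigDef API):
// check configured values up front so a bad value fails with a descriptive
// configuration error instead of an NPE during client construction.
interface Validator {
    void ensureValid(String name, Object value);
}

public class ConfigValidation {
    // Rejects null values, naming the offending configuration key.
    static final Validator NON_NULL = (name, value) -> {
        if (value == null)
            throw new IllegalArgumentException(
                "Invalid value null for configuration " + name);
    };
}
```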
[GitHub] [kafka] RivenSun2 commented on pull request #11985: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type`
RivenSun2 commented on PR #11985: URL: https://github.com/apache/kafka/pull/11985#issuecomment-1089641523 Hi @showuon @mimaison could you help to review the PR? Thanks.
[GitHub] [kafka] samarthd commented on pull request #9794: Add a job to build on ARM64 at Amazon Graviton2 nodes
samarthd commented on PR #9794: URL: https://github.com/apache/kafka/pull/9794#issuecomment-1089560163 @martin-g Thank you for this PR! I'd love to see this get merged. @mimaison You recently closed the parent ticket (https://issues.apache.org/jira/browse/KAFKA-10759) referencing a closed PR but this PR is still open. Can you please reopen the ticket (or merge this PR)?
[GitHub] [kafka] mjsax closed pull request #10391: MINOR: disable flaky system test
mjsax closed pull request #10391: MINOR: disable flaky system test URL: https://github.com/apache/kafka/pull/10391
[jira] [Updated] (KAFKA-13130) Deprecate long based range queries in SessionStore
[ https://issues.apache.org/jira/browse/KAFKA-13130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-13130: -- Fix Version/s: (was: 3.2.0) > Deprecate long based range queries in SessionStore > -- > > Key: KAFKA-13130 > URL: https://issues.apache.org/jira/browse/KAFKA-13130 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Patrick Stuedi >Assignee: Patrick Stuedi >Priority: Minor > Labels: needs-kip, newbie, newbie++ > > Migrate long based queries in ReadOnlySessionStore (fetchSession*, > findSession*, etc.) to object based interfaces. Deprecate old long based > interface, similar to how it was done for the ReadOnlyWindowStore. > Related KIPs: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times] > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-617%3A+Allow+Kafka+Streams+State+Stores+to+be+iterated+backwards] > > Related Jiras: > https://issues.apache.org/jira/browse/KAFKA-12419 > https://issues.apache.org/jira/browse/KAFKA-12526 > https://issues.apache.org/jira/browse/KAFKA-12451 >
[jira] [Commented] (KAFKA-13130) Deprecate long based range queries in SessionStore
[ https://issues.apache.org/jira/browse/KAFKA-13130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517724#comment-17517724 ] Bruno Cadonna commented on KAFKA-13130: --- Removing from the 3.2.0 release since feature freeze has passed. > Deprecate long based range queries in SessionStore > -- > > Key: KAFKA-13130 > URL: https://issues.apache.org/jira/browse/KAFKA-13130 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Patrick Stuedi >Assignee: Patrick Stuedi >Priority: Minor > Labels: needs-kip, newbie, newbie++ > Fix For: 3.2.0 > > > Migrate long based queries in ReadOnlySessionStore (fetchSession*, > findSession*, etc.) to object based interfaces. Deprecate old long based > interface, similar to how it was done for the ReadOnlyWindowStore. > Related KIPs: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times] > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-617%3A+Allow+Kafka+Streams+State+Stores+to+be+iterated+backwards] > > Related Jiras: > https://issues.apache.org/jira/browse/KAFKA-12419 > https://issues.apache.org/jira/browse/KAFKA-12526 > https://issues.apache.org/jira/browse/KAFKA-12451 >
[jira] [Commented] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517723#comment-17517723 ] Bruno Cadonna commented on KAFKA-12781: --- Removing from the 3.2.0 release since feature freeze has passed. > Improve the endOffsets accuracy in TaskMetadata > > > Key: KAFKA-12781 > URL: https://issues.apache.org/jira/browse/KAFKA-12781 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Priority: Minor > Fix For: 3.2.0 > > > Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the > main consumer in streams. It should be possible to get the highest offset in > the topic via the consumer instead.
[jira] [Updated] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-12781: -- Fix Version/s: (was: 3.2.0) > Improve the endOffsets accuracy in TaskMetadata > > > Key: KAFKA-12781 > URL: https://issues.apache.org/jira/browse/KAFKA-12781 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Priority: Minor > > Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the > main consumer in streams. It should be possible to get the highest offset in > the topic via the consumer instead.
[GitHub] [kafka] artemlivshits commented on a diff in pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager
artemlivshits commented on code in PR #11991: URL: https://github.com/apache/kafka/pull/11991#discussion_r843285049

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -184,16 +184,22 @@ private void startSequencesAtBeginning(TopicPartition topicPartition, ProducerId
     // responses which are due to the retention period elapsing, and those which are due to actual lost data.
     private long lastAckedOffset;
+
+    private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
+        if (b1.baseSequence() < b2.baseSequence()) return -1;
+        else if (b1.baseSequence() > b2.baseSequence()) return 1;
+        else return b1.equals(b2) ? 0 : 1;
+    };

Review Comment: Wouldn't this violate the requirements for the `compare` method?
> The implementor must ensure that sgn(compare(x, y)) == -sgn(compare(y, x)) for all x and y. (This implies that compare(x, y) must throw an exception if and only if compare(y, x) throws an exception.)
> The implementor must also ensure that the relation is transitive: ((compare(x, y)>0) && (compare(y, z)>0)) implies compare(x, z)>0.

Objects that are not equal need to have a stable order; otherwise binary search may not find the objects.
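The contract concern raised in this review can be demonstrated in isolation. The sketch below uses a hypothetical `Batch` stand-in (not the real `ProducerBatch`) with a comparator of the same shape; for two distinct batches sharing a base sequence, `compare` returns 1 in both argument orders, which violates the antisymmetry requirement of `java.util.Comparator`:

```java
import java.util.Comparator;

// Simplified stand-in for ProducerBatch (hypothetical; not the real class).
class Batch {
    private final int baseSequence;
    Batch(int baseSequence) { this.baseSequence = baseSequence; }
    int baseSequence() { return baseSequence; }
}

public class ComparatorContract {
    // A comparator shaped like the one under review: a tie between two
    // distinct batches falls through to 1 regardless of argument order.
    static final Comparator<Batch> SUSPECT = (b1, b2) -> {
        if (b1.baseSequence() < b2.baseSequence()) return -1;
        else if (b1.baseSequence() > b2.baseSequence()) return 1;
        else return b1.equals(b2) ? 0 : 1;
    };

    public static void main(String[] args) {
        Batch a = new Batch(5);
        Batch b = new Batch(5);
        // Antisymmetry requires sgn(compare(a, b)) == -sgn(compare(b, a)),
        // but both directions return 1 here.
        System.out.println(SUSPECT.compare(a, b)); // 1
        System.out.println(SUSPECT.compare(b, a)); // 1
    }
}
```

A sorted structure doing binary search with such a comparator can walk the wrong way and fail to find an element that is actually present, which is the failure mode the review warns about.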
[GitHub] [kafka] jeqo commented on pull request #10390: KAFKA-12536: Add Instant-based methods to ReadOnlySessionStore
jeqo commented on PR #10390: URL: https://github.com/apache/kafka/pull/10390#issuecomment-1089370471 Thanks @dotjdk ! Do you mean https://github.com/apache/kafka/blob/8f8f914efc2dfb2a6638553038dcb17393d7de96/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java#L92 specifically? If so, I agree. Check if this PR fixes the issue; opening it for review: https://github.com/apache/kafka/pull/11999.
[GitHub] [kafka] jeqo opened a new pull request, #11999: [MINOR] fix(streams): align variable names
jeqo opened a new pull request, #11999: URL: https://github.com/apache/kafka/pull/11999 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] junrao commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
junrao commented on code in PR #11842: URL: https://github.com/apache/kafka/pull/11842#discussion_r843226602

## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
     None
   }
+
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = {
+    val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+    var batchesBytes = 0
+    var batchesCounter = 0
+    while (lines.hasNext) {
+      if (batchesCounter >= limit) {
+        return batchesBytes
+      }
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        line match {
+          case sizePattern(size) => batchesBytes += size.toInt
+          case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line")
+        }
+        batchesCounter += 1
+      }
+    }
+    return batchesBytes

Review Comment: I just meant that in scala you can just do `batchesBytes` instead of `return batchesBytes` in the last statement.
[GitHub] [kafka] junrao commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
junrao commented on code in PR #11842: URL: https://github.com/apache/kafka/pull/11842#discussion_r843225969

## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ##
@@ -310,6 +334,38 @@ class DumpLogSegmentsTest {
     None
   }
+
+  // Returns the total bytes of the batches specified
+  private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = {
+    val sizePattern: Regex = raw".+?size:\s(\d+).+".r
+    var batchesBytes = 0
+    var batchesCounter = 0
+    while (lines.hasNext) {
+      if (batchesCounter >= limit) {
+        return batchesBytes
+      }
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        line match {
+          case sizePattern(size) => batchesBytes += size.toInt
+          case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line")
+        }
+        batchesCounter += 1
+      }
+    }
+    return batchesBytes
+  }
+
+  private def countBatches(lines: util.ListIterator[String]): Int = {
+    var countBatches = 0
+    while (lines.hasNext) {
+      val line = lines.next()
+      if (line.startsWith("baseOffset")) {
+        countBatches += 1
+      }
+    }
+    return countBatches

Review Comment: I just meant that in scala you can just do `countBatches` instead of `return countBatches` in the last statement.
[GitHub] [kafka] jlprat commented on pull request #11432: KAFKA-13399 towards scala3
jlprat commented on PR #11432: URL: https://github.com/apache/kafka/pull/11432#issuecomment-1089271503 I'll resolve the conflicts tomorrow and ping people for reviews
[GitHub] [kafka] jlprat commented on pull request #11350: Scala3 migration
jlprat commented on PR #11350: URL: https://github.com/apache/kafka/pull/11350#issuecomment-1089268807 @jvican some intermediate work is here: https://github.com/apache/kafka/pull/11432 I'd need to resolve a couple of conflicts as the PR got no traction for a while. But I'm happy to do it! What I think is that the migration to Scala 3 might happen when Kafka 4.0.0 is closer in time.
[GitHub] [kafka] jvican commented on pull request #11350: Scala3 migration
jvican commented on PR #11350: URL: https://github.com/apache/kafka/pull/11350#issuecomment-1089258871 Any plans to prioritize the Scala 3 migration? Would be nice to see this happening.
[GitHub] [kafka] mimaison merged pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
mimaison merged PR #11974: URL: https://github.com/apache/kafka/pull/11974
[jira] [Updated] (KAFKA-13801) Kafka server does not respect MetricsReporter interface contract for dynamically configured reporters
[ https://issues.apache.org/jira/browse/KAFKA-13801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xavier Léauté updated KAFKA-13801: -- Component/s: metrics > Kafka server does not respect MetricsReporter interface contract for > dynamically configured reporters > - > > Key: KAFKA-13801 > URL: https://issues.apache.org/jira/browse/KAFKA-13801 > Project: Kafka > Issue Type: Bug > Components: metrics >Reporter: Xavier Léauté >Priority: Minor > > MetricsReporter.contextChange contract states the method should always > be called first before MetricsReporter.init is called. This is done > correctly for reporters enabled by default (e.g. JmxReporter) but not > for metrics reporters configured dynamically
[GitHub] [kafka] xvrl opened a new pull request, #11998: KAFKA-13801 Kafka server does not respect MetricsReporter contract for dynamically configured reporters
xvrl opened a new pull request, #11998: URL: https://github.com/apache/kafka/pull/11998 MetricsReporter.contextChange contract states the method should always be called first before MetricsReporter.init is called. This is done correctly for reporters enabled by default (e.g. JmxReporter) but not for metrics reporters configured dynamically. This fixes the call ordering for dynamically configured metrics reporter and updates tests to enforce ordering.
[jira] [Created] (KAFKA-13801) Kafka server does not respect MetricsReporter interface contract for dynamically configured reporters
Xavier Léauté created KAFKA-13801: - Summary: Kafka server does not respect MetricsReporter interface contract for dynamically configured reporters Key: KAFKA-13801 URL: https://issues.apache.org/jira/browse/KAFKA-13801 Project: Kafka Issue Type: Bug Reporter: Xavier Léauté MetricsReporter.contextChange contract states the method should always be called first before MetricsReporter.init is called. This is done correctly for reporters enabled by default (e.g. JmxReporter) but not for metrics reporters configured dynamically
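The ordering contract described in this issue can be sketched in isolation. The interfaces below are hypothetical minimal mirrors of the lifecycle, not Kafka's actual MetricsReporter API; the point is simply that the context must be handed to the reporter before initialization:

```java
import java.util.List;
import java.util.Map;

// Hypothetical mirror of the reporter lifecycle (not the real Kafka
// interfaces): contextChange(context) must be invoked before init(metrics).
interface Reporter {
    void contextChange(Map<String, String> metricsContext);
    void init(List<String> existingMetrics);
}

public class ReporterLifecycle {
    // Registers a reporter in the contract-mandated order.
    static void register(Reporter reporter, Map<String, String> context,
                         List<String> existing) {
        reporter.contextChange(context); // first, per the contract
        reporter.init(existing);         // only after the context is set
    }
}
```

The bug report says dynamically configured reporters got these two calls in the wrong order, so a reporter that resolves labels or prefixes from its context inside `init` would see an empty context.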
[GitHub] [kafka] xvrl opened a new pull request, #11997: KAFKA-6204 KAFKA-7402 ProducerInterceptor should implement AutoCloseable
xvrl opened a new pull request, #11997: URL: https://github.com/apache/kafka/pull/11997 As part of KIP-376 we had ConsumerInterceptor implement AutoCloseable but forgot to do the same for ProducerInterceptor. This fixes the inconsistency and also fixes KAFKA-6204 at the same time.
[jira] [Commented] (KAFKA-6204) Interceptor and MetricsReporter should implement java.io.Closeable
[ https://issues.apache.org/jira/browse/KAFKA-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517651#comment-17517651 ] Xavier Léauté commented on KAFKA-6204: -- this was mostly fixed by KIP-376 https://cwiki.apache.org/confluence/display/KAFKA/KIP-376%3A+Implement+AutoClosable+on+appropriate+classes+that+want+to+be+used+in+a+try-with-resource+statement it looks like ProducerInterceptor might have been missed, so it should be fairly uncontroversial to add it. > Interceptor and MetricsReporter should implement java.io.Closeable > -- > > Key: KAFKA-6204 > URL: https://issues.apache.org/jira/browse/KAFKA-6204 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Charly Molter >Priority: Minor > > The serializers and deserializers extend the Closeable interface, even > ConsumerInterceptors and ProducerInterceptors implement it. > ConsumerInterceptor, ProducerInterceptor and MetricsReporter do not extend > the Closeable interface. > Maybe they should for coherency with the rest of the apis.
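As a small illustration of what implementing AutoCloseable (per KIP-376) buys: an object can then be managed by try-with-resources, which guarantees `close()` runs even on exceptional exit. The class below is a hypothetical stand-in, not the real ProducerInterceptor:

```java
// Hypothetical interceptor implementing AutoCloseable (illustrating what
// KIP-376 enables; this is not the real ProducerInterceptor interface).
class TrackingInterceptor implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true; // a real interceptor would release resources here
    }
}

public class TryWithResources {
    public static void main(String[] args) {
        TrackingInterceptor interceptor = new TrackingInterceptor();
        // try-with-resources guarantees close() runs when the block exits.
        try (TrackingInterceptor scoped = interceptor) {
            System.out.println(scoped.closed); // false
        }
        System.out.println(interceptor.closed); // true
    }
}
```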
[GitHub] [kafka] lihaosky commented on a diff in pull request #11896: KAFKA-13785: [Emit final][5/N] emit final for TimeWindowedKStreamImpl
lihaosky commented on code in PR #11896: URL: https://github.com/apache/kafka/pull/11896#discussion_r843123337 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java: ## @@ -232,11 +247,19 @@ ); break; case ROCKS_DB: -supplier = Stores.persistentTimestampedWindowStore( -materialized.storeName(), -Duration.ofMillis(retentionPeriod), -Duration.ofMillis(windows.size()), -false +supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ? + RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create( +materialized.storeName(), Review Comment: @guozhangwang , should we call this another name to avoid overriding existing emit eager's store?
[jira] [Assigned] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896
[ https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-13800: -- Assignee: Hao Li > Remove force cast of TimeWindowKStreamImpl in tests of > https://github.com/apache/kafka/pull/11896 > - > > Key: KAFKA-13800 > URL: https://issues.apache.org/jira/browse/KAFKA-13800 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > We can remove the cast after `emitStrategy` is added to the public API
[jira] [Updated] (KAFKA-13797) Adding metric to indicate metadata response outgoing bytes rate
[ https://issues.apache.org/jira/browse/KAFKA-13797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-13797: --- Description: It's not a common case, but we experienced the following problem in one of our clusters. The use case involves dynamically creating and deleting topics in the cluster, and the clients were constantly using the special type of Metadata requests whose topics field is null in order to retrieve all topics before checking a topic's existence. A high rate of such Metadata requests generated a heavy load on brokers in the cluster. Yet, currently, there is no metric to indicate the metadata response outgoing bytes rate. We propose to add such a metric in order to make the troubleshooting of such cases easier. was: It's not a common case, but we experienced the following problem in one of our clusters. The use case involves dynamically creating and deleting topics in the cluster, and the clients were constantly checking if a topic exists in a cluster using the special type of Metadata requests whose topics field is null in order to retrieve all topics before checking a topic's existence. A high rate of such Metadata requests generated a heavy load on brokers in the cluster. Yet, currently, there is no metric to indicate the metadata response outgoing bytes rate. We propose to add such a metric in order to make the troubleshooting of such cases easier. > Adding metric to indicate metadata response outgoing bytes rate > --- > > Key: KAFKA-13797 > URL: https://issues.apache.org/jira/browse/KAFKA-13797 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Priority: Minor > > It's not a common case, but we experienced the following problem in one of > our clusters. 
> The use case involves dynamically creating and deleting topics in the > cluster, and the clients were constantly using the special type of Metadata > requests whose topics field is null in order to retrieve all topics before > checking a topic's existence. > A high rate of such Metadata requests generated a heavy load on brokers in > the cluster. > Yet, currently, there is no metric to indicate the metadata response outgoing > bytes rate. > We propose to add such a metric in order to make the troubleshooting of such > cases easier.
[jira] [Created] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896
Hao Li created KAFKA-13800: -- Summary: Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896 Key: KAFKA-13800 URL: https://issues.apache.org/jira/browse/KAFKA-13800 Project: Kafka Issue Type: Improvement Reporter: Hao Li We can remove the cast after `emitStrategy` is added to the public API
[jira] [Updated] (KAFKA-13794) Producer batch lost silently in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-13794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13794: Fix Version/s: 3.2.0 > Producer batch lost silently in TransactionManager > -- > > Key: KAFKA-13794 > URL: https://issues.apache.org/jira/browse/KAFKA-13794 > Project: Kafka > Issue Type: Bug >Reporter: xuexiaoyue >Priority: Major > Fix For: 3.2.0, 3.1.1, 3.0.2 > > > When idempotence is enabled and a batch reaches its request.timeout.ms but > has not yet reached delivery.timeout.ms, it is retried and waits for another > request.timeout.ms. During this interval delivery.timeout.ms may be reached, > and the Sender will remove this in-flight batch and bump the producer epoch > because of the unresolved sequence; the sequence of this partition is then > reset to 0. > At this point, if a new batch is sent to the same partition and the former > batch reaches request.timeout.ms again, an exception is thrown by > NetworkClient: > {code:java} > [ERROR] [kafka-producer-network-thread | producer-1] > org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] > Uncaught error in request completion: > java.lang.IllegalStateException: We are re-enqueueing a batch which is not > tracked as part of the in flight requests. batch.topicPartition: > txn_test_1648891362900-2; batch.baseSequence: 0 > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at > org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] 
> at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at > org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] > at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102] {code} > The cause is that inflightBatchesBySequence in TransactionManager is not > being updated correctly: one batch may be removed by another batch with the > same sequence number. > A potential consequence is that sending is blocked until the latter batch is > expired by delivery.timeout.ms
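The failure mode described above can be illustrated with a toy model (a sketch only, not the actual TransactionManager code): if in-flight batches are tracked in a structure keyed solely by base sequence, a sequence reset to 0 after an epoch bump lets a new batch displace an older one that happened to use the same sequence number.

```java
import java.util.TreeMap;

// Toy model of the bug described above (not the real TransactionManager):
// tracking in-flight batches keyed only by base sequence means that after an
// epoch bump resets the sequence to 0, a new batch silently overwrites the
// old entry, and completing either batch removes the other's tracking state.
public class SequenceTrackingSketch {
    public static void main(String[] args) {
        TreeMap<Integer, String> inflightBySequence = new TreeMap<>();

        inflightBySequence.put(0, "batch-A"); // first batch, baseSequence 0
        // Epoch bumped due to the unresolved sequence; the partition's sequence
        // resets to 0, so a new batch to the same partition reuses baseSequence 0:
        inflightBySequence.put(0, "batch-B"); // batch-A's entry is overwritten

        // Handling batch-A's (timed-out) response now removes batch-B instead:
        String removed = inflightBySequence.remove(0);
        System.out.println("removed=" + removed
                + ", remaining=" + inflightBySequence.size());
    }
}
```

The remedy implied by the fix is to compare batches by more than the base sequence (e.g. by producer epoch as well, or by batch identity) so two batches that share a sequence number cannot evict each other's tracking state.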
[GitHub] [kafka] hachikuji commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch
hachikuji commented on PR #11995: URL: https://github.com/apache/kafka/pull/11995#issuecomment-1089065258 @tombentley This is merged to 3.1 along with #11991. Thanks for managing the release by the way!
[jira] [Resolved] (KAFKA-13782) Producer may fail to add the correct partition to transaction
[ https://issues.apache.org/jira/browse/KAFKA-13782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13782. - Resolution: Fixed > Producer may fail to add the correct partition to transaction > - > > Key: KAFKA-13782 > URL: https://issues.apache.org/jira/browse/KAFKA-13782 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.2.0, 3.1.1 > > > In KAFKA-13412, we changed the logic to add partitions to transactions in the > producer. The intention was to ensure that the partition is added in > `TransactionManager` before the record is appended to the > `RecordAccumulator`. However, this does not take into account the possibility > that the originally selected partition may be changed if `abortForNewBatch` > is set in `RecordAppendResult` in the call to `RecordAccumulator.append`. > When this happens, the partitioner can choose a different partition, which > means that the `TransactionManager` would be tracking the wrong partition. > I think the consequence of this is that the batches sent to this partition > would get stuck in the `RecordAccumulator` until they timed out because we > validate before sending that the partition has been added correctly to the > transaction. > Note that KAFKA-13412 has not been included in any release, so there are no > affected versions. > Thanks to [~alivshits] for identifying the bug. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] ijuma commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection
ijuma commented on PR #11965: URL: https://github.com/apache/kafka/pull/11965#issuecomment-1089061398 Is it worth backporting this to release branches?
[jira] [Resolved] (KAFKA-13794) Producer batch lost silently in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-13794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13794. - Fix Version/s: 3.1.1 3.0.2 Resolution: Fixed > Producer batch lost silently in TransactionManager > -- > > Key: KAFKA-13794 > URL: https://issues.apache.org/jira/browse/KAFKA-13794 > Project: Kafka > Issue Type: Bug >Reporter: xuexiaoyue >Priority: Major > Fix For: 3.1.1, 3.0.2 > > (Issue description and stack trace quoted in full above.)
[GitHub] [kafka] hachikuji merged pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager
hachikuji merged PR #11991: URL: https://github.com/apache/kafka/pull/11991
[jira] [Updated] (KAFKA-13778) Fetch from follower should never run the preferred read replica selection
[ https://issues.apache.org/jira/browse/KAFKA-13778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13778: Affects Version/s: 2.6.0 (was: 3.2.0) > Fetch from follower should never run the preferred read replica selection > - > > Key: KAFKA-13778 > URL: https://issues.apache.org/jira/browse/KAFKA-13778 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: zhaobo >Assignee: zhaobo >Priority: Minor > Fix For: 3.3.0 > > > The code is designed so that only the leader broker determines the preferred > read-replica. > > {code:java} > readFromLocalLog() > > // If we are the leader, determine the preferred read-replica > val preferredReadReplica = clientMetadata.flatMap( > metadata => findPreferredReadReplica(partition, metadata, replicaId, > fetchInfo.fetchOffset, fetchTimeMs)) {code} > > In fact, since the broker does not check whether it is the leader, a follower > will also execute the preferred read-replica selection. > {code:java} > partition.leaderReplicaIdOpt.flatMap { leaderReplicaId => > // Don't look up preferred for follower fetches via normal replication and > if (Request.isValidBrokerId(replicaId)) > None > else { {code}
[jira] [Resolved] (KAFKA-13778) Fetch from follower should never run the preferred read replica selection
[ https://issues.apache.org/jira/browse/KAFKA-13778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-13778. - Fix Version/s: 3.3.0 Reviewer: David Jacot Resolution: Fixed > Fetch from follower should never run the preferred read replica selection > - > > Key: KAFKA-13778 > URL: https://issues.apache.org/jira/browse/KAFKA-13778 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.2.0 >Reporter: zhaobo >Assignee: zhaobo >Priority: Minor > Fix For: 3.3.0 > > (Issue description and code excerpts quoted in full above.)
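The fix described in KAFKA-13778 can be sketched as a guard that short-circuits preferred read-replica selection unless the local broker is the leader. This is a hypothetical helper, not the actual ReplicaManager code; `replicaId >= 0` stands in for `Request.isValidBrokerId`, and the candidate replica id is an assumed input.

```java
import java.util.Optional;

// Hypothetical sketch of the guard described above (not the actual
// ReplicaManager code): preferred read-replica selection should run only on
// the leader, and only for consumer fetches (a valid, non-negative broker id
// means the fetch is follower replication, not a consumer).
public class PreferredReplicaGuardSketch {
    static Optional<Integer> findPreferredReadReplica(boolean isLocalLeader,
                                                      int replicaId,
                                                      int candidate) {
        boolean isFollowerFetch = replicaId >= 0; // stand-in for isValidBrokerId
        if (!isLocalLeader || isFollowerFetch) {
            return Optional.empty(); // a follower never picks a preferred replica
        }
        return Optional.of(candidate);
    }

    public static void main(String[] args) {
        System.out.println(findPreferredReadReplica(true, -1, 2));  // leader + consumer fetch
        System.out.println(findPreferredReadReplica(false, -1, 2)); // follower: no selection
        System.out.println(findPreferredReadReplica(true, 1, 2));   // replication fetch: no selection
    }
}
```

The point of the leader check is that only the leader has the authoritative replica state needed to choose a preferred read replica; running the selection on a follower, as the quoted code allowed, is wasted work at best and misleading at worst.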
[GitHub] [kafka] dajac merged pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection
dajac merged PR #11965: URL: https://github.com/apache/kafka/pull/11965
[GitHub] [kafka] hachikuji merged pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch
hachikuji merged PR #11995: URL: https://github.com/apache/kafka/pull/11995
[GitHub] [kafka] ijuma commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch
ijuma commented on PR #11995: URL: https://github.com/apache/kafka/pull/11995#issuecomment-1088982164 In addition to this, we will also include https://github.com/apache/kafka/pull/11991. @hachikuji should be able to merge both this morning, I believe.
[GitHub] [kafka] dongjoon-hyun commented on pull request #11995: KAFKA-13782; Ensure correct partition added to txn after abort on full batch
dongjoon-hyun commented on PR #11995: URL: https://github.com/apache/kafka/pull/11995#issuecomment-1088909166 Hi, @ijuma. According to your dev mailing list email, is this the last blocker issue before Apache Kafka 3.1.1 RC?
[jira] [Updated] (KAFKA-12699) Streams no longer overrides the java default uncaught exception handler
[ https://issues.apache.org/jira/browse/KAFKA-12699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-12699: -- Fix Version/s: (was: 3.2.0) > Streams no longer overrides the java default uncaught exception handler > - > > Key: KAFKA-12699 > URL: https://issues.apache.org/jira/browse/KAFKA-12699 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Priority: Minor > > If a user used `Thread.setUncaughtExceptionHandler()` to set the handler for > all threads in the runtime, Streams would override that with its own handler. > However, since Streams no longer uses the `Thread` handler, it will no longer > do so. This can cause problems if the user does something like > `System.exit(1)` in the handler. > > The old Streams handler, if still used, works as it did before.
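The interaction described above can be demonstrated with plain JVM threads (a toy sketch, no Kafka involved): a handler installed via `Thread.setDefaultUncaughtExceptionHandler` fires for any thread that dies with an uncaught exception, unless something replaces it the way Streams previously did.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy sketch (no Kafka involved) of the JVM-level behavior discussed above:
// a JVM-wide default uncaught exception handler set by the application fires
// when a thread dies with an uncaught exception. Streams previously replaced
// this handler with its own; per the ticket, it now leaves it in place.
public class DefaultHandlerSketch {
    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean invoked = new AtomicBoolean(false);
        Thread.setDefaultUncaughtExceptionHandler((thread, error) -> invoked.set(true));

        Thread worker = new Thread(() -> {
            throw new RuntimeException("boom");
        });
        worker.start();
        worker.join(); // the handler runs on the dying thread before it terminates

        System.out.println("default handler invoked: " + invoked.get());
    }
}
```

This is why a `System.exit(1)` inside such a handler is dangerous: it runs on whichever thread died, and with the old Streams behavior the user's handler (and its exit call) would silently never run.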
[jira] [Commented] (KAFKA-12699) Streams no longer overrides the java default uncaught exception handler
[ https://issues.apache.org/jira/browse/KAFKA-12699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517525#comment-17517525 ] Bruno Cadonna commented on KAFKA-12699: --- Removing from the 3.2.0 release since code freeze has passed. > Streams no longer overrides the java default uncaught exception handler > - > > Key: KAFKA-12699 > URL: https://issues.apache.org/jira/browse/KAFKA-12699 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Priority: Minor > Fix For: 3.2.0 > > (Issue description quoted in full above.)
[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j
[ https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517524#comment-17517524 ] Bruno Cadonna commented on KAFKA-12774: --- Removing from the 3.2.0 release since code freeze has passed. > kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through > log4j > > > Key: KAFKA-12774 > URL: https://issues.apache.org/jira/browse/KAFKA-12774 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Jørgen >Priority: Minor > Fix For: 3.2.0 > > > When an exception is handled in the uncaught-exception handler introduced in > KS 2.8, the stack trace doesn't seem to go through the logging framework > configured by the application (log4j2 in our case), but gets printed to the > console line by line. > All other exceptions logged by kafka-streams go through log4j2 and get > formatted properly according to the log4j2 appender (JSON in our case). > Haven't tested this on other frameworks like logback. 
> Application setup: > * Spring-boot 2.4.5 > * Log4j 2.13.3 > * Slf4j 1.7.30 > Log4j2 appender config: > {code:java} > > > stacktraceAsString="true" properties="true"> > value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/> > > > {code} > Uncaught exception handler config: > {code:java} > kafkaStreams.setUncaughtExceptionHandler { exception -> > logger.warn("Uncaught exception handled - replacing thread", exception) > // logged properly > > StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD > } {code} > Stacktrace that gets printed line-by-line: > {code:java} > Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic xxx-repartition for task 3_2 due > to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id.Exception handler choose to FAIL the processing, no more > records would be sent. 
at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226) >at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242) > at java.base/java.lang.Thread.run(Unknown Source)Caused by: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. {code} > > It's a little bit hard to reproduce as I haven't found any way to trigger > uncaught-exception-handler through junit-tests. > Link to discussion on slack: > https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j
[ https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-12774: -- Fix Version/s: (was: 3.2.0) > kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through > log4j > > > Key: KAFKA-12774 > URL: https://issues.apache.org/jira/browse/KAFKA-12774 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Jørgen >Priority: Minor > > (Issue description quoted in full above.)
[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j
[ https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517523#comment-17517523 ] Bruno Cadonna commented on KAFKA-12774: --- [~wcarlson5] [~ableegoldman] [~jorgenringen] since we cannot reproduce the issue and nobody else has reported this issue since the ticket was opened, should we close it as "not reproducible"? > kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through > log4j > > > Key: KAFKA-12774 > URL: https://issues.apache.org/jira/browse/KAFKA-12774 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Jørgen >Priority: Minor > Fix For: 3.2.0 > > (Issue description quoted in full above.)
[GitHub] [kafka] C0urante commented on pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
C0urante commented on PR #11974: URL: https://github.com/apache/kafka/pull/11974#issuecomment-105947 Thanks @mimaison. I hope you don't mind but I've pushed one more change that simplifies how the configured set of connectors and tasks is simulated (this came in handy when designing tests for one of the scenarios described in [KAFKA-13764](https://issues.apache.org/jira/browse/KAFKA-13764)). It should improve readability by removing the existing `clusterConfigState` methods (and the `fillMap` method that was introduced in this PR to clean those up) and making changes to the set of configured connectors and tasks more explicit. However, if this is too much then let me know and I can revert the commit from this PR and add it in a follow-up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
C0urante commented on code in PR #11974: URL: https://github.com/apache/kafka/pull/11974#discussion_r842931708 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -76,23 +83,21 @@ @Parameters public static Iterable mode() { -return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2}}); +return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, {CONNECT_PROTOCOL_V2}}); Review Comment: Blegh, thanks. I'll simplify it anyways; rather leave `trunk` in a healthy place and not risk FUD from someone else copy+pasting this style. ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -1170,78 +983,128 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 1 worker and 2 connectors configured but not yet assigned -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(2, 8, 0, 0, "worker1"); - -//delete connector1 +performStandardRebalance(); +assertDelay(0); +assertWorkers("worker1"); +assertConnectorAllocations(2); +assertTaskAllocations(8); +assertBalancedAndCompleteAllocation(); + +// Delete connector1 configState = clusterConfigState(offset, 2, 1, 4); when(coordinator.configSnapshot()).thenReturn(configState); // Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time -applyAssignments(returnedAssignments); -memberConfigs = memberConfigs(leader, offset, assignments); 
-ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment(); -duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2)); -duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 4)); -memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedWorkerAssignment)); -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(0, 0, 2, 8, "worker1", "worker2"); +addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 4)); +performStandardRebalance(); +assertDelay(0); +assertWorkers("worker1", "worker2"); +assertConnectorAllocations(0, 1); +assertTaskAllocations(0, 4); // Third assignment after revocations -applyAssignments(returnedAssignments); -memberConfigs = memberConfigs(leader, offset, assignments); -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(0, 0, 0, 2, "worker1", "worker2"); +performStandardRebalance(); +assertDelay(0); +assertConnectorAllocations(0, 1); +assertTaskAllocations(0, 2); // fourth rebalance after revocations -applyAssignments(returnedAssignments); -memberConfigs = memberConfigs(leader, offset, assignments); -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(0, 2, 0, 0, "worker1", "worker2"); +performStandardRebalance(); +assertDelay(0); +assertConnectorAllocations(0, 1); +assertTaskAllocations(2, 2); +assertBalancedAndCompleteAllocation(); // Fifth rebalance should not change assignments -
[GitHub] [kafka] mimaison commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on PR #11748: URL: https://github.com/apache/kafka/pull/11748#issuecomment-1088831443 @tombentley @showuon Can you take a look?
[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r842896247 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -169,6 +172,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) +.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs Review Comment: Thanks @viktorsomogyi that's useful to know!
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
viktorsomogyi commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r842854543 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -169,6 +172,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) +.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs Review Comment: @mimaison our fix is pretty much the same but a bit more simplistic (it uses -1 instead of Optional). I also think no one would rely on that weird behavior, so I'm in favor of removing that functionality.
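The `flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))` line discussed in this review is a standard Java 8 idiom for dropping empty `Optional`s out of a stream. A minimal standalone sketch of the idea (the `checkpoint` method and its types here are simplified stand-ins, not the actual MirrorMaker classes):

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OptionalFlatMapDemo {
    // Hypothetical stand-in for MirrorCheckpointTask.checkpoint(): returns
    // empty when there is no offset-sync for the partition.
    static Optional<String> checkpoint(String partition, long offset) {
        return offset >= 0 ? Optional.of(partition + "@" + offset) : Optional.empty();
    }

    public static void main(String[] args) {
        List<String> checkpoints = Stream.of(
                checkpoint("topic-0", 42L),
                checkpoint("topic-1", -1L),   // no offset-sync: filtered out
                checkpoint("topic-2", 7L))
            // Flatten Stream<Optional<String>> to Stream<String>, skipping empties;
            // on Java 9+ this can be written more directly as .flatMap(Optional::stream).
            .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
            .collect(Collectors.toList());
        System.out.println(checkpoints); // [topic-0@42, topic-2@7]
    }
}
```

The advantage over returning a sentinel such as -1 is that the type system forces callers to handle the "no checkpoint" case explicitly.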
[GitHub] [kafka] ijuma commented on pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager
ijuma commented on PR #11991: URL: https://github.com/apache/kafka/pull/11991#issuecomment-1088659695 Thanks, looks like it's been there since 2.4.0 then.
[jira] [Comment Edited] (KAFKA-13799) Improve documentation for Kafka zero-copy
[ https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517382#comment-17517382 ] RivenSun edited comment on KAFKA-13799 at 4/5/22 12:37 PM: --- Hi [~showuon] [~dengziming] and [~guozhang] I think we can state in Kafka's documentation that when Kafka reads disk file messages, Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer. Maybe add this declaration in the sendfile module section of the documentation below. [https://kafka.apache.org/documentation/#maximizingefficiency] This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache. *When the transport layer uses the SSL protocol, sendfile will not be used due to the need to encrypt the data read.* WDYT? Thanks. was (Author: rivensun): Hi [~showuon] [~dengziming] and [~guozhang] I think we can state in Kafka's documentation that when Kafka reads disk file messages, Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer. Maybe add this declaration in the sendfile module section of the documentation below. [https://kafka.apache.org/documentation/#maximizingefficiency] WDYT? Thanks. > Improve documentation for Kafka zero-copy > - > > Key: KAFKA-13799 > URL: https://issues.apache.org/jira/browse/KAFKA-13799 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: RivenSun >Priority: Major > > Via documentation https://kafka.apache.org/documentation/#maximizingefficiency > and [https://kafka.apache.org/documentation/#networklayer] , > We can know that Kafka combines pagecache and zero-copy when reading messages > in files on disk, which greatly improves the consumption rate of messages. 
> But after browsing the source code: > Look directly at the *FileRecords.writeTo(...)* method, > 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), > and the bottom layer calls the sendfile method to implement zero-copy data > transfer. > 2. The logic of the SslTransportLayer.transferFrom() method: > {code:java} > fileChannel.read(fileChannelBuffer, pos) > -> > sslEngine.wrap(src, netWriteBuffer) > -> > flush(ByteBuffer buf) && socketChannel.write(buf){code} > That is, first read the data on the disk or directly from the page cache, > then encrypt the data, and finally send the encrypted data to the network. > {*}FileChannel.transferTo() is not used in the whole process{*}. > > Conclusion: > PlaintextTransportLayer and SslTransportLayer both use pagecache, but > SslTransportLayer does not implement zero-copy.
[jira] [Commented] (KAFKA-13799) Improve documentation for Kafka zero-copy
[ https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517382#comment-17517382 ] RivenSun commented on KAFKA-13799: -- Hi [~showuon] [~dengziming] and [~guozhang] I think we can state in Kafka's documentation that when Kafka reads disk file messages, Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer. Maybe add this declaration in the sendfile module section of the documentation below. [https://kafka.apache.org/documentation/#maximizingefficiency] WDYT? Thanks. > Improve documentation for Kafka zero-copy > - > > Key: KAFKA-13799 > URL: https://issues.apache.org/jira/browse/KAFKA-13799 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: RivenSun >Priority: Major > > Via documentation https://kafka.apache.org/documentation/#maximizingefficiency > and [https://kafka.apache.org/documentation/#networklayer] , > We can know that Kafka combines pagecache and zero-copy when reading messages > in files on disk, which greatly improves the consumption rate of messages. > But after browsing the source code: > Look directly at the *FileRecords.writeTo(...)* method, > 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), > and the bottom layer calls the sendfile method to implement zero-copy data > transfer. > 2. The logic of the SslTransportLayer.transferFrom() method: > {code:java} > fileChannel.read(fileChannelBuffer, pos) > -> > sslEngine.wrap(src, netWriteBuffer) > -> > flush(ByteBuffer buf) && socketChannel.write(buf){code} > That is, first read the data on the disk or directly from the page cache, > then encrypt the data, and finally send the encrypted data to the network. > {*}FileChannel.transferTo() is not used in the whole process{*}. 
> > Conclusion: > PlaintextTransportLayer and SslTransportLayer both use pagecache, but > SslTransportLayer does not implement zero-copy.
[jira] [Created] (KAFKA-13799) Improve documentation for Kafka zero-copy
RivenSun created KAFKA-13799: Summary: Improve documentation for Kafka zero-copy Key: KAFKA-13799 URL: https://issues.apache.org/jira/browse/KAFKA-13799 Project: Kafka Issue Type: Improvement Components: documentation Reporter: RivenSun Via documentation https://kafka.apache.org/documentation/#maximizingefficiency and [https://kafka.apache.org/documentation/#networklayer] , We can know that Kafka combines pagecache and zero-copy when reading messages in files on disk, which greatly improves the consumption rate of messages. But after browsing the source code: Look directly at the *FileRecords.writeTo(...)* method, 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), and the bottom layer calls the sendfile method to implement zero-copy data transfer. 2. The logic of the SslTransportLayer.transferFrom() method: {code:java} fileChannel.read(fileChannelBuffer, pos) -> sslEngine.wrap(src, netWriteBuffer) -> flush(ByteBuffer buf) && socketChannel.write(buf){code} That is, first read the data on the disk or directly from the page cache, then encrypt the data, and finally send the encrypted data to the network. {*}FileChannel.transferTo() is not used in the whole process{*}. Conclusion: PlaintextTransportLayer and SslTransportLayer both use pagecache, but SslTransportLayer does not implement zero-copy.
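The two code paths this issue contrasts can be sketched with plain NIO. `FileChannel.transferTo` lets the kernel move file bytes (typically straight from the pagecache, via `sendfile` on Linux) to the destination channel without copying them through user space, whereas the SSL path must pull the bytes into a user-space buffer so they can be encrypted first. This is a simplified, hypothetical illustration of the shapes involved, not the actual `FileRecords`/`TransportLayer` code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

public class TransferDemo {
    // Plaintext path: zero-copy. The kernel pushes pagecache pages straight
    // to the destination channel; user space never touches the bytes.
    static long zeroCopy(FileChannel file, WritableByteChannel dest,
                         long pos, long count) throws IOException {
        long written = 0;
        while (written < count) {
            long n = file.transferTo(pos + written, count - written, dest);
            if (n <= 0) break; // destination full or end of file
            written += n;
        }
        return written;
    }

    // SSL-style path (simplified): bytes must be read into user space so they
    // can be encrypted before writing, so sendfile cannot be used.
    static long copyThenWrite(FileChannel file, WritableByteChannel dest,
                              long pos, long count) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(16 * 1024);
        long written = 0;
        while (written < count) {
            // Cap the read so we never transfer more than `count` bytes total.
            buf.clear().limit((int) Math.min(buf.capacity(), count - written));
            if (file.read(buf, pos + written) <= 0) break; // end of file
            buf.flip();
            // ... this is where SslTransportLayer would call sslEngine.wrap(...) ...
            while (buf.hasRemaining()) {
                written += dest.write(buf);
            }
        }
        return written;
    }
}
```

Both paths still benefit from the pagecache on the read side; only the user-space copy (and the extra context switches) differs, which is the distinction the reporter wants documented.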
[GitHub] [kafka] sciclon2 commented on pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
sciclon2 commented on PR #11842: URL: https://github.com/apache/kafka/pull/11842#issuecomment-1088562983 @junrao the tests that I added are working locally, so I guess the errors are not related to my tests
[GitHub] [kafka] mimaison commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
mimaison commented on code in PR #11974: URL: https://github.com/apache/kafka/pull/11974#discussion_r842632626 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -769,16 +609,16 @@ public void testLostAssignmentHandlingWhenWorkerBounces() { new ArrayList<>(configuredAssignment.values()), memberConfigs); -assertThat("Wrong set of workers for reassignments", +assertEquals("Wrong set of workers for reassignments", Review Comment: What about simplifying these assertions into: ``` assertTrue("Wrong set of workers for reassignments", assignor.candidateWorkersForReassignment.isEmpty()); ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -1170,78 +983,128 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 1 worker and 2 connectors configured but not yet assigned -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(2, 8, 0, 0, "worker1"); - -//delete connector1 +performStandardRebalance(); +assertDelay(0); +assertWorkers("worker1"); +assertConnectorAllocations(2); +assertTaskAllocations(8); +assertBalancedAndCompleteAllocation(); + +// Delete connector1 configState = clusterConfigState(offset, 2, 1, 4); when(coordinator.configSnapshot()).thenReturn(configState); // Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time -applyAssignments(returnedAssignments); -memberConfigs = memberConfigs(leader, 
offset, assignments); -ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment(); -duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2)); -duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 4)); -memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedWorkerAssignment)); -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(0, 0, 2, 8, "worker1", "worker2"); +addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 4)); +performStandardRebalance(); +assertDelay(0); +assertWorkers("worker1", "worker2"); +assertConnectorAllocations(0, 1); +assertTaskAllocations(0, 4); // Third assignment after revocations -applyAssignments(returnedAssignments); -memberConfigs = memberConfigs(leader, offset, assignments); -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(0, 0, 0, 2, "worker1", "worker2"); +performStandardRebalance(); +assertDelay(0); +assertConnectorAllocations(0, 1); +assertTaskAllocations(0, 2); // fourth rebalance after revocations -applyAssignments(returnedAssignments); -memberConfigs = memberConfigs(leader, offset, assignments); -assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); -++rebalanceNum; -returnedAssignments = assignmentsCapture.getValue(); -assertDelay(0, returnedAssignments); -expectedMemberConfigs = memberConfigs(leader, 
offset, returnedAssignments); -assertNoReassignments(memberConfigs, expectedMemberConfigs); -assertAssignment(0, 2, 0, 0, "worker1", "worker2"); +performStandardRebalance(); +assertDelay(0); +assertConnectorAllocations(0, 1); +assertTaskAllocations(2, 2); +assertBalancedAndCompleteAllocation(); //
[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
sciclon2 commented on code in PR #11842: URL: https://github.com/apache/kafka/pull/11842#discussion_r842615733 ## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ## @@ -430,6 +431,11 @@ object DumpLogSegments { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(5 * 1024 * 1024) +val maxBytesOpt = parser.accepts("max-bytes", "Limit the amount of total batches in bytes avoiding reading the whole file(s).") Review Comment: makes sense, added!
[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
sciclon2 commented on code in PR #11842: URL: https://github.com/apache/kafka/pull/11842#discussion_r842615490 ## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ## @@ -310,6 +334,38 @@ class DumpLogSegmentsTest { None } + // Returns the total bytes of the batches specified + private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = { +val sizePattern: Regex = raw".+?size:\s(\d+).+".r +var batchesBytes = 0 +var batchesCounter = 0 +while (lines.hasNext) { + if (batchesCounter >= limit){ +return batchesBytes + } + val line = lines.next() + if (line.startsWith("baseOffset")) { +line match { + case sizePattern(size) => batchesBytes += size.toInt + case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line") +} +batchesCounter += 1 + } +} +return batchesBytes + } + + private def countBatches(lines: util.ListIterator[String]): Int = { +var countBatches = 0 +while (lines.hasNext) { + val line = lines.next() + if (line.startsWith("baseOffset")) { +countBatches += 1 + } +} +return countBatches Review Comment: Sorry, I don't get why the return is not needed here :)
[GitHub] [kafka] sciclon2 commented on a diff in pull request #11842: KAFKA-13687: Limiting the amount of bytes to be read in a segment logs
sciclon2 commented on code in PR #11842: URL: https://github.com/apache/kafka/pull/11842#discussion_r842614837 ## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ## @@ -310,6 +334,38 @@ class DumpLogSegmentsTest { None } + // Returns the total bytes of the batches specified + private def readPartialBatchesBytes(lines: util.ListIterator[String], limit: Int): Int = { +val sizePattern: Regex = raw".+?size:\s(\d+).+".r +var batchesBytes = 0 +var batchesCounter = 0 +while (lines.hasNext) { + if (batchesCounter >= limit){ +return batchesBytes + } + val line = lines.next() + if (line.startsWith("baseOffset")) { +line match { + case sizePattern(size) => batchesBytes += size.toInt + case _ => throw new IllegalStateException(s"Failed to parse and find size value for batch line: $line") +} +batchesCounter += 1 + } +} +return batchesBytes Review Comment: I agree that in this scenario `limit` will always be lower (half) than the total number of lines, but what if tomorrow we use this method with a limit that is bigger than the total number of batches? For example, batches = 3 and limit = 5.
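The helper under review can be sketched outside the test harness. This is a hypothetical Java version of the same idea: sum the `size:` field of at most `limit` batch lines from `DumpLogSegments`-style output. Folding the limit check into the loop condition makes the behavior the reviewer asks about explicit: when `limit` exceeds the number of batches, the loop simply ends at the last line.

```java
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BatchSizeSum {
    private static final Pattern SIZE = Pattern.compile(".+?size:\\s(\\d+).+");

    // Sum the "size:" field of up to `limit` batch lines. If `limit` is larger
    // than the number of batches (e.g. batches = 3, limit = 5), the iterator is
    // exhausted first and the total over all batches is returned.
    static int partialBatchesBytes(Iterator<String> lines, int limit) {
        int bytes = 0, batches = 0;
        while (lines.hasNext() && batches < limit) {
            String line = lines.next();
            if (line.startsWith("baseOffset")) {
                Matcher m = SIZE.matcher(line);
                if (!m.matches()) {
                    throw new IllegalStateException("No size value in batch line: " + line);
                }
                bytes += Integer.parseInt(m.group(1));
                batches++;
            }
        }
        return bytes;
    }

    public static void main(String[] args) {
        // Simplified dump lines; the real tool prints more fields per batch.
        List<String> dump = List.of(
            "baseOffset: 0 lastOffset: 9 size: 100 ...",
            "baseOffset: 10 lastOffset: 19 size: 150 ...",
            "baseOffset: 20 lastOffset: 29 size: 200 ...");
        System.out.println(partialBatchesBytes(dump.iterator(), 2)); // 250
        System.out.println(partialBatchesBytes(dump.iterator(), 5)); // limit > batches: 450
    }
}
```

In the Scala original, the explicit `return` inside the loop performs the early exit; the trailing `return` before the closing brace is the one reviewers usually flag as unnecessary, since a Scala method already returns its last expression.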
[GitHub] [kafka] mimaison commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
mimaison commented on code in PR #11974: URL: https://github.com/apache/kafka/pull/11974#discussion_r842589656 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -76,23 +83,21 @@ @Parameters public static Iterable mode() { -return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2}}); +return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, {CONNECT_PROTOCOL_V2}}); Review Comment: This can be simplified into `return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);` but since you're removing it in https://github.com/apache/kafka/pull/11983, we can ignore it.
[GitHub] [kafka] mimaison commented on a diff in pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
mimaison commented on code in PR #11974: URL: https://github.com/apache/kafka/pull/11974#discussion_r842589656 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -76,23 +83,21 @@ @Parameters public static Iterable mode() { -return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2}}); +return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, {CONNECT_PROTOCOL_V2}}); Review Comment: This can be simplified into `return Arrays.asList(new Object[] {CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2});` but since you're removing it in https://github.com/apache/kafka/pull/11983, we can ignore it.
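The one-character difference this review fixes matters because JUnit 4's `@Parameters` method returns one element per test invocation. A minimal, hypothetical illustration of the two shapes (generic integers standing in for the Connect protocol constants):

```java
public class ParamsShapeDemo {
    public static void main(String[] args) {
        // Buggy shape: ONE invocation that receives two parameters at once,
        // so the test never actually runs once per protocol version.
        Object[][] oneRun = new Object[][] {{1, 2}};

        // Fixed shape: TWO invocations, each with a single parameter.
        Object[][] twoRuns = new Object[][] {{1}, {2}};

        System.out.println(oneRun.length + " run(s) vs " + twoRuns.length + " run(s)");
    }
}
```

The simplification suggested in the comment relies on JUnit 4.12+ accepting a flat `Iterable` of single values when the test has exactly one parameter, wrapping each element into a one-element argument array itself.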
[GitHub] [kafka] ddrid commented on pull request #11991: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager
ddrid commented on PR #11991: URL: https://github.com/apache/kafka/pull/11991#issuecomment-1088446711 I think it was introduced in #6883, but I'm not sure if it affects all the versions since then. What do you think? @hachikuji
[jira] [Resolved] (KAFKA-13791) Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of lazy-initialized members should be the last step with double-checked locking
[ https://issues.apache.org/jira/browse/KAFKA-13791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-13791. --- Fix Version/s: 3.3.0 Resolution: Fixed > Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of > lazy-initialized members should be the last step with double-checked locking > --- > > Key: KAFKA-13791 > URL: https://issues.apache.org/jira/browse/KAFKA-13791 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.1 >Reporter: YunKui Lu >Priority: Trivial > Fix For: 3.3.0 > > > Double-checked locking can be used for lazy initialization of volatile > fields, but only if field assignment is the last step in the synchronized > block. Otherwise, you run the risk of threads accessing a half-initialized > object. > The problem is consistent with > [KAFKA-13777|https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-13777]
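The pitfall KAFKA-13791 describes is the classic double-checked-locking ordering bug: if the lazily built object is assigned to the volatile field before it is fully populated, another thread can read the field without entering the synchronized block and observe a half-initialized value. A generic sketch of the safe shape (build into a local, assign last), not the actual FetchResponse code:

```java
import java.util.HashMap;
import java.util.Map;

public class LazyCache {
    private volatile Map<String, Integer> data; // lazily initialized

    public Map<String, Integer> data() {
        Map<String, Integer> local = data; // one volatile read on the fast path
        if (local == null) {
            synchronized (this) {
                local = data;
                if (local == null) {
                    // Build the object completely in a local variable first...
                    Map<String, Integer> tmp = new HashMap<>();
                    tmp.put("a", 1);
                    tmp.put("b", 2);
                    // ...and make the volatile assignment the LAST step, so a
                    // racing reader either sees null or the fully built map,
                    // never a partially populated one.
                    data = local = tmp;
                }
            }
        }
        return local;
    }
}
```

If `data` were assigned before the `put` calls, a thread hitting the unsynchronized `if (local == null)` check could return a map that is still being filled in.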
[GitHub] [kafka] showuon merged pull request #11981: KAFKA-13791: Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of lazy-initialized members should be the last step with double-checke
showuon merged PR #11981: URL: https://github.com/apache/kafka/pull/11981
[GitHub] [kafka] showuon commented on pull request #11981: KAFKA-13791: Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of lazy-initialized members should be the last step with double-
showuon commented on PR #11981: URL: https://github.com/apache/kafka/pull/11981#issuecomment-1088355283 Failed tests are unrelated. ``` Build / JDK 11 and Scala 2.13 / kafka.api.ClientIdQuotaTest.testProducerConsumerOverrideUnthrottled() Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT ```