[jira] [Created] (KAFKA-13645) Support the TopologyTestDriver with modular topologies
A. Sophie Blee-Goldman created KAFKA-13645:
----------------------------------------------

             Summary: Support the TopologyTestDriver with modular topologies
                 Key: KAFKA-13645
                 URL: https://issues.apache.org/jira/browse/KAFKA-13645
             Project: Kafka
          Issue Type: Sub-task
          Components: streams
            Reporter: A. Sophie Blee-Goldman

Currently the TTD accepts only a single Topology. Users can technically just use one TTD per Topology, but for a complete simulation of the actual KafkaStreams app we'll need to add support for processing multiple modular topologies with the TopologyTestDriver.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[GitHub] [kafka] showuon commented on pull request #5407: KAFKA-7109: Close cached fetch sessions in the broker on consumer close
showuon commented on pull request #5407:
URL: https://github.com/apache/kafka/pull/5407#issuecomment-1029737477

   @stanislavkozlovski , I encountered similar errors and I agree we should close the incremental fetch sessions when the consumer is closed. Are you able to complete this PR? If not, I can co-author it with you to help get this good improvement merged. Thank you.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11720: KAFKA-13625: Fix inconsistency in dynamic application log levels
dajac commented on pull request #11720:
URL: https://github.com/apache/kafka/pull/11720#issuecomment-1029736633

   @dongjinleekr Thanks for the explanation. I missed KIP-817 in the mailing list. Let's continue the discussion there and see what the community thinks about this.
[jira] [Created] (KAFKA-13644) Support global state stores with modular topologies
A. Sophie Blee-Goldman created KAFKA-13644:
----------------------------------------------

             Summary: Support global state stores with modular topologies
                 Key: KAFKA-13644
                 URL: https://issues.apache.org/jira/browse/KAFKA-13644
             Project: Kafka
          Issue Type: Sub-task
          Components: streams
            Reporter: A. Sophie Blee-Goldman
[jira] [Created] (KAFKA-13643) Replace "NamedTopology" with "ModularTopology" in the codebase
A. Sophie Blee-Goldman created KAFKA-13643:
----------------------------------------------

             Summary: Replace "NamedTopology" with "ModularTopology" in the codebase
                 Key: KAFKA-13643
                 URL: https://issues.apache.org/jira/browse/KAFKA-13643
             Project: Kafka
          Issue Type: Sub-task
          Components: streams
            Reporter: A. Sophie Blee-Goldman
[jira] [Commented] (KAFKA-13534) Upgrade Log4j to 2.15.0 - CVE-2021-44228
[ https://issues.apache.org/jira/browse/KAFKA-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486845#comment-17486845 ]

Luke Chen commented on KAFKA-13534:
-----------------------------------

[~noonbs] , thanks for the comment. I've marked this ticket as a duplicate of KAFKA-9366 and closed it.

> Upgrade Log4j to 2.15.0 - CVE-2021-44228
> ----------------------------------------
>
>                 Key: KAFKA-13534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13534
>             Project: Kafka
>          Issue Type: Task
>    Affects Versions: 2.7.0, 2.8.0, 3.0.0
>            Reporter: Sai Kiran Vudutala
>            Priority: Major
>
> Log4j has an RCE vulnerability, see
> https://www.lunasec.io/docs/blog/log4j-zero-day/
>
> References:
> https://github.com/advisories/GHSA-jfh8-c2jp-5v3q
> https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126
[jira] [Resolved] (KAFKA-13534) Upgrade Log4j to 2.15.0 - CVE-2021-44228
[ https://issues.apache.org/jira/browse/KAFKA-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-13534.
-------------------------------
    Resolution: Duplicate

> Upgrade Log4j to 2.15.0 - CVE-2021-44228
[GitHub] [kafka] showuon edited a comment on pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode
showuon edited a comment on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029613788

   @guozhangwang , thanks for your comments. Sorry for confusing you! And it makes sense to remove the `WIP` from the PR title. I'll let you know when I complete the PR. Thank you.
[GitHub] [kafka] showuon commented on pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode
showuon commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029613788

   @guozhangwang , thanks for your comments. Sorry for confusing you! And it makes sense to remove the `KIP` from the PR title. I'll let you know when I complete the PR. Thank you.
[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486808#comment-17486808 ]

Guozhang Wang commented on KAFKA-13600:
---------------------------------------

Thanks guys for the great discussion here. I think just changing the behavior of "if we cannot find any caught-up node, assign to a fresh node" to "if we cannot find any caught-up node, pick one that is closest to head" (while of course still maintaining the subtlety of those edge cases) as a general principle should be okay for just the scope of this ticket. We can leave further improvements of the assignment algorithm (I know Bruno/John already have some ideas) to a larger-scoped KIP. WDYT?

> Rebalances while streams is in degraded state can cause stores to be
> reassigned and restore from scratch
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13600
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13600
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0, 2.8.1, 3.0.0
>            Reporter: Tim Patterson
>            Priority: Major
>
> Consider this scenario:
> # A node is lost from the cluster.
> # A rebalance is kicked off with a new "target assignment" (i.e. the rebalance is attempting to move a lot of tasks - see https://issues.apache.org/jira/browse/KAFKA-10121).
> # The kafka cluster is now a bit more sluggish from the increased load.
> # A rolling deploy happens, triggering rebalances; during the rebalance processing continues but offsets can't be committed (or nodes are restarted but fail to commit offsets).
> # The most caught-up nodes now aren't within `acceptableRecoveryLag`, and so the task is started in its "target assignment" location, restoring all state from scratch and delaying further processing instead of using the "almost caught up" node.
>
> We've hit this a few times and, having lots of state (~25TB worth) and being heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get around this, that causes other issues (i.e. a warmup becoming active when it's still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught-up nodes".
> We've got a fork where we do just this, and it's made a huge difference to the reliability of our cluster.
> Our change is to simply use the most caught-up node if the "target node" is more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code but in practice doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being within `acceptableRecoveryLag` of the most caught-up node might allow the best of both worlds.
>
> Our fork is
> https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1
> (We also moved the capacity constraint code to happen after all the stateful assignment, to prioritise standby tasks over warmup tasks.)
> Ideally we don't want to maintain a fork of kafka streams going forward, so we are hoping to get a bit of discussion / agreement on the best way to handle this.
> More than happy to contribute code, test different algorithms in a production system, or anything else to help with this issue.
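The fork's fallback rule described in the ticket (keep the balanced "target" node unless it is more than `acceptableRecoveryLag` behind, otherwise hand the task to the most caught-up node) can be sketched in plain Java. This is an illustrative sketch only; the class and method names below are hypothetical and are not the actual Streams assignor API.

```java
import java.util.Map;

// Hypothetical sketch of the fallback rule discussed in KAFKA-13600:
// keep the balanced "target" node unless it is too far behind the
// changelog head, in which case pick the most caught-up node instead.
public class CaughtUpAssignmentSketch {
    public static String chooseNode(String targetNode,
                                    Map<String, Long> lagByNode,
                                    long acceptableRecoveryLag) {
        long targetLag = lagByNode.getOrDefault(targetNode, Long.MAX_VALUE);
        if (targetLag <= acceptableRecoveryLag) {
            return targetNode; // close enough to head: keep the balanced assignment
        }
        // otherwise return the node with the smallest lag (closest to head)
        return lagByNode.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(targetNode);
    }

    public static void main(String[] args) {
        Map<String, Long> lag = Map.of("a", 10L, "b", 5_000_000L, "c", 200L);
        // target "b" is far behind, so the most caught-up node "a" wins
        System.out.println(chooseNode("b", lag, 10_000L)); // prints "a"
        // target "c" is within the lag bound, so it keeps the task
        System.out.println(chooseNode("c", lag, 10_000L)); // prints "c"
    }
}
```

The "best of both worlds" variant mentioned in the ticket would instead build a candidate set of all nodes within `acceptableRecoveryLag` of the most caught-up node and balance among those.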
[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
[ https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486807#comment-17486807 ]

Mohammad Yousuf Minhaj Zia commented on KAFKA-13641:
----------------------------------------------------

Going a step further, maybe we could also make tombstones semantically clear using `Either`, such that a value in other functions can be an `Either` where a tombstone is equivalent to null. But that is unrelated to this issue, so treat it as a fleeting thought.

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -------------------------------------------------------------
>
>                 Key: KAFKA-13641
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13641
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Mohammad Yousuf Minhaj Zia
>            Priority: Minor
>
> Since the `ValueJoiner` right parameter in `leftJoin`s and `outerJoin`s can be nullable, I am wondering if we can wrap them in a Scala `Option`.
> However, there is also the concern that the left-hand-side value can be null in the case of tombstone messages, in which case the `Option` semantics can be misleading. I still feel this could be a useful feature in reducing the number of `NullPointerException`s.
[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
[ https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486804#comment-17486804 ]

Mohammad Yousuf Minhaj Zia commented on KAFKA-13641:
----------------------------------------------------

[~mjsax] The backwards compatibility is definitely a problem. One solution would be to maintain two sets of functions, one with `Option` in the `ValueJoiner` and one without, but that will get hard to maintain over time as well. Maybe there is an abstraction we can use to allow both `Option` and non-`Option` values in a `ValueJoiner`. I genuinely feel this would improve the safety of the implementations.

Also, thanks for pointing out that null values are dropped. If that is the case, then maybe having `Option` for the specific values of the join will help a lot in making people understand the join semantics just by looking at the signature.

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
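The wrapping idea under discussion can be illustrated with plain Java `Optional` and `BiFunction`, with no Kafka types involved. `NullableJoinerSketch` and `fromOptional` are hypothetical names for illustration, not anything in the Streams Scala API.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical sketch of KAFKA-13641's idea: let users write a joiner
// over Optional-wrapped sides, so the signature itself says that either
// side of a left/outer join may be absent, then adapt it to the
// nullable-argument shape the join actually invokes.
public class NullableJoinerSketch {
    public static <V1, V2, R> BiFunction<V1, V2, R> fromOptional(
            BiFunction<Optional<V1>, Optional<V2>, R> safeJoiner) {
        return (v1, v2) ->
            safeJoiner.apply(Optional.ofNullable(v1), Optional.ofNullable(v2));
    }

    public static void main(String[] args) {
        BiFunction<String, Integer, String> joiner = fromOptional(
            (left, right) -> left.orElse("<none>") + ":"
                + right.map(String::valueOf).orElse("<none>"));
        System.out.println(joiner.apply("key", null)); // right side missing, as in a left join
        System.out.println(joiner.apply(null, 42));    // left side missing (tombstone-like case)
    }
}
```

Keeping both shapes via such an adapter is one way to sidestep the backwards-compatibility concern raised in the comment, at the cost of a second overload set.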
[GitHub] [kafka] jsancio opened a new pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio opened a new pull request #11733:
URL: https://github.com/apache/kafka/pull/11733

   TODO: Add a description for this PR!!

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799138886

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
                 } else {
                     if (shouldStopDrainBatchesForPartition(first, tp))
                         break;
+                    }

-                    boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
-                    ProducerIdAndEpoch producerIdAndEpoch =
-                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
-                    ProducerBatch batch = deque.pollFirst();
-                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
-                        // If the producer id/epoch of the partition do not match the latest one
-                        // of the producer, we update it and reset the sequence. This should be
-                        // only done when all its in-flight batches have completed. This is guarantee
-                        // in `shouldStopDrainBatchesForPartition`.
-                        transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-                        // If the batch already has an assigned sequence, then we should not change the producer id and
-                        // sequence number, since this may introduce duplicates. In particular, the previous attempt
-                        // may actually have been accepted, and if we change the producer id and sequence here, this
-                        // attempt will also be accepted, causing a duplicate.
-                        //
-                        // Additionally, we update the next sequence number bound for the partition, and also have
-                        // the transaction manager track the batch so as to ensure that sequence ordering is maintained
-                        // even if we receive out of order responses.
-                        batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-                        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-                        log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
-                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
-                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);
-
-                        transactionManager.addInFlightBatch(batch);
-                    }
-                    batch.close();
-                    size += batch.records().sizeInBytes();
-                    ready.add(batch);
-
-                    batch.drained(now);
-                }
+                    // do the rest of the work by processing outside the lock
+                    // close() is particularly expensive
+                    batch = deque.pollFirst();
+                }

+                boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
+                ProducerIdAndEpoch producerIdAndEpoch =
+                    transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
+                if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                    // If the producer id/epoch of the partition do not match the latest one
+                    // of the producer, we update it and reset the sequence. This should be
+                    // only done when all its in-flight batches have completed. This is guarantee
+                    // in `shouldStopDrainBatchesForPartition`.
+                    transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+                    // If the batch already has an assigned sequence, then we should not change the producer id and
+                    // sequence number, since this may introduce duplicates. In particular, the previous attempt
+                    // may actually have been accepted, and if we change the producer id and sequence here, this
+                    // attempt will also be accepted, causing a duplicate.
+                    //
+                    // Additionally, we update the next sequence number bound for the partition, and also have
+                    // the transaction manager track the batch so as to ensure that sequence ordering is maintained
+                    // even if we receive out of order responses.
+                    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);

Review comment:
       Correct, this application is jdk8. I'll have to find a jd
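The restructuring under review, polling the batch while holding the queue lock but doing the expensive `close()` work after releasing it, follows a general lock-narrowing pattern that can be sketched independently of the producer internals. The names below are illustrative and are not Kafka's.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch of the lock-narrowing pattern from KAFKA-13630:
// only the queue manipulation happens inside synchronized; the expensive
// per-item work (close/serialize in the real code) runs outside the lock,
// so producers adding items are not blocked behind it.
public class DrainOutsideLockSketch {
    private final Deque<StringBuilder> queue = new ArrayDeque<>();

    public void add(StringBuilder item) {
        synchronized (queue) {
            queue.addLast(item);
        }
    }

    public List<String> drain() {
        List<String> drained = new ArrayList<>();
        while (true) {
            StringBuilder item;
            synchronized (queue) {        // hold the lock only long enough to poll
                item = queue.pollFirst();
            }
            if (item == null) {
                return drained;
            }
            drained.add(item.toString()); // stand-in for the expensive work, done lock-free
        }
    }

    public static void main(String[] args) {
        DrainOutsideLockSketch q = new DrainOutsideLockSketch();
        q.add(new StringBuilder("batch-1"));
        q.add(new StringBuilder("batch-2"));
        System.out.println(q.drain()); // prints [batch-1, batch-2]
    }
}
```

The trade-off, visible in the real diff too, is that once an item leaves the queue the drainer is committed to processing it outside the lock, so any per-item state updates must be safe without the queue lock held.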
[GitHub] [kafka] guozhangwang commented on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode
guozhangwang commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029600849

   @showuon Sorry for getting to this late -- I thought it was not ready since the title still has `WIP` in it. I will re-title it and continue reviewing.
[GitHub] [kafka] showuon commented on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode
showuon commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029598488

   @guozhangwang @ableegoldman @hachikuji , please take a look when available. This issue blocks some users when upgrading kafka-clients. I think this should be fixed soon. Thank you.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names
guozhangwang commented on a change in pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#discussion_r799128273

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -171,42 +169,24 @@ private void registerMetrics() {
     @Deprecated
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        final String prefix = getPrefix(context.appConfigs(), context.applicationId());
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

Review comment:
       Yes, but I'm wondering if the two types of flow are really necessary. It seems to me that the topic should have always been created at the `init` phase already, and hence the null check seems unnecessary. @cadonna Would need your quick thoughts about this.
[GitHub] [kafka] showuon commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs
showuon commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1029590837

   @hachikuji , I've updated the KIP to add tests. Please take a look again. Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs
showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r799127097

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -168,7 +168,7 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("request required acks")
       .ofType(classOf[java.lang.String])
-      .defaultsTo("1")
+      .defaultsTo("-1")

Review comment:
       Good suggestion! Sent a note in the KIP vote thread. Thanks.
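The change above reflects the constraint this KIP revolves around: an idempotent producer requires `acks=all`, and `"-1"` is the equivalent string form. A minimal sketch of the resulting config shape, using only `java.util.Properties` with the standard producer config key names (this is just the config shape under the KIP's proposed defaults, not a running producer):

```java
import java.util.Properties;

// Sketch of the producer configs discussed in KAFKA-13598: an idempotent
// producer requires acks=all; the strings "-1" and "all" are equivalent.
public class IdempotentProducerConfigSketch {
    public static Properties idempotentProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("enable.idempotence", "true"); // the default this work proposes
        props.put("acks", "all");                // required by idempotence
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = idempotentProducerProps("localhost:9092");
        System.out.println(props.getProperty("acks")); // prints "all"
    }
}
```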
[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
ijuma commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799125694

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########

Review comment:
       Can you please test with Java 11 or newer? Looks like you tes
[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
ijuma commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799124172

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##########
@@ -392,12 +392,33 @@ public static void writeDouble(double value, ByteBuffer buffer) {
      * @param value The signed value
      */
     public static int sizeOfUnsignedVarint(int value) {
-        int bytes = 1;
-        while ((value & 0xffffff80) != 0L) {
-            bytes += 1;
-            value >>>= 7;
-        }
-        return bytes;
+        int leadingZeros = Integer.numberOfLeadingZeros(value);
+
+        // magic sequence of numbers that produces a function equivalent to this lookup
+        // table, where the index in the lookup table is provided by the number of
+        // leading zeros of the value, and the result is the number of bytes used
+        // in the output
+
+        // see the test cases as well to verify the implementation matches the prior
+        // for-loop logic
+
+        // final static byte[] LEADING_ZEROS_TO_U_VARINT_SIZE = new byte[] {
+        //     // 32 bits, and each 7-bits adds one byte to the output
+        //     5, 5, 5, 5,          // 32
+        //     4, 4, 4, 4, 4, 4, 4, // 28
+        //     3, 3, 3, 3, 3, 3, 3, // 21
+        //     2, 2, 2, 2, 2, 2, 2, // 14
+        //     1, 1, 1, 1, 1, 1, 1, // 7
+        //     1                    // 0
+        // };
+
+        // this is the core logic, but the Java encoding is suboptimal when we have a narrow
+        // range of integers, so we can do better here
+
+        // return (38 - leadingZeros) / 7 + leadingZeros / 32;
+
+        int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
+        return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);

Review comment:
       Very nice. Regarding the comment, do we need the lookup table? Or can we explain it based on the core logic comment?
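The branchless computation in the diff can be checked against the original loop. Below is a standalone sketch of both, matching the snippet's logic (the binary constant `0b10010010010010011` is 74899, roughly 2^19 / 7, so the multiply-and-shift performs an integer division by 7); the class and method names are ours, not Kafka's.

```java
// Standalone check of the branchless sizeOfUnsignedVarint from KAFKA-13629:
// both functions count how many 7-bit groups a varint encoding needs.
public class VarintSizeSketch {
    // Original loop: one output byte per 7 bits of remaining payload.
    static int sizeLoop(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Branchless version: (38 - lz) / 7 is ceil(significantBits / 7) for
    // nonzero values, and the lz / 32 term fixes up value == 0 (lz == 32).
    // 74899 == 0b10010010010010011 ~= 2^19 / 7, so the multiply-shift
    // divides by 7 without a division instruction.
    static int sizeBranchless(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        int dividedBy7 = ((38 - leadingZeros) * 74899) >>> 19;
        return dividedBy7 + (leadingZeros >>> 5);
    }

    public static void main(String[] args) {
        // boundary values: each pair straddles a 7-bit group boundary
        int[] samples = {0, 1, 127, 128, 16383, 16384, 2097151, 2097152,
                         268435455, 268435456, -1 /* 0xFFFFFFFF unsigned */};
        for (int v : samples) {
            if (sizeLoop(v) != sizeBranchless(v)) {
                throw new AssertionError("mismatch at " + v);
            }
        }
        System.out.println("all sizes match");
    }
}
```

The lookup-table comment that the review questions is answered by the formula itself: the table is just `(38 - lz) / 7 + lz / 32` tabulated over the 33 possible leading-zero counts.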
[GitHub] [kafka] artemlivshits commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
artemlivshits commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799117854 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar } else { if (shouldStopDrainBatchesForPartition(first, tp)) break; +} -boolean isTransactional = transactionManager != null && transactionManager.isTransactional(); -ProducerIdAndEpoch producerIdAndEpoch = -transactionManager != null ? transactionManager.producerIdAndEpoch() : null; -ProducerBatch batch = deque.pollFirst(); -if (producerIdAndEpoch != null && !batch.hasSequence()) { -// If the producer id/epoch of the partition do not match the latest one -// of the producer, we update it and reset the sequence. This should be -// only done when all its in-flight batches have completed. This is guarantee -// in `shouldStopDrainBatchesForPartition`. - transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition); - -// If the batch already has an assigned sequence, then we should not change the producer id and -// sequence number, since this may introduce duplicates. In particular, the previous attempt -// may actually have been accepted, and if we change the producer id and sequence here, this -// attempt will also be accepted, causing a duplicate. -// -// Additionally, we update the next sequence number bound for the partition, and also have -// the transaction manager track the batch so as to ensure that sequence ordering is maintained -// even if we receive out of order responses. 
-batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); - transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); -log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " + -"{} being sent to partition {}", producerIdAndEpoch.producerId, -producerIdAndEpoch.epoch, batch.baseSequence(), tp); - -transactionManager.addInFlightBatch(batch); -} -batch.close(); -size += batch.records().sizeInBytes(); -ready.add(batch); +// do the rest of the work by processing outside the lock +// close() is particularly expensive +batch = deque.pollFirst(); +} -batch.drained(now); -} +boolean isTransactional = transactionManager != null && transactionManager.isTransactional(); +ProducerIdAndEpoch producerIdAndEpoch = +transactionManager != null ? transactionManager.producerIdAndEpoch() : null; +if (producerIdAndEpoch != null && !batch.hasSequence()) { +// If the producer id/epoch of the partition do not match the latest one +// of the producer, we update it and reset the sequence. This should be +// only done when all its in-flight batches have completed. This is guarantee +// in `shouldStopDrainBatchesForPartition`. + transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition); + +// If the batch already has an assigned sequence, then we should not change the producer id and +// sequence number, since this may introduce duplicates. In particular, the previous attempt +// may actually have been accepted, and if we change the producer id and sequence here, this +// attempt will also be accepted, causing a duplicate. +// +// Additionally, we update the next sequence number bound for the partition, and also have +// the transaction manager track the batch so as to ensure that sequence ordering is maintained +// even if we receive out of order responses. 
+batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); + transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); Review comment: Moving `close` outside of locked scope LGTM -- T
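The lock-narrowing pattern approved above can be sketched in isolation. This is a simplified illustration, not the actual `RecordAccumulator#drain` code: the queue lock is held only long enough to poll, and the expensive per-batch work (in the real code, `batch.close()` and transaction bookkeeping) happens outside the synchronized block so appending producer threads are not stalled:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DrainSketch {
    private final Deque<String> deque = new ArrayDeque<>();

    public DrainSketch(List<String> batches) {
        deque.addAll(batches);
    }

    public List<String> drain() {
        List<String> ready = new ArrayList<>();
        while (true) {
            String batch;
            synchronized (deque) {       // critical section covers only the poll
                batch = deque.pollFirst();
            }
            if (batch == null)
                break;
            // Expensive work happens outside the lock; toUpperCase() stands in
            // for batch.close() and the sequence/epoch bookkeeping.
            ready.add(batch.toUpperCase());
        }
        return ready;
    }
}
```

The trade-off is that state observed under the lock (like the head-of-queue checks) must still be valid once the batch has been polled, which is why the real change keeps `shouldStopDrainBatchesForPartition` inside the locked region.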
[GitHub] [kafka] artemlivshits commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
artemlivshits commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799111359 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer buffer) { buffer.putDouble(value); } +final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] { +// 32 bits, and each 7-bits adds one byte to the output +5, 5, 5, 5, // 32 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + +final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] { +// 64 bits, and each 7-bits adds one byte to the output +10, // 64 +9, 9, 9, 9, 9, 9, 9, // 63 +8, 8, 8, 8, 8, 8, 8, // 56 +7, 7, 7, 7, 7, 7, 7, // 49 +6, 6, 6, 6, 6, 6, 6, // 42 +5, 5, 5, 5, 5, 5, 5, // 35 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + /** * Number of bytes needed to encode an integer in unsigned variable-length format. * * @param value The signed value */ public static int sizeOfUnsignedVarint(int value) { -int bytes = 1; -while ((value & 0xff80) != 0L) { -bytes += 1; -value >>>= 7; -} -return bytes; +int leadingZeros = Integer.numberOfLeadingZeros(value); +return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros]; Review comment: Nice! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
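The hand-written tables in the diff can also be derived programmatically, which is a convenient way to verify them: a value with `64 - lz` significant bits needs `ceil(bits / 7)` bytes, with a minimum of one byte for zero. A small sketch (names are illustrative):

```java
public class VarlongTableSketch {
    // Derive the leading-zeros -> encoded-size table for unsigned varlongs.
    static int[] buildVarlongSizeTable() {
        int[] table = new int[65];
        for (int lz = 0; lz <= 64; lz++) {
            int bits = 64 - lz;                  // significant bits in the value
            table[lz] = Math.max(1, (bits + 6) / 7); // ceil(bits / 7), min 1
        }
        return table;
    }

    public static void main(String[] args) {
        int[] table = buildVarlongSizeTable();
        // Spot-check against the hand-written LEADING_ZEROS_TO_U_VARLONG_SIZE above.
        System.out.println(table[64]); // value 0 -> 1 byte
        System.out.println(table[57]); // 7 bits  -> 1 byte
        System.out.println(table[0]);  // 64 bits -> 10 bytes
    }
}
```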
[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
jasonk000 commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799107208 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar } else { if (shouldStopDrainBatchesForPartition(first, tp)) break; +} -boolean isTransactional = transactionManager != null && transactionManager.isTransactional(); -ProducerIdAndEpoch producerIdAndEpoch = -transactionManager != null ? transactionManager.producerIdAndEpoch() : null; -ProducerBatch batch = deque.pollFirst(); -if (producerIdAndEpoch != null && !batch.hasSequence()) { -// If the producer id/epoch of the partition do not match the latest one -// of the producer, we update it and reset the sequence. This should be -// only done when all its in-flight batches have completed. This is guarantee -// in `shouldStopDrainBatchesForPartition`. - transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition); - -// If the batch already has an assigned sequence, then we should not change the producer id and -// sequence number, since this may introduce duplicates. In particular, the previous attempt -// may actually have been accepted, and if we change the producer id and sequence here, this -// attempt will also be accepted, causing a duplicate. -// -// Additionally, we update the next sequence number bound for the partition, and also have -// the transaction manager track the batch so as to ensure that sequence ordering is maintained -// even if we receive out of order responses. 
-batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); - transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); -log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " + -"{} being sent to partition {}", producerIdAndEpoch.producerId, -producerIdAndEpoch.epoch, batch.baseSequence(), tp); - -transactionManager.addInFlightBatch(batch); -} -batch.close(); -size += batch.records().sizeInBytes(); -ready.add(batch); +// do the rest of the work by processing outside the lock +// close() is particularly expensive +batch = deque.pollFirst(); +} -batch.drained(now); -} +boolean isTransactional = transactionManager != null && transactionManager.isTransactional(); +ProducerIdAndEpoch producerIdAndEpoch = +transactionManager != null ? transactionManager.producerIdAndEpoch() : null; +if (producerIdAndEpoch != null && !batch.hasSequence()) { +// If the producer id/epoch of the partition do not match the latest one +// of the producer, we update it and reset the sequence. This should be +// only done when all its in-flight batches have completed. This is guarantee +// in `shouldStopDrainBatchesForPartition`. + transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition); + +// If the batch already has an assigned sequence, then we should not change the producer id and +// sequence number, since this may introduce duplicates. In particular, the previous attempt +// may actually have been accepted, and if we change the producer id and sequence here, this +// attempt will also be accepted, causing a duplicate. +// +// Additionally, we update the next sequence number bound for the partition, and also have +// the transaction manager track the batch so as to ensure that sequence ordering is maintained +// even if we receive out of order responses. 
+batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); + transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); Review comment: I've addressed this in 34008bf. Below is flamegrap
[jira] [Commented] (KAFKA-7572) Producer should not send requests with negative partition id
[ https://issues.apache.org/jira/browse/KAFKA-7572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486780#comment-17486780 ] Guozhang Wang commented on KAFKA-7572: -- Thanks for pinging me. I've just made a pass on the PR. > Producer should not send requests with negative partition id > > > Key: KAFKA-7572 > URL: https://issues.apache.org/jira/browse/KAFKA-7572 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1, 1.1.1 >Reporter: Yaodong Yang >Assignee: Wenhao Ji >Priority: Major > Labels: patch-available > > h3. Issue: > In one Kafka producer log from our users, we found the following weird one: > timestamp="2018-10-09T17:37:41,237-0700",level="ERROR", Message="Write to > Kafka failed with: ",exception="java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topicName--2: 30042 ms has passed since batch creation plus linger time > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 > record(s) for topicName--2: 30042 ms has passed since batch creation plus > linger time" > After a 
few hours debugging, we finally understood the root cause of this > issue: > # The producer used a buggy custom Partitioner, which sometimes generates > negative partition ids for new records. > # The corresponding produce requests were rejected by brokers, because it's > illegal to have a partition with a negative id. > # The client kept refreshing its local cluster metadata, but could not send > produce requests successfully. > # From the above log, we found a suspicious string "topicName--2": > # According to the source code, the format of this string in the log is > TopicName+"-"+PartitionId. > # It's not easy to notice that there were two consecutive dashes in the above > log. > # Eventually, we found that the second dash was a negative sign. Therefore, > the partition id is -2, rather than 2. > # The bug was in the custom Partitioner. > h3. Proposal: > # Producer code should check the partitionId before sending requests to > brokers. > # If there is a negative partition id, just throw an IllegalStateException. > # Such a quick check can save lots of time for people debugging their > producer code. -- This message was sent by Atlassian Jira (v8.20.1#820001)
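The proposed guard is a simple range check before the request is built. This sketch is illustrative (names are mine, not the producer's internals); it throws the unchecked exception the reviewers settled on later in the thread, with a message that avoids the ambiguous "topicName--2" rendering:

```java
public class PartitionGuard {
    // Fail fast with an unambiguous message instead of letting the broker
    // reject the request and logging "topic-<negative>" where the minus
    // sign is easy to misread as a separator dash.
    static int validatePartition(String topic, int partition, int numPartitions) {
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException(
                "Invalid partition " + partition + " for topic " + topic
                + "; partition must be in the range [0, " + numPartitions + ")");
        }
        return partition;
    }

    public static void main(String[] args) {
        System.out.println(validatePartition("topicName", 2, 4)); // prints 2
        try {
            validatePartition("topicName", -2, 4); // buggy custom partitioner output
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```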
[GitHub] [kafka] guozhangwang commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id
guozhangwang commented on pull request #10525: URL: https://github.com/apache/kafka/pull/10525#issuecomment-1029565310 @predatorray Thanks for the PR. Just following @showuon 's comment, I thought about the existing `InvalidPartitionsException`, but it has some issues when used by the producer: it currently extends `ApiException`, which is used by the admin / raft controller for errors returned by the brokers, and using it in the producer would also be considered a public change. So I think for now just throwing the unchecked `IllegalArgumentException` as a fatal error to crash the producer directly is fine. The PR needs some rebasing at the moment; otherwise it looks good to me.
[GitHub] [kafka] guozhangwang closed pull request #5858: KAFKA-7572: Producer should not send requests with negative partition id
guozhangwang closed pull request #5858: URL: https://github.com/apache/kafka/pull/5858
[GitHub] [kafka] ahuang98 opened a new pull request #11732: MINOR: enable KRaft in ConfigCommandTest
ahuang98 opened a new pull request #11732: URL: https://github.com/apache/kafka/pull/11732 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799084697 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer buffer) { buffer.putDouble(value); } +final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] { +// 32 bits, and each 7-bits adds one byte to the output +5, 5, 5, 5, // 32 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + +final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] { +// 64 bits, and each 7-bits adds one byte to the output +10, // 64 +9, 9, 9, 9, 9, 9, 9, // 63 +8, 8, 8, 8, 8, 8, 8, // 56 +7, 7, 7, 7, 7, 7, 7, // 49 +6, 6, 6, 6, 6, 6, 6, // 42 +5, 5, 5, 5, 5, 5, 5, // 35 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + /** * Number of bytes needed to encode an integer in unsigned variable-length format. * * @param value The signed value */ public static int sizeOfUnsignedVarint(int value) { -int bytes = 1; -while ((value & 0xff80) != 0L) { -bytes += 1; -value >>>= 7; -} -return bytes; +int leadingZeros = Integer.numberOfLeadingZeros(value); +return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros]; Review comment: @artemlivshits i've updated the implementation in f040109ba. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799082813 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer buffer) { buffer.putDouble(value); } +final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] { +// 32 bits, and each 7-bits adds one byte to the output +5, 5, 5, 5, // 32 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + +final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] { +// 64 bits, and each 7-bits adds one byte to the output +10, // 64 +9, 9, 9, 9, 9, 9, 9, // 63 +8, 8, 8, 8, 8, 8, 8, // 56 +7, 7, 7, 7, 7, 7, 7, // 49 +6, 6, 6, 6, 6, 6, 6, // 42 +5, 5, 5, 5, 5, 5, 5, // 35 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + /** * Number of bytes needed to encode an integer in unsigned variable-length format. * * @param value The signed value */ public static int sizeOfUnsignedVarint(int value) { -int bytes = 1; -while ((value & 0xff80) != 0L) { -bytes += 1; -value >>>= 7; -} -return bytes; +int leadingZeros = Integer.numberOfLeadingZeros(value); +return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros]; Review comment: OK, got it. Thank you for the nudges. They are now effectively identical in performance, and the math-version uses 1 less cmp/mov (for array bounds & fetch), with one extra instruction, and no requirement to hit the cache. 
```
Benchmark                                                             Mode  Cnt       Score      Error  Units
ByteUtilsBenchmark.testSizeOfUnsignedVarint                          thrpt   20  503633.665 ± 5214.083  ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarint:IPC                      thrpt    2       4.608             insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarint:L1-dcache-loads          thrpt    2      13.002             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarint:branches                 thrpt    2       5.005             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarint:instructions             thrpt    2      31.080             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintMath                      thrpt   20  507438.241 ± 5268.695  ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:IPC                  thrpt    2       4.789             insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:L1-dcache-loads      thrpt    2      11.992             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:branches             thrpt    2       4.004             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:instructions         thrpt    2      32.059             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal                  thrpt   20  371946.626 ± 3105.947  ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:IPC              thrpt    2       5.360             insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:L1-dcache-loads  thrpt    2      14.002             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:branches         thrpt    2       7.992             #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:instructions     thrpt    2      48.931             #/op
```
I'll switch the code over.
[jira] [Closed] (KAFKA-13619) zookeeper.sync.time.ms is no longer used
[ https://issues.apache.org/jira/browse/KAFKA-13619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomonari Yamashita closed KAFKA-13619. -- > zookeeper.sync.time.ms is no longer used > > > Key: KAFKA-13619 > URL: https://issues.apache.org/jira/browse/KAFKA-13619 > Project: Kafka > Issue Type: Bug > Components: core, documentation >Affects Versions: 2.0.0, 3.1.0 >Reporter: Tomonari Yamashita >Assignee: Tomonari Yamashita >Priority: Minor > > - zookeeper.sync.time.ms is no longer used. But it is present in the > documentation (1) > -- The implementation and documentation of zookeeper.sync.time.ms should be > removed. > -- As far as I can see, it was already out of use by v2.0.0. > - I've submitted pull request : " KAFKA-13619: zookeeper.sync.time.ms is no > longer used [#11717|https://github.com/apache/kafka/pull/11717]"; > (1) > [https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]
[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486744#comment-17486744 ] Tim Patterson commented on KAFKA-13600: --- Thanks [~cadonna] yeah I had a bit of a think and I think you're right, it also gets a bit weird when dealing with replicas. Like you almost have to decide what's "caught up", assign the actives, remove those node/partitions from the candidates and then recalc for the replicas, to handle the case where there's a large gap between the most caught up and second most caught up. [~vvcephei] > Is the situation that there's an active that happens to be processing quite > a bit ahead of the replicas, such that when the active goes offline, there's > no "caught-up" node, and instead of failing the task over to the > least-lagging node, we just assign it to a fresh node Yeah, that's it! Although what we also hit a couple of times is a variation on clusters with no replicas where the active is restarted but it's failed to locally checkpoint in a couple of minutes; when it comes back up it's seen as not being caught up and so the task is assigned to a fresh(ish) node (of course this only occurs when the cluster is already wanting to move that task to a new home due to a node being added/removed recently) One thing I haven't really taken into consideration/thought about is clusters with more than one replica, I'm not entirely convinced it works there although the unit tests do pass. Did you mean submit as is, or to create a minimal PR where I only try to address that flaw you've identified here [https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baL92-L96] I can certainly have a go at that (it was a few months ago that I patched this so it might take me a bit to wrap my head around it again lol). 
Thanks Tim > Rebalances while streams is in degraded state can cause stores to be > reassigned and restore from scratch > > > Key: KAFKA-13600 > URL: https://issues.apache.org/jira/browse/KAFKA-13600 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0, 2.8.1, 3.0.0 >Reporter: Tim Patterson >Priority: Major > > Consider this scenario: > # A node is lost from the cluster. > # A rebalance is kicked off with a new "target assignment"'s(ie the > rebalance is attempting to move a lot of tasks - see > https://issues.apache.org/jira/browse/KAFKA-10121). > # The kafka cluster is now a bit more sluggish from the increased load. > # A Rolling Deploy happens triggering rebalances, during the rebalance > processing continues but offsets can't be committed(Or nodes are restarted > but fail to commit offsets) > # The most caught up nodes now aren't within `acceptableRecoveryLag` and so > the task is started in it's "target assignment" location, restoring all state > from scratch and delaying further processing instead of using the "almost > caught up" node. > We've hit this a few times and having lots of state (~25TB worth) and being > heavy users of IQ this is not ideal for us. > While we can increase `acceptableRecoveryLag` to larger values to try get > around this that causes other issues (ie a warmup becoming active when its > still quite far behind) > The solution seems to be to balance "balanced assignment" with "most caught > up nodes". > We've got a fork where we do just this and it's made a huge difference to the > reliability of our cluster. > Our change is to simply use the most caught up node if the "target node" is > more than `acceptableRecoveryLag` behind. > This gives up some of the load balancing type behaviour of the existing code > but in practise doesn't seem to matter too much. 
> I guess maybe an algorithm that identified candidate nodes as those being > within `acceptableRecoveryLag` of the most caught up node might allow the > best of both worlds. > > Our fork is > [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1] > (We also moved the capacity constraint code to happen after all the stateful > assignment to prioritise standby tasks over warmup tasks) > Ideally we don't want to maintain a fork of kafka streams going forward so > are hoping to get a bit of discussion / agreement on the best way to handle > this. > More than happy to contribute code/test different algo's in production system > or anything else to help with this issue -- This message was sent by Atlassian Jira (v8.20.1#820001)
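The fork's fallback rule quoted in the description ("use the most caught-up node if the target node is more than `acceptableRecoveryLag` behind") can be sketched roughly as follows. This is a hypothetical illustration, not the fork's actual code: the method name and the lag-per-client map layout are assumptions.

```java
import java.util.Map;

// Hypothetical sketch: prefer the balanced "target" client, but fall back to
// the most caught-up client when the target lags it by more than
// acceptableRecoveryLag.
public class AssignmentSketch {
    static String chooseClient(String target, Map<String, Long> lagPerClient, long acceptableRecoveryLag) {
        if (lagPerClient.isEmpty()) return target;
        // Find the client with the smallest reported lag for this task
        String mostCaughtUp = lagPerClient.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .get().getKey();
        long targetLag = lagPerClient.getOrDefault(target, Long.MAX_VALUE);
        long bestLag = lagPerClient.get(mostCaughtUp);
        // Stick with the target unless it is more than acceptableRecoveryLag behind
        return (targetLag - bestLag > acceptableRecoveryLag) ? mostCaughtUp : target;
    }
}
```

This keeps the existing balanced placement in the common case and only overrides it when the gap is large, which matches the trade-off described in the report.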
[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799031745 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -386,18 +386,39 @@
 public static void writeDouble(double value, ByteBuffer buffer) {
     buffer.putDouble(value);
 }
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+    // 32 bits, and each 7-bits adds one byte to the output
+    5, 5, 5, 5,          // 32
+    4, 4, 4, 4, 4, 4, 4, // 28
+    3, 3, 3, 3, 3, 3, 3, // 21
+    2, 2, 2, 2, 2, 2, 2, // 14
+    1, 1, 1, 1, 1, 1, 1, // 7
+    1                    // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+    // 64 bits, and each 7-bits adds one byte to the output
+    10,                  // 64
+    9, 9, 9, 9, 9, 9, 9, // 63
+    8, 8, 8, 8, 8, 8, 8, // 56
+    7, 7, 7, 7, 7, 7, 7, // 49
+    6, 6, 6, 6, 6, 6, 6, // 42
+    5, 5, 5, 5, 5, 5, 5, // 35
+    4, 4, 4, 4, 4, 4, 4, // 28
+    3, 3, 3, 3, 3, 3, 3, // 21
+    2, 2, 2, 2, 2, 2, 2, // 14
+    1, 1, 1, 1, 1, 1, 1, // 7
+    1                    // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-    int bytes = 1;
-    while ((value & 0xffffff80) != 0L) {
-        bytes += 1;
-        value >>>= 7;
-    }
-    return bytes;
+    int leadingZeros = Integer.numberOfLeadingZeros(value);
+    return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];
 }
Review comment: I can get a decent bump in performance, almost to lookup table performance, by swapping the divide by 32 for a direct shift right. I'll need to work out the right incantation of casts to get a divide by 7 through shifts to give the correct answer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
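The diff above replaces a shift loop with a leading-zeros lookup: each 7 payload bits of the value add one byte to the varint encoding, so the byte count is a pure function of `Integer.numberOfLeadingZeros`. A standalone sketch of the two equivalent approaches (not the PR's exact code) that cross-checks them against each other:

```java
// Sketch: unsigned-varint size via a leading-zeros lookup table, checked
// against the classic shift-loop implementation.
public class VarintSize {
    // Index = numberOfLeadingZeros(value), range 0..32 -> 33 entries.
    static final int[] LEADING_ZEROS_TO_U_VARINT_SIZE = {
        5, 5, 5, 5,          // 29..32 payload bits -> 5 bytes
        4, 4, 4, 4, 4, 4, 4, // up to 28 bits
        3, 3, 3, 3, 3, 3, 3, // up to 21 bits
        2, 2, 2, 2, 2, 2, 2, // up to 14 bits
        1, 1, 1, 1, 1, 1, 1, // up to 7 bits
        1                    // value == 0
    };

    static int sizeOfUnsignedVarint(int value) {
        return LEADING_ZEROS_TO_U_VARINT_SIZE[Integer.numberOfLeadingZeros(value)];
    }

    // Reference loop implementation: strip 7 bits per iteration while any
    // bit above the low 7 remains set.
    static int sizeOfUnsignedVarintLoop(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }
}
```

The table trades a data-dependent branch for a single intrinsic (`lzcnt` on x86) plus an array load, which is where the reported speedup comes from.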
[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
jasonk000 commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799030101 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar } else { if (shouldStopDrainBatchesForPartition(first, tp)) break; +} -boolean isTransactional = transactionManager != null && transactionManager.isTransactional(); -ProducerIdAndEpoch producerIdAndEpoch = -transactionManager != null ? transactionManager.producerIdAndEpoch() : null; -ProducerBatch batch = deque.pollFirst(); -if (producerIdAndEpoch != null && !batch.hasSequence()) { -// If the producer id/epoch of the partition do not match the latest one -// of the producer, we update it and reset the sequence. This should be -// only done when all its in-flight batches have completed. This is guarantee -// in `shouldStopDrainBatchesForPartition`. - transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition); - -// If the batch already has an assigned sequence, then we should not change the producer id and -// sequence number, since this may introduce duplicates. In particular, the previous attempt -// may actually have been accepted, and if we change the producer id and sequence here, this -// attempt will also be accepted, causing a duplicate. -// -// Additionally, we update the next sequence number bound for the partition, and also have -// the transaction manager track the batch so as to ensure that sequence ordering is maintained -// even if we receive out of order responses. 
-batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); - transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); -log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " + -"{} being sent to partition {}", producerIdAndEpoch.producerId, -producerIdAndEpoch.epoch, batch.baseSequence(), tp); - -transactionManager.addInFlightBatch(batch); -} -batch.close(); -size += batch.records().sizeInBytes(); -ready.add(batch); +// do the rest of the work by processing outside the lock +// close() is particularly expensive +batch = deque.pollFirst(); +} -batch.drained(now); -} +boolean isTransactional = transactionManager != null && transactionManager.isTransactional(); +ProducerIdAndEpoch producerIdAndEpoch = +transactionManager != null ? transactionManager.producerIdAndEpoch() : null; +if (producerIdAndEpoch != null && !batch.hasSequence()) { +// If the producer id/epoch of the partition do not match the latest one +// of the producer, we update it and reset the sequence. This should be +// only done when all its in-flight batches have completed. This is guarantee +// in `shouldStopDrainBatchesForPartition`. + transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition); + +// If the batch already has an assigned sequence, then we should not change the producer id and +// sequence number, since this may introduce duplicates. In particular, the previous attempt +// may actually have been accepted, and if we change the producer id and sequence here, this +// attempt will also be accepted, causing a duplicate. +// +// Additionally, we update the next sequence number bound for the partition, and also have +// the transaction manager track the batch so as to ensure that sequence ordering is maintained +// even if we receive out of order responses. 
+batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); + transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); Review comment: OK, this makes sense. I agree it makes sense to push this
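The restructuring under review moves `batch.close()` and the transactional bookkeeping out of the deque's synchronized block, holding the lock only long enough to poll the batch. A minimal, hypothetical illustration of that lock-narrowing pattern, with `StringBuilder` standing in for `ProducerBatch`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the pattern in the PR: keep the critical section to
// just detaching the element; do the expensive work outside the lock.
public class DrainSketch {
    static final Deque<StringBuilder> deque = new ArrayDeque<>();

    static String drainOne() {
        final StringBuilder batch;
        synchronized (deque) {
            batch = deque.pollFirst(); // cheap: just detach under the lock
        }
        if (batch == null) return null;
        // Expensive work (the analogue of ProducerBatch.close() and sequence
        // assignment) happens here, without blocking producer threads that
        // need the deque lock to append records.
        return batch.append("-closed").toString();
    }
}
```

The payoff is that application threads appending to the same partition's deque no longer wait behind the network thread's per-batch close work.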
[GitHub] [kafka] dongjinleekr commented on pull request #11720: KAFKA-13625: Fix inconsistency in dynamic application log levels
dongjinleekr commented on pull request #11720: URL: https://github.com/apache/kafka/pull/11720#issuecomment-1029440873 Hi @dajac, Oh yes. As I mentioned in [KIP-817](https://cwiki.apache.org/confluence/display/KAFKA/KIP-817%3A+Fix+inconsistency+in+dynamic+application+log+levels), I already checked [KIP-412](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels). But IMHO, omitting `OFF` was a mistake; as you can see in the [motivation section in the proposal](https://cwiki.apache.org/confluence/display/KAFKA/KIP-817%3A+Fix+inconsistency+in+dynamic+application+log+levels), I found that `OFF` is helpful when debugging, for loggers like `kafka.request.logger`.
[GitHub] [kafka] artemlivshits commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
artemlivshits commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r798988451 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); + +final ProducerBatch batch; +final long waitedTimeMs; +final boolean backingOff; +final boolean full; + +// Collect as little as possible inside critical region, determine outcome after release synchronized (deque) { -// When producing to a large number of partitions, this path is hot and deques are often empty. -// We check whether a batch exists first to avoid the more expensive checks whenever possible. -ProducerBatch batch = deque.peekFirst(); -if (batch != null) { -TopicPartition part = entry.getKey(); -Node leader = cluster.leaderFor(part); -if (leader == null) { -// This is a partition for which leader is not known, but messages are available to send. -// Note that entries are currently not removed from batches when deque is empty. -unknownLeaderTopics.add(part.topic()); -} else if (!readyNodes.contains(leader) && !isMuted(part)) { -long waitedTimeMs = batch.waitedTimeMs(nowMs); -boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; -long timeToWaitMs = backingOff ? 
retryBackoffMs : lingerMs; -boolean full = deque.size() > 1 || batch.isFull(); -boolean expired = waitedTimeMs >= timeToWaitMs; -boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting(); -boolean sendable = full -|| expired -|| exhausted -|| closed -|| flushInProgress() -|| transactionCompleting; -if (sendable && !backingOff) { -readyNodes.add(leader); -} else { -long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); -// Note that this results in a conservative estimate since an un-sendable partition may have -// a leader that will later be found to have sendable data. However, this is good enough -// since we'll just wake up and then sleep again for the remaining time. -nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); -} -} +batch = deque.peekFirst(); Review comment: Changes in the `ready` function LGTM ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar } else { if (shouldStopDrainBatchesForPartition(first, tp)) break; +} -boolean isTransactional = transactionManager != null && transactionManager.isTransactional(); -ProducerIdAndEpoch producerIdAndEpoch = -transactionManager != null ? transactionManager.producerIdAndEpoch() : null; -ProducerBatch batch = deque.pollFirst(); -if (producerIdAndEpoch != null && !batch.hasSequence()) { -// If the producer id/epoch of the partition do not match the latest one -// of the producer, we update it and reset the sequence. This should be -// only done when all its in-flight batches have completed. This is guarantee -// in `shouldStopDrainBatchesForPartition`. - transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition); - -// If the batch already has an assigned sequence, then we should not change the producer id and -// sequence number, since this may introduce duplicates. 
In particular, the previous attempt -// may actually have been accepted, and if we change the producer id and sequence here, this -// attempt will also be accepted, causing a duplicate. -
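The `ready()` logic shown in the diff boils down to a small timing predicate: a batch becomes sendable when it is full or has waited past its linger deadline, and retry backoff suppresses sending until `retryBackoffMs` has elapsed. A simplified, self-contained restatement (parameter names assumed; the exhausted/closed/flush/transaction-completing conditions are omitted):

```java
// Simplified readiness check mirroring the diff above: backoff takes priority
// over linger when the batch has already been attempted.
public class ReadySketch {
    static boolean ready(long nowMs, long createdMs, int attempts,
                         long retryBackoffMs, long lingerMs, boolean full) {
        long waitedTimeMs = nowMs - createdMs;
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        return (full || expired) && !backingOff;
    }
}
```

The PR's point is that only `deque.peekFirst()` needs the lock; everything above is arithmetic on values already read, so it can run after the lock is released.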
[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486714#comment-17486714 ] John Roesler commented on KAFKA-13600: -- Hi [~tim.patterson] , thanks for the report and the patch! It sounds like you're reporting two things here: # a bug around the acceptable recovery lag. # an improvement on assignment balance If we can discuss those things independently, then we can definitely merge the bugfix immediately. Depending on the impact of the improvement, it might also fall into the category of a simple ticket, or it might be more appropriate to have a KIP as [~cadonna] suggested. Regarding the bug, I find it completely plausible that we have a bug, but I have to confess that I'm not 100% sure I understand the report. Is the situation that there's an active that happens to be processing quite a bit ahead of the replicas, such that when the active goes offline, there's no "caught-up" node, and instead of failing the task over to the least-lagging node, we just assign it to a fresh node? If that's it, then it is certainly not the desired behavior. The notion of acceptableRecoveryLag was introduced because follower replicas will always lag the active task, by definition. We want task ownership to be able to swap over from the active to a warm-up when it's caught up, but it will never be 100% caught up (because it is a follower until it takes over). acceptableRecoveryLag is a way to define a small amount of lag that is "acceptable", so that when a warm-up is only lagging by that amount, we can consider it to be effectively caught up and move the active to the warm-up node. As you can see, this has nothing at all to do with which nodes are eligible to take over when an active exits the cluster. In that case, it was always the intent that the most-caught-up node should take over active processing, regardless of its lag. 
I've been squinting at our existing code, and also your patch ([https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1)] . It looks to me like the flaw in the existing implementation is essentially just here: [https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baL92-L96] {code:java} // if the desired client is not caught up, and there is another client that _is_ caught up, then // we schedule a movement, so we can move the active task to the caught-up client. We'll try to // assign a warm-up to the desired client so that we can move it later on.{code} which should indeed be just like what you described: {code:java} // if the desired client is not caught up, and there is another client that _is_ more caught up, // then we schedule a movement [to] move the active task to the [most] caught-up client. // We'll try to assign a warm-up to the desired client so that we can move it later on.{code} On the other hand, we should not lose this important predicate to determine whether a task is considered "caught up": [https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-e50a755ba2a4d2f7306d1016d079018cba22f9f32993ef5dd64408d1a94d79acL245] {code:java} activeRunning(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag) {code} This captures a couple of subtleties in addition to the obvious "a task is caught up if it's under the acceptable recovery lag": # A running, active task doesn't have a real lag at all, but instead its "lag" is the sentinel value `-2` # You can disable the "warm up" phase completely by setting acceptableRecoveryLag to `Long.MAX_VALUE`, in which case, we ignore lags completely and consider all nodes to be caught up, even if they didn't report a lag at all. 
One extra thing I like about your patch is this: [https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baR54-R56] {code:java} // Even if there is a more caught up client, as long as we're within allowable lag then // its best just to stick with what we've got {code} I agree that, if two nodes are within the acceptableRecoveryLag of each other, we should consider their lags to be effectively the same. That's something I wanted to do when we wrote this code, but couldn't figure out a good way to do it. One thing I'd need more time on is the TaskMovementTest. At first glance, it looks like those changes are just about the slightly different method signature, but I'd want to be very sure that we're still testing the same invariants that we wanted to test. Would you be willing to submit this bugfix as a PR so that we can formally review and merge it? > Rebalances while streams is in degraded state can cause stores to be > reassigned and restore from scratch >
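The "caught up" predicate discussed in the comment above (a sentinel lag for running actives, `Long.MAX_VALUE` disabling the warm-up phase, otherwise an acceptable-lag bound) can be sketched as a single expression. The method name and sentinel constant name here are illustrative, not the actual `ClientState` API:

```java
// Sketch of the caught-up predicate described above.
public class CaughtUp {
    // A running, active task reports this sentinel rather than a real lag.
    static final long ACTIVE_RUNNING_SENTINEL = -2L;

    static boolean caughtUp(long taskLag, long acceptableRecoveryLag) {
        return taskLag == ACTIVE_RUNNING_SENTINEL      // active task: no real lag
            || acceptableRecoveryLag == Long.MAX_VALUE // warm-up phase disabled
            || taskLag <= acceptableRecoveryLag;       // within the acceptable bound
    }
}
```

Any fix along the lines of the fork would need to preserve all three clauses, which is the point John raises about the patch dropping this predicate.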
[GitHub] [kafka] dajac commented on pull request #11720: KAFKA-13625: Fix inconsistency in dynamic application log levels
dajac commented on pull request #11720: URL: https://github.com/apache/kafka/pull/11720#issuecomment-1029400036 @dongjinleekr Thanks for the PR. It seems that OFF was intentionally left out in [KIP-412](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels). See the rejected alternative section. Do you think that we should change our mind?
[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1029397689 > If by "first place" you mean https://github.com/apache/kafka/pull/11424#discussion_r796139442, then no, other way around actually. The TopologyConfig bug is the one that actually breaks compatibility, the TTD one actually doesn't really do anything -- if I understand the TTD correctly. See https://github.com/apache/kafka/pull/11424#discussion_r798443526 Crystal! Thanks for the clarification @ableegoldman
[GitHub] [kafka] mimaison commented on pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1
mimaison commented on pull request #11672: URL: https://github.com/apache/kafka/pull/11672#issuecomment-1029397144 Thanks @tombentley. I've pushed an update that improves coverage of all mocks whenever possible.
[GitHub] [kafka] mimaison commented on a change in pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1
mimaison commented on a change in pull request #11672: URL: https://github.com/apache/kafka/pull/11672#discussion_r798964433 ## File path: core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala ## @@ -134,36 +129,33 @@ class InterBrokerSendThreadTest { val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler) -EasyMock.expect(networkClient.newClientRequest( - EasyMock.eq("1"), - EasyMock.same(handler.request), - EasyMock.anyLong(), - EasyMock.eq(true), - EasyMock.eq(requestTimeoutMs), - EasyMock.same(handler.handler))) - .andReturn(clientRequest) +when(networkClient.newClientRequest( + ArgumentMatchers.eq("1"), + same(handler.request), + anyLong(), + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(requestTimeoutMs), + same(handler.handler))) + .thenReturn(clientRequest) -EasyMock.expect(networkClient.ready(node, time.milliseconds())) - .andReturn(false) +when(networkClient.ready(node, time.milliseconds())) + .thenReturn(false) -EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), EasyMock.anyLong())) - .andReturn(0) +when(networkClient.connectionDelay(any[Node], anyLong())) + .thenReturn(0) -EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong())) - .andReturn(new util.ArrayList()) +when(networkClient.poll(anyLong(), anyLong())) + .thenReturn(new util.ArrayList[ClientResponse]()) -EasyMock.expect(networkClient.connectionFailed(node)) - .andReturn(true) + when(networkClient.connectionFailed(node)) + .thenReturn(true) -EasyMock.expect(networkClient.authenticationException(node)) - .andReturn(new AuthenticationException("")) - -EasyMock.replay(networkClient) +when(networkClient.authenticationException(node)) + .thenReturn(new AuthenticationException("")) sendThread.enqueue(handler) sendThread.doWork() -EasyMock.verify(networkClient) Review comment: Yeah it's tricky to get the exact same coverage with Mockito. 
I've tried to verify all the important calls, especially on the component being tested, and whenever possible verify all the mocks.
[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13600: Assignee: (was: John Roesler) > Rebalances while streams is in degraded state can cause stores to be > reassigned and restore from scratch
[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13600: Assignee: John Roesler > Rebalances while streams is in degraded state can cause stores to be > reassigned and restore from scratch
[jira] [Updated] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13600: - Affects Version/s: 3.1.0 > Rebalances while streams is in degraded state can cause stores to be > reassigned and restore from scratch
[GitHub] [kafka] guozhangwang commented on pull request #11447: KAFKA-13024
guozhangwang commented on pull request #11447: URL: https://github.com/apache/kafka/pull/11447#issuecomment-1029335322 > I might be missing the full picture here as I am a bit new to the project, but, as I see it, the algorithm I described above does not suffer from this issue. There, we essentially extract if (record.key() == null || record.value() == null) { code from the operations (e.g. https://github.com/apache/kafka/pull/11447/files#diff-ad3728ff5f32ed1e88e11e46c0de970bcce01c39767fd3ae666e4a0bfbd97be3L87-L104), except we also don't check for that condition more than necessary. My rationale is that, we do some optimizations at the end of the building phase which may move some repartition up in the topology etc, which would effectively change the topology. And hence, we would not know if all of the downstream operators after the repartition would be filtering null keys until the optimization that changed the topology is done.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r798901691

## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java

@@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder,
         final ThreadCache cache = new ThreadCache(
             logContext,
-            Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+            Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),

Review comment:

> got it.. If I understood correctly, we need a new PR with all the changes in this PR and the new ones along with document changes, right?

That sounds right to me.
[jira] [Created] (KAFKA-13642) We should have a command-line tool + API to display fenced / unfenced / etc. brokers
Colin McCabe created KAFKA-13642: Summary: We should have a command-line tool + API to display fenced / unfenced / etc. brokers Key: KAFKA-13642 URL: https://issues.apache.org/jira/browse/KAFKA-13642 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe We should have a command-line tool + API to display fenced / unfenced / etc. brokers -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486529#comment-17486529 ] Ulrik edited comment on KAFKA-13638 at 2/3/22, 6:28 PM:

Hi [~cadonna], I forked the repo and added a test. Here's the commit: https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6 The test was very quick and took about 900 ms. Running the test on the trunk branch took 6 seconds.

> Slow KTable update when forwarding multiple values from transformer
> -------------------------------------------------------------------
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 3.0.0
> Reporter: Ulrik
> Priority: Major
> Attachments: KafkaTest.java
>
> I have a topology where I stream messages from an input topic, transform the message to multiple messages (via context.forward), and then store those messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests take significantly longer time to run.
>
> I have attached a test class to demonstrate my scenario. When running this test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following numbers:
>
> *Version 2.8.1*
> * one input message and one output message: 541 ms
> * 8 input messages and 30 output messages per input message (240 output messages in total): 919 ms
>
> *Version 3.1.0*
> * one input message and one output message: 908 ms
> * 8 input messages and 30 output messages per input message (240 output messages in total): 6 sec 94 ms
>
> Even when the transformer just transforms and forwards one input message to one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx. 5 seconds longer.
[GitHub] [kafka] cmccabe merged pull request #11729: MINOR: fix control plane listener + kraft error message
cmccabe merged pull request #11729: URL: https://github.com/apache/kafka/pull/11729 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
[ https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486584#comment-17486584 ] Matthias J. Sax commented on KAFKA-13641:

The Java API faces the same issue – it's hard from a backward-compatibility POV to change the signature. Also the input `null`-value question is tricky: at the moment, to avoid ambiguity, the runtime drops all join input records with `null`-value (also with `null`-key) as malformed. Thus, if you get a `null` you always know that it's a left/outer join call with no matching join record from the other side.

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -------------------------------------------------------------
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Mohammad Yousuf Minhaj Zia
> Priority: Minor
>
> Since the `ValueJoiner` right parameter in `leftJoin`s and `outerJoin`s can be nullable, I am wondering if we can wrap it in a Scala `Option`.
> However, there is also the concern that the left-hand-side value can be null in the case of tombstone messages, in which case the `Option` semantics can be misleading. I still feel this could be a useful feature in reducing the number of `NullPointerException`s.
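The trade-off discussed above can be illustrated with a small adapter. This is a hypothetical sketch using plain `java.util.Optional` and a stand-in functional interface; it is not the actual `ValueJoiner` API or a proposed signature change:

```java
import java.util.Optional;
import java.util.function.BiFunction;

public class OptionJoin {
    // Stand-in for org.apache.kafka.streams.kstream.ValueJoiner, where the
    // right value may be null on a left/outer join with no match.
    @FunctionalInterface
    public interface ValueJoiner<V1, V2, R> { R apply(V1 left, V2 right); }

    // Adapt a joiner written against Optional<V2> to the nullable-right
    // convention. Because the runtime drops null-valued input records as
    // malformed, a null right here always means "no matching record".
    public static <V1, V2, R> ValueJoiner<V1, V2, R> fromOptionJoiner(
            BiFunction<V1, Optional<V2>, R> joiner) {
        return (left, right) -> joiner.apply(left, Optional.ofNullable(right));
    }
}
```

With such an adapter, application code matches on the `Optional` instead of checking `null` directly, sidestepping the compatibility question of changing `ValueJoiner` itself.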
[jira] [Updated] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
[ https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13641: Language: scala needs-kip (was: scala)
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486578#comment-17486578 ] Bruno Cadonna commented on KAFKA-13638:

[~Lejon] Thank you a lot for the testing! I think we can rule out RocksDB then.
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486576#comment-17486576 ] Federico Valeri commented on KAFKA-12635:

I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0. State of the source cluster after producing/consuming 1 million records:

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
    Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group
Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0        -            -     -
my-group  my-topic  1          335510          335510          0        -            -     -
my-group  my-topic  2          331890          331890          0        -            -     -
{code}

State of the target cluster after MM2 has done its job (sync.group.offsets.enabled = true, replication.policy.class = io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy):

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
    Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
    Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
    Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-group
Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          0               -332600  -            -     -
my-group  my-topic  1          335510          0               -335510  -            -     -
my-group  my-topic  2          331890          0               -331890  -            -     -
{code}

There is actually no need to set a custom value for retention.ms in order to trigger the issue.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> -----------------------------------------------------------------------
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Frank Yi
> Assignee: Ning Zhang
> Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = true".
> If a source partition is empty, but the source consumer group's offset for that partition is non-zero, then Mirrormaker sets the target consumer group's offset for that partition to the literal, not translated, offset of the source consumer group. This state can be reached if the source consumer group consumed some records that were since deleted (like by a retention policy), or if Mirrormaker replication is set to start at "latest". This bug causes the target consumer group's lag for that partition to be negative and breaks offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the target offset to the translated offset, not the literal offset, which in this case would always be 0.
> Original email thread on this issue: https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E
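The negative lag in the target-cluster output follows directly from the arithmetic: lag = log-end-offset minus committed offset, so copying the literal source offset (e.g. 332600) onto an empty target partition (log end offset 0) yields -332600, while the correctly translated offset of 0 would yield lag 0. A minimal sketch of this (hypothetical helper, not MirrorMaker code):

```java
// Hypothetical helper, not MirrorMaker code: consumer lag as reported by
// kafka-consumer-groups.sh is the log-end-offset minus the committed offset.
public class OffsetLag {
    public static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }
}
```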
[GitHub] [kafka] mimaison commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)
mimaison commented on pull request #10907: URL: https://github.com/apache/kafka/pull/10907#issuecomment-1029158871 @C0urante Thanks for the PR! It's really massive! Can we try to cut it in smaller chunks? That would make it a lot easier to review and allow merging bit by bit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486565#comment-17486565 ] Ulrik commented on KAFKA-13638:

[~cadonna] Done. Average time for the test was the same as the other branch: around 900 ms.
[jira] [Comment Edited] (KAFKA-13293) Support client reload of JKS/PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486563#comment-17486563 ] Elliot West edited comment on KAFKA-13293 at 2/3/22, 4:15 PM:

FWIW we've implemented a custom {{SslEngineFactory}} here: https://github.com/apache/kafka/pull/11731 Would this be more generally useful as an interim solution? Or is there progress on the dynamic client configuration work?

> Support client reload of JKS/PEM certificates
> ---------------------------------------------
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
> Issue Type: Improvement
> Components: clients, security
> Affects Versions: 2.7.0, 2.8.0, 2.7.1
> Reporter: Elliot West
> Priority: Major
>
> Producer/Consumer clients do not currently automatically reload certificates when the key stores are modified or certificates expire. Currently one supplies key chains only when instantiating clients - there is no mechanism available to either directly reconfigure the client, or for the client to observe changes to the original properties reference used in construction. Additionally, no work-arounds are documented that might give users alternative strategies for dealing with expiring certificates.
> Given that expiration and renewal of certificates is an industry-standard practice, it could be argued that the current client certificate implementation is not fit for purpose. A mechanism should be provided such that clients can automatically detect, load, and use updated key chains from some abstracted source.
> Finally, it is suggested that in the short term the Kafka documentation be updated to describe any viable mechanism for updating client certs (perhaps closing the existing client and then recreating it?).
[jira] [Commented] (KAFKA-13293) Support client reload of JKS/PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486563#comment-17486563 ] Elliot West commented on KAFKA-13293:

FWIW we've implemented a custom {{SslEngineFactory}} here: https://github.com/apache/kafka/pull/11731
[GitHub] [kafka] teabot opened a new pull request #11731: KAFKA-13293: Reloading engine factory
teabot opened a new pull request #11731: URL: https://github.com/apache/kafka/pull/11731

This PR adds an optional `SslEngineFactory` implementation that decorates and manages a delegate. It is able to recreate the delegate either on a schedule, or in response to an SSL configuration patch being applied. The purpose of this implementation is to permit the automatic reload of renewed client certificates. This implementation includes a supporting unit test.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
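The decorate-and-recreate pattern the PR describes can be sketched generically. This is a hypothetical illustration built on a plain `Supplier`; it is not the actual `SslEngineFactory` interface or the PR's code:

```java
import java.util.function.Supplier;

// Hypothetical sketch of the decorator described in the PR: hold a delegate
// built by a factory, and rebuild it on demand (e.g. from a scheduled task,
// or when the SSL configuration changes).
public class ReloadingDelegate<T> {
    private final Supplier<T> factory;
    private volatile T delegate;

    public ReloadingDelegate(Supplier<T> factory) {
        this.factory = factory;
        this.delegate = factory.get();
    }

    public T get() { return delegate; }

    // Recreate the delegate, e.g. after renewed certificates are written out.
    public synchronized void reload() { delegate = factory.get(); }
}
```

Callers always go through `get()`, so a reload (here triggered explicitly; in the PR, by a schedule or a config patch) is picked up without recreating the client.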
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486557#comment-17486557 ] Bruno Cadonna commented on KAFKA-13638:

[~Lejon] Could you also try with https://github.com/cadonna/kafka/tree/test-KAFKA-13638-6.27.3 that uses RocksDB 6.27.3 with AK 2.8?
[jira] [Comment Edited] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486529#comment-17486529 ] Ulrik edited comment on KAFKA-13638 at 2/3/22, 3:05 PM:

Hi [~cadonna], I forked the repo and added a test. Here's the commit: https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6 The test was very quick and took about 900 ms. Running the test on the trunk branch took 6 seconds.
[GitHub] [kafka] tombentley commented on a change in pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1
tombentley commented on a change in pull request #11672: URL: https://github.com/apache/kafka/pull/11672#discussion_r796754101

## File path: core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala

@@ -85,19 +84,17 @@ class PartitionStateMachineTest {
     controllerContext.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(Seq(brokerId)))
     controllerContext.putPartitionState(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
-    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
-      .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null, ResponseMetadata(0, 0))))
-    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = true))
-    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+    when(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
+      .thenReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null, ResponseMetadata(0, 0))))
     partitionStateMachine.handleStateChanges(
       partitions,
       OnlinePartition,
       Option(OfflinePartitionLeaderElectionStrategy(false))
     )
-    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+    verify(mockControllerBrokerRequestBatch).newBatch()
+    verify(mockControllerBrokerRequestBatch).addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = true)
+    verify(mockControllerBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch)

Review comment: I know we mocked it, but shouldn't we also verify `createTopicPartitionStatesRaw` here?

## File path: core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala

@@ -165,22 +158,20 @@ class PartitionStateMachineTest {
     controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
+    when(mockZkClient.getTopicPartitionStatesRaw(partitions))
+      .thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
-      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
-    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), replicaAssignment(Seq(brokerId)), isNew = false))
-    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+    when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
+      .thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+    verify(mockControllerBrokerRequestBatch).newBatch()

Review comment: Again, verify the `...Raw()` call?

## File path: core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala

@@ -165,22 +158,20 @@ class PartitionStateMachineTest {
     controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
+    when(mockZkClient.getTopicPartitionStatesRaw(partitions))
+      .thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
        TopicPartitionSta
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486529#comment-17486529 ] Ulrik commented on KAFKA-13638:

Hi [~cadonna], I forked the repo and added a test. Here's the commit: https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6 The test was very quick and took about 900 ms.
[jira] [Comment Edited] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486510#comment-17486510 ] Bruno Cadonna edited comment on KAFKA-13638 at 2/3/22, 2:09 PM:

Hi [~Lejon], Could you try to run your test on the following branch? https://github.com/cadonna/kafka/tree/test-KAFKA-13638 This is a 2.8 branch modified to run with RocksDB 6.19.3, the version used in AK 3.0.0.
[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa edited a comment on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028979162 Sure @dongjinleekr. Thank you so much for sharing the documentation. Sorry, I was not aware that -Dlog4j.configuration is the VM parameter it is pointing to, so I was confused by that comment. I got it now. Let me go through the changes I have made to use the log4j2 properties by setting KAFKA_LOG4J_OPTS, check in detail whether I have missed updating any of the files, and I will update you. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
dongjinleekr commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028965316 @Indupa Please refer [the documentation](https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2) and the log messages. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486434#comment-17486434 ] Indupriya edited comment on KAFKA-9366 at 2/3/22, 12:46 PM: Hi [~dongjin] Could you please share any approximate dates for the official release with the latest log4j build? was (Author: JIRAUSER284624): Hi [~dongjin] Could you please help us any approximate dates for official release for latest log4j build..? > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.2.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366 ] Indupriya deleted comment on KAFKA-9366: -- was (Author: JIRAUSER284624): Hi [~dongjin] , Con you please confirm. this issue KAFKA-12399 will resolve all the critical vulnerabilities along with CVE-2019-17571 ..? > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.2.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486380#comment-17486380 ] Indupriya edited comment on KAFKA-9366 at 2/3/22, 12:34 PM: Hi [~dongjin] , Can you please confirm that KAFKA-12399 will resolve all the critical vulnerabilities along with CVE-2019-17571? was (Author: JIRAUSER284624): Hi [~dongjin] , this issue KAFKA-12399 will resolve all the critical vulnerabilities along with CVE-2019-17571 ..? > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.2.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
[ https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486427#comment-17486427 ] Mohammad Yousuf Minhaj Zia commented on KAFKA-13641: If I am not wrong, `ValueJoiner` is a Java interface which is sort of like a function signature with generic parameters `value1` and `value2`. What I would like is to create a Scala wrapper such that: ``` type ValueLeftJoiner[A, B, C] = (A, Option[B]) => C type ValueOuterJoiner[A, B, C] = (Option[A], B) => C type ValueInnerJoiner[A, B, C] = (A, B) => C ``` or, to deal with tombstones, just make both inputs optional with a central ValueJoiner: ``` type ValueJoiner[A, B, C] = (Option[A], Option[B]) => C ``` Unfortunately this makes things quite inconsistent with the rest of the codebase, because a null could be returned or be a parameter virtually anywhere apart from joiners. Wondering if we will ever make the explicitness of nullable fields better in Kafka? > Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters > - > > Key: KAFKA-13641 > URL: https://issues.apache.org/jira/browse/KAFKA-13641 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Mohammad Yousuf Minhaj Zia >Priority: Minor > > Since the `ValueJoiner` right parameter in `leftJoins`, `outerJoins` can be > nullable, I am wondering if we can wrap them in Scala `Option`. > However, there is also the concern that the left hand side value can be null > in the case of tombstone messages, in which case the `Option` semantics can be > misleading. I still feel this could be a useful feature in reducing the > number of `NullPointerExceptions`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
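The same idea can be sketched in plain Java. The following is an illustrative, self-contained sketch only: the `ValueJoiner` interface below is a simplified stand-in for `org.apache.kafka.streams.kstream.ValueJoiner` (so the snippet compiles without Kafka on the classpath), and `leftJoiner` is a hypothetical helper name, not a proposed API.

```java
import java.util.Optional;

public class OptionJoinerSketch {
    // Simplified stand-in for org.apache.kafka.streams.kstream.ValueJoiner;
    // the real Kafka interface is not used here so the sketch stays self-contained.
    @FunctionalInterface
    interface ValueJoiner<V1, V2, VR> {
        VR apply(V1 value1, V2 value2);
    }

    // Adapts a joiner written against Optional<V2> into one that accepts the
    // nullable right-hand value a left join may produce.
    static <V1, V2, VR> ValueJoiner<V1, V2, VR> leftJoiner(
            ValueJoiner<V1, Optional<V2>, VR> safe) {
        return (v1, v2) -> safe.apply(v1, Optional.ofNullable(v2));
    }

    public static void main(String[] args) {
        ValueJoiner<String, Integer, String> joiner = leftJoiner(
            (key, maybeCount) -> key + "=" + maybeCount.map(String::valueOf).orElse("none"));
        System.out.println(joiner.apply("a", 3));    // a=3
        System.out.println(joiner.apply("b", null)); // b=none
    }
}
```

The caller-facing lambda never sees a raw null for the right-hand side, which is the `NullPointerException` reduction the ticket is after; the tombstone concern remains, since the left-hand value would need the same treatment.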
[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028932370 Where could I find it, @dongjinleekr? And can you help me understand what the VM parameter is used for and why it is required? I haven't heard of this VM parameter before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #11730: MINOR: Use ducktape version 0.7.17
dajac merged pull request #11730: URL: https://github.com/apache/kafka/pull/11730 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
dongjinleekr commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028885839 @Indupa Which VM parameter are you using? -Dlog4j.configuration or -Dlog4j.configuration**File**? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
ableegoldman commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1028879209 > My read on the code is that we only need to change the TopologyTestDriver, while the first place seems fine to me. Did I miss anything? If by "first place" you mean [the bug in the TopologyConfig class](https://github.com/apache/kafka/pull/11424#discussion_r796139442), then no, it's the other way around actually. The TopologyConfig bug is the one that actually breaks compatibility; the TTD one doesn't really do anything -- if I understand the TTD correctly. See [this](https://github.com/apache/kafka/pull/11424#discussion_r798443526) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028876857 Hi @dongjinleekr, Yeah, I referred to this article earlier, but I didn't understand the solution they are suggesting, like -Dlog4j.configuration in the VM arguments. I have not seen where we are using these VM arguments. How can I overcome this issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
vamossagar12 commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r798456223 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ## @@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, final ThreadCache cache = new ThreadCache( logContext, -Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)), +Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)), Review comment: Got it. If I understood correctly, we need a new PR with all the changes in this PR plus the new ones, along with documentation changes, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
vamossagar12 commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r798454215 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ## @@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize); } else { -maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); +maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) +? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1; } -if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { -cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); -log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize); +if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) || +isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { + +if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { +cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); +log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}", +topologyName, +STATESTORE_CACHE_MAX_BYTES_CONFIG, +CACHE_MAX_BYTES_BUFFERING_CONFIG, +STATESTORE_CACHE_MAX_BYTES_CONFIG, +cacheSize); +} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { +cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); +log.info("Topology {} is using deprecated config {}. 
overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize); +} else { +cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); +log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize); +} } else { -cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); +cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); Review comment: Thanks @ableegoldman! I am slightly occupied at the moment, but I will make the changes in the next few days. As you said, I will introduce a new utility class and move these methods out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
dongjinleekr commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028870380 Hi @Indupa, Please refer [here](https://stackoverflow.com/questions/66545546/error-statuslogger-reconfiguration-failed-no-configuration-found-for-73d16e93). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
[ https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486378#comment-17486378 ] Kvicii.Yu commented on KAFKA-13641: --- [~yzia2000] hi, I didn't understand what you mean. Could you tell me which class name#method name needs the `Option` added? > Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters > - > > Key: KAFKA-13641 > URL: https://issues.apache.org/jira/browse/KAFKA-13641 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Mohammad Yousuf Minhaj Zia >Priority: Minor > > Since the `ValueJoiner` right parameter in `leftJoins`, `outerJoins` can be > nullable, I am wondering if we can wrap them in Scala `Option`. > However, there is also the concern that the left hand side value can be null > in the case of tombstone messages, in which case the `Option` semantics can be > misleading. I still feel this could be a useful feature in reducing the > number of `NullPointerExceptions`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028865528 Hi @dongjinleekr, yeah, I have already made the changes to use the log4j2 properties and set KAFKA_LOG4J_OPTS. But even after that, as I mentioned in my previous comment, I am getting the following exception: 2022-02-02 05:57:17.158 [INF] [Kafka] Connecting to localhost:2181 2022-02-02 05:57:27.571 [INF] [Kafka] WATCHER:: 2022-02-02 05:57:27.571 [INF] [Kafka] WatchedEvent state:SyncConnected type:None path:null 2022-02-02 05:57:27.574 [INF] [Kafka] [] 2022-02-02 05:58:17.227 [ERR] [Kafka] ERROR StatusLogger Reconfiguration failed: No configuration found for '764c12b6' at 'null' in 'null'. I am still getting this exception and Kafka is not running. Can you help me understand the cause of this exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
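For context on this thread: the `ERROR StatusLogger Reconfiguration failed: No configuration found` message generally means log4j2 could not locate any configuration file at startup. A minimal sketch of a log4j2 properties file follows; the appender names, pattern, and file path are illustrative assumptions, not the configuration shipped with Kafka.

```properties
# Illustrative minimal log4j2 configuration -- adapt names and levels to your setup
status = error
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d] %p %m (%c)%n
rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = STDOUT
```

It would then be referenced with something like `export KAFKA_LOG4J_OPTS="-Dlog4j.configurationFile=/path/to/log4j2.properties"` before starting the broker (note `log4j.configurationFile`, the log4j2 system property, not the log4j 1.x `log4j.configuration` that the earlier comments distinguish).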
[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
ableegoldman commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r798443526 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ## @@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, final ThreadCache cache = new ThreadCache( logContext, -Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)), +Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)), Review comment: > since it's a test case so it shouldn't really matter. Isn't that the case? Well, if someone is using the TTD to write unit tests and those tests start to fail after they upgrade because the caching is different, I would say that's a compatibility change. Although I read the TTD's javadocs earlier and remembered that it actually processes records synchronously, which effectively means that the only thing that matters/affects the TTD results is whether the cache size is non-zero or has been set to 0 -- and setting it to 0 only matters if it's correctly set to 0 in the TopologyConfig, not the value here. Which is a long way of saying that in hindsight, this config/bug doesn't really impact anything after all. In fact, imho we should probably just hard-code the TTD's ThreadCache cache size to 0 -- but let's not wrap that change into an already rather large PR in case there's something I'm not taking into account here. So tl;dr, for future reference we do still need to maintain backwards compatibility in the TTD since it's part of the public interface. But it just so happens that this particular bug doesn't actually break anything "real" or have any visible impact (at least AFAICT). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
ableegoldman commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r798399133

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java

## @@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
         maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
     } else {
-        maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+            ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
     }

-    if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-        cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+    if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+        isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+            cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+            log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+                topologyName,
+                STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                cacheSize);
+        } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        } else {
+            cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+        }
     } else {
-        cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:

Ah, I see the confusion. The `#isTopologyOverride` method checks whether the config has been overridden for the specific topology, i.e. set in the Properties passed in to `StreamsBuilder#build` -- it's not looking at what we call the `globalAppConfigs`, which are the actual application configs, i.e. those passed in to the `KafkaStreams` constructor.

So basically there are two sets of configs. The value should be taken as the first of these to be set by the user, in the following order:

1) `statestore.cache.max.bytes` in `topologyOverrides`
2) `cache.max.bytes.buffering` in `topologyOverrides`
3) `statestore.cache.max.bytes` in `globalAppConfigs`
4) `cache.max.bytes.buffering` in `globalAppConfigs`

Essentially, use `#getTotalCacheSize` on the `topologyOverrides` if either of them is set (which this PR is doing) and on the `globalAppConfigs` if they are not (which is the regression here).

On that note -- we also need to move `#getTotalCacheSize` out of StreamsConfig, because it's a public class and this method wasn't listed as a public API in the KIP (nor should it be, imo). I recommend creating a new static utility class for things like this, e.g. `StreamsConfigUtils` in the `org.apache.kafka.streams.internals` package. There are some other methods that would belong there; for example, the `StreamThread` methods `#processingMode` and `#eosEnabled` should be moved as well.

Hope that all makes sense -- and lmk if you don't think you'll have the time to put out a full patch, and I or another Streams dev can help out 🙂

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
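The four-level precedence described in the review comment can be sketched as a small standalone resolver. This is a hypothetical illustration of the lookup order only, not the actual `TopologyConfig` code; the class and method names here are invented, and the config keys are the ones named in the discussion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CacheSizeResolution {
    static final String NEW_CONFIG = "statestore.cache.max.bytes";
    static final String OLD_CONFIG = "cache.max.bytes.buffering";
    // Illustrative default (10 MiB); the real default lives in StreamsConfig.
    static final long DEFAULT_CACHE_BYTES = 10 * 1024 * 1024L;

    // First of the four sources set by the user wins:
    // 1) new config per-topology, 2) deprecated config per-topology,
    // 3) new config app-wide,     4) deprecated config app-wide.
    static long resolveCacheSize(Map<String, Long> topologyOverrides,
                                 Map<String, Long> globalAppConfigs) {
        return Optional.ofNullable(topologyOverrides.get(NEW_CONFIG))
            .or(() -> Optional.ofNullable(topologyOverrides.get(OLD_CONFIG)))
            .or(() -> Optional.ofNullable(globalAppConfigs.get(NEW_CONFIG)))
            .or(() -> Optional.ofNullable(globalAppConfigs.get(OLD_CONFIG)))
            .orElse(DEFAULT_CACHE_BYTES);
    }

    public static void main(String[] args) {
        Map<String, Long> overrides = new HashMap<>();
        Map<String, Long> global = new HashMap<>();
        global.put(OLD_CONFIG, 1_000L);
        global.put(NEW_CONFIG, 2_000L);
        // No topology override: the new app-wide config beats the deprecated one.
        System.out.println(resolveCacheSize(overrides, global)); // 2000
        // Any topology override beats both app-wide configs.
        overrides.put(OLD_CONFIG, 500L);
        System.out.println(resolveCacheSize(overrides, global)); // 500
    }
}
```

The regression the comment points at is exactly a violation of this order: falling through to the global configs without checking the topology-level overrides first.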
[jira] [Resolved] (KAFKA-13583) Fix FetchRequestBetweenDifferentIbpTest flaky tests
[ https://issues.apache.org/jira/browse/KAFKA-13583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-13583.
---------------------------------
    Fix Version/s: 3.2.0
                   3.1.1
         Reviewer: David Jacot
       Resolution: Fixed

> Fix FetchRequestBetweenDifferentIbpTest flaky tests
> ---------------------------------------------------
>
>          Key: KAFKA-13583
>          URL: https://issues.apache.org/jira/browse/KAFKA-13583
>      Project: Kafka
>   Issue Type: Improvement
>     Reporter: David Jacot
>     Assignee: Kvicii.Yu
>     Priority: Minor
>      Fix For: 3.2.0, 3.1.1
>
> FetchRequestBetweenDifferentIbpTest's tests often fail with:
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <2> but was: <1>
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
>   at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
>   at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
>   at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:510)
>   at integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerSwitchingIBP(FetchRequestBetweenDifferentIbpTest.scala:113)
>   at integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerOldToNewIBP(FetchRequestBetweenDifferentIbpTest.scala:87)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {noformat}
>
> This might be due to https://github.com/apache/kafka/commit/e8818e234a879d5ca45accba0121f43f45381f4a, where we reduced the poll timeout.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dajac merged pull request #11699: Fix FetchRequestBetweenDifferentIbpTest flaky tests
dajac merged pull request #11699: URL: https://github.com/apache/kafka/pull/11699
[GitHub] [kafka] dajac merged pull request #11710: MINOR: MiniTrogdorCluster mutates objects from other threads
dajac merged pull request #11710: URL: https://github.com/apache/kafka/pull/11710
[GitHub] [kafka] dajac merged pull request #11727: MINOR:fix AbstractStickyAssignor doc
dajac merged pull request #11727: URL: https://github.com/apache/kafka/pull/11727
[jira] [Resolved] (KAFKA-13637) Use default.api.timeout.ms config as default timeout for KafkaConsumer.endOffsets
[ https://issues.apache.org/jira/browse/KAFKA-13637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-13637.
---------------------------------
    Fix Version/s: 3.0.1
                   3.2.0
                   3.1.1
         Reviewer: David Jacot
       Resolution: Fixed

> Use default.api.timeout.ms config as default timeout for KafkaConsumer.endOffsets
> ----------------------------------------------------------------------------------
>
>          Key: KAFKA-13637
>          URL: https://issues.apache.org/jira/browse/KAFKA-13637
>      Project: Kafka
>   Issue Type: Improvement
>     Reporter: dengziming
>     Assignee: dengziming
>     Priority: Major
>      Fix For: 3.0.1, 3.2.0, 3.1.1
>
> In KafkaConsumer, we use `request.timeout.ms` in `endOffsets` and `default.api.timeout.ms` in `beginningOffsets`; we should use `default.api.timeout.ms` for both.
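The fallback behavior described in KAFKA-13637 can be illustrated with a small standalone sketch: an explicitly supplied timeout always wins, and in its absence the call falls back to `default.api.timeout.ms` rather than `request.timeout.ms`. The class and method names below are invented for illustration; they are not the real `KafkaConsumer` internals.

```java
import java.time.Duration;
import java.util.Map;

public class OffsetTimeoutFallback {

    // Hypothetical resolver: mirrors the intent of the fix, where both
    // endOffsets and beginningOffsets without an explicit Duration fall
    // back to default.api.timeout.ms (Kafka's documented default: 60s).
    static Duration effectiveTimeout(Map<String, Integer> configs, Duration explicitTimeout) {
        if (explicitTimeout != null) {
            return explicitTimeout; // caller-supplied Duration always wins
        }
        return Duration.ofMillis(configs.getOrDefault("default.api.timeout.ms", 60_000));
    }

    public static void main(String[] args) {
        Map<String, Integer> configs = Map.of(
            "request.timeout.ms", 5_000,
            "default.api.timeout.ms", 30_000);
        // No explicit timeout: default.api.timeout.ms applies, not request.timeout.ms.
        System.out.println(effectiveTimeout(configs, null).toMillis()); // 30000
        // Explicit timeout overrides the config entirely.
        System.out.println(effectiveTimeout(configs, Duration.ofSeconds(2)).toMillis()); // 2000
    }
}
```

Callers who need a different bound per call can keep using the `endOffsets(Collection, Duration)` overload, which bypasses the config-based default.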
[GitHub] [kafka] dajac merged pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets
dajac merged pull request #11726: URL: https://github.com/apache/kafka/pull/11726 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486321#comment-17486321 ]

Dongjin Lee commented on KAFKA-9366:
------------------------------------
Hi [~noonbs], this issue will be resolved with KAFKA-12399. I expect it will be done in 3.2.0.

> Upgrade log4j to log4j2
> -----------------------
>
>             Key: KAFKA-9366
>             URL: https://issues.apache.org/jira/browse/KAFKA-9366
>         Project: Kafka
>      Issue Type: Bug
>      Components: core
> Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>        Reporter: leibo
>        Assignee: Dongjin Lee
>        Priority: Critical
>          Labels: needs-kip
>         Fix For: 3.2.0
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to deserialization of untrusted data, which can be exploited to remotely execute arbitrary code when combined with a deserialization gadget when listening to untrusted network traffic for log data. This affects Log4j versions 1.2 up to 1.2.17.
>
> https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571