[jira] [Created] (KAFKA-13645) Support the TopologyTestDriver with modular topologies

2022-02-03 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13645:
--

 Summary: Support the TopologyTestDriver with modular topologies
 Key: KAFKA-13645
 URL: https://issues.apache.org/jira/browse/KAFKA-13645
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Currently the TTD accepts only a single Topology. Users can technically just 
use one TTD per Topology, but for a complete simulation of the actual 
KafkaStreams app we'll need to add support for processing multiple modular 
topologies with the TopologyTestDriver.
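
A rough illustration of today's one-driver-per-topology workaround (a minimal 
sketch with hypothetical topic names and builder methods, not the proposed 
multi-topology API):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class PerTopologyDriverSketch {

    // Hypothetical builders for two independent (modular) topologies.
    static Topology buildTopologyA() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-a").to("output-a");
        return builder.build();
    }

    static Topology buildTopologyB() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-b").to("output-b");
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "modular-topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        // Today each Topology needs its own driver, so state stores and punctuation
        // are not shared; this only approximates one KafkaStreams app running both modules.
        try (TopologyTestDriver driverA = new TopologyTestDriver(buildTopologyA(), props);
             TopologyTestDriver driverB = new TopologyTestDriver(buildTopologyB(), props)) {
            // pipe records into driverA and driverB independently via createInputTopic(...)
        }
    }
}
```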



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #5407: KAFKA-7109: Close cached fetch sessions in the broker on consumer close

2022-02-03 Thread GitBox


showuon commented on pull request #5407:
URL: https://github.com/apache/kafka/pull/5407#issuecomment-1029737477


   @stanislavkozlovski , I encountered similar errors and I agree we should close 
the incremental fetch sessions when the consumer is closed. Are you able to complete 
this PR? If not, I can co-author with you to help get this PR completed and merge 
this good improvement. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11720: KAFKA-13625: Fix inconsistency in dynamic application log levels

2022-02-03 Thread GitBox


dajac commented on pull request #11720:
URL: https://github.com/apache/kafka/pull/11720#issuecomment-1029736633


   @dongjinleekr Thanks for the explanation. I missed KIP-817 in the mailing 
list. Let's continue the discussion there and see what the community thinks 
about this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13644) Support global state stores with modular topologies

2022-02-03 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13644:
--

 Summary: Support global state stores with modular topologies
 Key: KAFKA-13644
 URL: https://issues.apache.org/jira/browse/KAFKA-13644
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13643) Replace "NamedTopology" with "ModularTopology" in the codebase

2022-02-03 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13643:
--

 Summary: Replace "NamedTopology" with "ModularTopology" in the 
codebase
 Key: KAFKA-13643
 URL: https://issues.apache.org/jira/browse/KAFKA-13643
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13534) Upgrade Log4j to 2.15.0 - CVE-2021-44228

2022-02-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486845#comment-17486845
 ] 

Luke Chen commented on KAFKA-13534:
---

[~noonbs] , thanks for the comment. I've marked this as a duplicate of KAFKA-9366 
and closed this ticket.

> Upgrade Log4j to 2.15.0 - CVE-2021-44228
> 
>
> Key: KAFKA-13534
> URL: https://issues.apache.org/jira/browse/KAFKA-13534
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 2.7.0, 2.8.0, 3.0.0
>Reporter: Sai Kiran Vudutala
>Priority: Major
>
> Log4j has an RCE vulnerability, see 
> [https://www.lunasec.io/docs/blog/log4j-zero-day/]
> References. 
> [https://github.com/advisories/GHSA-jfh8-c2jp-5v3q]
> [https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13534) Upgrade Log4j to 2.15.0 - CVE-2021-44228

2022-02-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13534.
---
Resolution: Duplicate

> Upgrade Log4j to 2.15.0 - CVE-2021-44228
> 
>
> Key: KAFKA-13534
> URL: https://issues.apache.org/jira/browse/KAFKA-13534
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 2.7.0, 2.8.0, 3.0.0
>Reporter: Sai Kiran Vudutala
>Priority: Major
>
> Log4j has an RCE vulnerability, see 
> [https://www.lunasec.io/docs/blog/log4j-zero-day/]
> References. 
> [https://github.com/advisories/GHSA-jfh8-c2jp-5v3q]
> [https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon edited a comment on pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-03 Thread GitBox


showuon edited a comment on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029613788


   @guozhangwang , thanks for your comments. Sorry for confusing you! It 
makes sense to remove the `WIP` from the PR title. 
   I'll let you know when I complete the PR. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-03 Thread GitBox


showuon commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029613788


   @guozhangwang , thanks for your comments. Sorry for confusing you! It 
makes sense to remove the `KIP` from the PR title. 
   I'll let you know when I complete the PR. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486808#comment-17486808
 ] 

Guozhang Wang commented on KAFKA-13600:
---

Thanks guys for the great discussion here.

I think just changing the behavior of "if we cannot find any caught-up node, 
assign to a fresh node" to "if we cannot find any caught-up node, pick one that 
is closest to head" as a general principle (while of course still maintaining 
the subtlety of those edge cases) should be okay for just the scope of this 
ticket. We can leave further improvements of the assignment algorithm (I know 
Bruno/John already have some ideas) to a larger-scoped KIP. WDYT?
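
To make the suggested fallback concrete, a minimal sketch (hypothetical names, 
not the actual StreamsPartitionAssignor code): prefer a caught-up client, 
otherwise pick the client with the smallest lag instead of an arbitrary fresh one.

```java
import java.util.Map;
import java.util.Optional;

final class CandidatePickerSketch {
    /**
     * Picks a client for a stateful task: prefer any client within acceptableRecoveryLag,
     * otherwise fall back to the client that is closest to caught up rather than a
     * "fresh" client that would have to restore the store from scratch.
     */
    static Optional<String> pickClient(Map<String, Long> lagByClient, long acceptableRecoveryLag) {
        Optional<String> caughtUp = lagByClient.entrySet().stream()
                .filter(e -> e.getValue() <= acceptableRecoveryLag)
                .map(Map.Entry::getKey)
                .findFirst();
        if (caughtUp.isPresent()) {
            return caughtUp;
        }
        // No caught-up client: choose the one closest to the head of the changelog.
        return lagByClient.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
```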

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalance 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag`, and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught up" node.
> We've hit this a few times, and having lots of state (~25TB worth) and being 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the 
> reliability of our cluster.
> Our change is to simply use the most caught-up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code 
> but in practice doesn't seem to matter too much.
> I guess an algorithm that identified candidate nodes as those being 
> within `acceptableRecoveryLag` of the most caught-up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks.)
> Ideally we don't want to maintain a fork of Kafka Streams going forward, so we 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code / test different algorithms in a production 
> system, or anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-03 Thread Mohammad Yousuf Minhaj Zia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486807#comment-17486807
 ] 

Mohammad Yousuf Minhaj Zia commented on KAFKA-13641:


Going a step further, maybe we could also make tombstones semantically clear 
using `Either`, such that a value in other functions can be an `Either` where the 
tombstone case is equivalent to null. But that is unrelated to this issue, so 
treat it as a fleeting thought.

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mohammad Yousuf Minhaj Zia
>Priority: Minor
>
> Since the `ValueJoiner` right parameter in `leftJoins` and `outerJoins` can be 
> nullable, I am wondering if we can wrap them in Scala's `Option`.
> However, there is also the concern that the left-hand side value can be null 
> in the case of tombstone messages, in which case the `Option` semantics can be 
> misleading. I still feel this could be a useful feature in reducing the 
> number of `NullPointerExceptions`.
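
For context, a minimal Java sketch (hypothetical topic names) of the nullability 
being discussed: in a stream-stream left join the joiner is invoked with null for 
the right-hand value when there is no match, and nothing in the signature makes 
that visible, which is what wrapping the parameter in `Option` would address:

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class LeftJoinNullSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");
        KStream<String, String> right = builder.stream("right-topic");

        // The ValueJoiner must handle rightValue == null itself; the type signature
        // gives no hint that this can happen.
        left.leftJoin(
                right,
                (leftValue, rightValue) -> rightValue == null ? leftValue : leftValue + "," + rightValue,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));

        builder.build();
    }
}
```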



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-03 Thread Mohammad Yousuf Minhaj Zia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486804#comment-17486804
 ] 

Mohammad Yousuf Minhaj Zia commented on KAFKA-13641:


[~mjsax] The backwards compatibility is definitely a problem. One solution 
would be to maintain two sets of functions, one with `ValueJoiner` and one 
without, but that will get hard to maintain over time as well. Maybe there is an 
abstraction we can abuse to allow both `Option` and non-`Option` 
values in a `ValueJoiner`.

I genuinely feel this would improve the safety of the implementations.

Also, thanks for pointing out that null values are dropped. If that is the case, 
then maybe having `Option` for the specific values of the join will help a lot 
in making people understand the join semantics just by looking at the signature.

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mohammad Yousuf Minhaj Zia
>Priority: Minor
>
> Since the `ValueJoiner` right parameter in `leftJoins` and `outerJoins` can be 
> nullable, I am wondering if we can wrap them in Scala's `Option`.
> However, there is also the concern that the left-hand side value can be null 
> in the case of tombstone messages, in which case the `Option` semantics can be 
> misleading. I still feel this could be a useful feature in reducing the 
> number of `NullPointerExceptions`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] jsancio opened a new pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-02-03 Thread GitBox


jsancio opened a new pull request #11733:
URL: https://github.com/apache/kafka/pull/11733


   TODO:  Add a description for this PR!!
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799138886



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +587,45 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number 
bound for the partition, and also have
-// the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
-"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+// do the rest of the work by processing outside the lock
+// close() is particularly expensive
+batch = deque.pollFirst();
+}
 
-batch.drained(now);
-}
+boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match the 
latest one
+// of the producer, we update it and reset the sequence. This 
should be
+// only done when all its in-flight batches have completed. 
This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we 
should not change the producer id and
+// sequence number, since this may introduce duplicates. In 
particular, the previous attempt
+// may actually have been accepted, and if we change the 
producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound for 
the partition, and also have
+// the transaction manager track the batch so as to ensure 
that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);

Review comment:
   Correct, this application is jdk8. I'll have to find a jd

[GitHub] [kafka] guozhangwang commented on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-03 Thread GitBox


guozhangwang commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029600849


   @showuon Sorry for getting to this late -- I thought it was not ready since 
the title still has `WIP` in it. I will re-title it and continue reviewing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-03 Thread GitBox


showuon commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1029598488


   @guozhangwang @ableegoldman @hachikuji , please take a look when you are 
available. This issue blocks some users when upgrading kafka-clients, so I think 
it should be fixed soon. 
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names

2022-02-03 Thread GitBox


guozhangwang commented on a change in pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#discussion_r799128273



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -171,42 +169,24 @@ private void registerMetrics() {
 @Deprecated
 private void initStoreSerde(final ProcessorContext context) {
 final String storeName = name();
-final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName);
-final String prefix = getPrefix(context.appConfigs(), 
context.applicationId());
+final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

Review comment:
   Yes, but I'm wondering if the two types of flow are really necessary. It 
seems to me that the topic should always have been created by the `init` phase 
already, and hence the null check seems unnecessary.
   
   @cadonna I would need your quick thoughts on this.
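
For reference, the changelog name being generated here follows the usual 
`<application.id>-<storeName>-changelog` convention; a minimal standalone sketch 
of that naming scheme (an illustration of the assumed convention only, not a call 
into `ProcessorContextUtils`):

```java
public final class ChangelogNameSketch {
    // Assumed convention: "<application.id>-<storeName>-changelog".
    static String changelogFor(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Prints: my-app-counts-store-changelog
        System.out.println(changelogFor("my-app", "counts-store"));
    }
}
```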




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-02-03 Thread GitBox


showuon commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1029590837


   @hachikuji  , I've updated the KIP to add tests. Please take a look again. 
Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-02-03 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r799127097



##
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##
@@ -168,7 +168,7 @@ object ConsoleProducer {
   .withRequiredArg
   .describedAs("request required acks")
   .ofType(classOf[java.lang.String])
-  .defaultsTo("1")
+  .defaultsTo("-1")

Review comment:
   Good suggestion! Sent a note in the KIP vote thread. Thanks.
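
For context on why the console producer default moves from acks=1 to acks=-1 
(all): an idempotent producer requires acks=all, so a minimal sketch of the 
producer-side settings this change lines up with (explicit values shown for 
illustration; they are assumptions about the new defaults being proposed):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Idempotence requires acks=all (-1), retries > 0, and
        // max.in.flight.requests.per.connection <= 5; conflicting explicit
        // settings are rejected when the producer is configured.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }
}
```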




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-03 Thread GitBox


ijuma commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799125694



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +587,45 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number 
bound for the partition, and also have
-// the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
-"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+// do the rest of the work by processing outside the lock
+// close() is particularly expensive
+batch = deque.pollFirst();
+}
 
-batch.drained(now);
-}
+boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match the 
latest one
+// of the producer, we update it and reset the sequence. This 
should be
+// only done when all its in-flight batches have completed. 
This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we 
should not change the producer id and
+// sequence number, since this may introduce duplicates. In 
particular, the previous attempt
+// may actually have been accepted, and if we change the 
producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound for 
the partition, and also have
+// the transaction manager track the batch so as to ensure 
that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);

Review comment:
   Can you please test with Java 11 or newer? Looks like you tes

[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-03 Thread GitBox


ijuma commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799124172



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -392,12 +392,33 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+
+// magic sequence of numbers that produces a function equivalent to 
this lookup
+// table, where the index in the lookup table is provided by the 
number of
+// leading zeros of the value, and the result is the number of bytes 
used
+// in the output
+
+// see the test cases as well to verify the implementation matches the 
prior
+// for-loop logic
+
+// final static byte[] LEADING_ZEROS_TO_U_VARINT_SIZE = new byte[] {
+// // 32 bits, and each 7-bits adds one byte to the output
+// 5, 5, 5, 5, // 32
+// 4, 4, 4, 4, 4, 4, 4, // 28
+// 3, 3, 3, 3, 3, 3, 3, // 21
+// 2, 2, 2, 2, 2, 2, 2, // 14
+// 1, 1, 1, 1, 1, 1, 1, // 7
+// 1 // 0
+// };
+
+// this is the core logic, but the Java encoding is suboptimal when we 
have a narrow
+// range of integers, so we can do better here
+
+// return (38 - leadingZeros) / 7 + leadingZeros / 32;
+
+int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 
0b10010010010010011) >>> 19;
+return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);

Review comment:
   Very nice. Regarding the comment, do we need the lookup table? Or can we 
explain it based on the core logic comment?
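
To make the equivalence easier to check by hand, a small standalone sketch (not 
the PR's code) comparing the original loop against the multiply/shift form for 
every possible bit length:

```java
public class VarintSizeEquivalenceSketch {
    // Original loop-based size computation.
    static int loopSize(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0L) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Branchless form from the discussion above.
    static int mathSize(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        // (38 - leadingZeros) / 7 encoded as a multiply/shift, plus a +0/+1 correction
        // that maps value == 0 (32 leading zeros) to a size of 1 byte.
        int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
        return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
    }

    public static void main(String[] args) {
        // Check one representative value per possible bit length (plus zero).
        for (int bits = 0; bits <= 32; bits++) {
            int value = bits == 0 ? 0 : (bits == 32 ? -1 : (1 << (bits - 1)));
            if (loopSize(value) != mathSize(value)) {
                throw new AssertionError("mismatch at " + Integer.toBinaryString(value));
            }
        }
        System.out.println("loop and math forms agree for all bit lengths");
    }
}
```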




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] artemlivshits commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-03 Thread GitBox


artemlivshits commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799117854



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +587,45 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number 
bound for the partition, and also have
-// the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
-"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+// do the rest of the work by processing outside the lock
+// close() is particularly expensive
+batch = deque.pollFirst();
+}
 
-batch.drained(now);
-}
+boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match the 
latest one
+// of the producer, we update it and reset the sequence. This 
should be
+// only done when all its in-flight batches have completed. 
This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we 
should not change the producer id and
+// sequence number, since this may introduce duplicates. In 
particular, the previous attempt
+// may actually have been accepted, and if we change the 
producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound for 
the partition, and also have
+// the transaction manager track the batch so as to ensure 
that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);

Review comment:
   Moving `close` outside of locked scope LGTM
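
The general pattern under discussion, reduced to a minimal sketch (hypothetical 
`Batch` type, not the RecordAccumulator code): only the deque mutation happens 
under the lock, and the expensive per-batch work such as close() runs outside it:

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class LockNarrowingSketch {
    static final class Batch {
        void close() { /* expensive: compression, buffer release, etc. */ }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    Batch drainOne() {
        Batch batch;
        synchronized (deque) {
            // Keep the critical section small: only the structural mutation
            // of the shared deque happens while holding the lock.
            batch = deque.pollFirst();
        }
        if (batch != null) {
            // Expensive per-batch work happens outside the lock, so threads
            // appending to the deque are not blocked by it.
            batch.close();
        }
        return batch;
    }
}
```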




-- 
T

[GitHub] [kafka] artemlivshits commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-03 Thread GitBox


artemlivshits commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799111359



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   Nice!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799107208



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +587,45 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number 
bound for the partition, and also have
-// the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
-"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+// do the rest of the work by processing outside the lock
+// close() is particularly expensive
+batch = deque.pollFirst();
+}
 
-batch.drained(now);
-}
+boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match the 
latest one
+// of the producer, we update it and reset the sequence. This 
should be
+// only done when all its in-flight batches have completed. 
This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we 
should not change the producer id and
+// sequence number, since this may introduce duplicates. In 
particular, the previous attempt
+// may actually have been accepted, and if we change the 
producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound for 
the partition, and also have
+// the transaction manager track the batch so as to ensure 
that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);

Review comment:
   I've addressed this in 34008bf.
   
   Below is flamegrap

[jira] [Commented] (KAFKA-7572) Producer should not send requests with negative partition id

2022-02-03 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486780#comment-17486780
 ] 

Guozhang Wang commented on KAFKA-7572:
--

Thanks for pinging me. I've just made a pass on the PR.

> Producer should not send requests with negative partition id
> 
>
> Key: KAFKA-7572
> URL: https://issues.apache.org/jira/browse/KAFKA-7572
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1, 1.1.1
>Reporter: Yaodong Yang
>Assignee: Wenhao Ji
>Priority: Major
>  Labels: patch-available
>
> h3. Issue:
> In one Kafka producer log from our users, we found the following weird one:
> timestamp="2018-10-09T17:37:41,237-0700",level="ERROR", Message="Write to 
> Kafka failed with: ",exception="java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topicName--2: 30042 ms has passed since batch creation plus linger time
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for topicName--2: 30042 ms has passed since batch creation plus 
> linger time"
> After a few hours debugging, we finally understood the root cause of this 
> issue:
>  # The producer used a buggy custom Partitioner, which sometimes generates 
> negative partition ids for new records.
>  # The corresponding produce requests were rejected by brokers, because it's 
> illegal to have a partition with a negative id.
>  # The client kept refreshing its local cluster metadata, but could not send 
> produce requests successfully.
>  # From the above log, we found a suspicious string "topicName--2":
>  # According to the source code, the format of this string in the log is 
> TopicName+"-"+PartitionId.
>  # It's not easy to notice that there were two consecutive dashes in the above 
> log.
>  # Eventually, we found that the second dash was a negative sign. Therefore, 
> the partition id is -2, rather than 2.
>  # The bug was in the custom Partitioner.
> h3. Proposal:
>  # Producer code should check the partitionId before sending requests to 
> brokers.
>  # If there is a negative partition id, just throw an IllegalStateException.
>  # Such a quick check can save lots of time for people debugging their 
> producer code.
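
A minimal sketch of the proposed guard (hypothetical helper, not Kafka's actual 
code; as the PR discussion below notes, the eventual choice was an unchecked 
illegal-argument error rather than IllegalStateException):

```java
// Hypothetical validation helper for illustration, not Kafka's actual code.
final class PartitionValidationSketch {
    static int validatePartition(String topic, int partition, int numPartitions) {
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException(
                "Partitioner returned invalid partition " + partition + " for topic '"
                    + topic + "'; it must be in the range [0, " + numPartitions + ")");
        }
        return partition;
    }
}
```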



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guozhangwang commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2022-02-03 Thread GitBox


guozhangwang commented on pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#issuecomment-1029565310


   @predatorray Thanks for the PR. Just following @showuon 's comment, I 
thought about the existing `InvalidPartitionsException`, but that has some 
issues when used by the producer, since it currently extends `ApiException`, 
which is used by the admin / raft controller for errors returned by the 
brokers, and using it in the producer would also be considered a public change. 
So I think for now just throwing the unchecked `illegal-argument` as a fatal 
error to crash the producer directly is fine.
   
   The PR needs some rebasing at the moment; otherwise it looks good to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang closed pull request #5858: KAFKA-7572: Producer should not send requests with negative partition id

2022-02-03 Thread GitBox


guozhangwang closed pull request #5858:
URL: https://github.com/apache/kafka/pull/5858


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ahuang98 opened a new pull request #11732: MINOR: enable KRaft in ConfigCommandTest

2022-02-03 Thread GitBox


ahuang98 opened a new pull request #11732:
URL: https://github.com/apache/kafka/pull/11732


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799084697



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   @artemlivshits i've updated the implementation in f040109ba.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799082813



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   OK, got it. Thank you for the nudges. They are now effectively identical 
in performance, and the math-version uses 1 less cmp/mov (for array bounds & 
fetch), with one extra instruction, and no requirement to hit the cache.
   
   ```
   Benchmark
 Mode  Cnt   Score  Error  Units
   ByteUtilsBenchmark.testSizeOfUnsignedVarint  
thrpt   20  503633.665 ± 5214.083 ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:IPC  
thrpt2   4.608 insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:L1-dcache-loads  
thrpt2  13.002  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:branches 
thrpt2   5.005  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:instructions 
thrpt2  31.080  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath  
thrpt   20  507438.241 ± 5268.695 ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:IPC  
thrpt2   4.789 insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:L1-dcache-loads  
thrpt2  11.992  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:branches 
thrpt2   4.004  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:instructions 
thrpt2  32.059  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal  
thrpt   20  371946.626 ± 3105.947 ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:IPC  
thrpt2   5.360 insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:L1-dcache-loads  
thrpt2  14.002  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:branches 
thrpt2   7.992  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:instructions 
thrpt2  48.931  #/op
   ```
   
   I'll switch the code over.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799082813



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   OK, got it. Thank you for the nudges. They are now effectively identical 
in performance, and the math-version uses 1 less cmp/mov (for array bounds & 
fetch), with one extra instruction, and no requirement to hit the cache.
   
   ```
   Benchmark
 Mode  Cnt   Score  Error  Units
   ByteUtilsBenchmark.testSizeOfUnsignedVarint  
thrpt   20  503633.665 ± 5214.083 ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:IPC  
thrpt2   4.608 insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:L1-dcache-loads  
thrpt2  13.002  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:branches 
thrpt2   5.005  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:instructions 
thrpt2  31.080  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath  
thrpt   20  507438.241 ± 5268.695 ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:IPC  
thrpt2   4.789 insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:L1-dcache-loads  
thrpt2  11.992  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:branches 
thrpt2   4.004  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:instructions 
thrpt2  32.059  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal  
thrpt   20  371946.626 ± 3105.947 ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:IPC  
thrpt2   5.360 insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:L1-dcache-loads  
thrpt2  14.002  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:branches 
thrpt2   7.992  #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:instructions 
thrpt2  48.931  #/op
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799082813



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   OK, got it. Thank you for the nudges. They are now effectively identical 
in performance, and the math-version uses 1 less cmp/mov (for array bounds & 
fetch), with one extra instruction, and no requirement to hit the cache.
   
   ```
   Benchmark                                                             Mode  Cnt       Score      Error      Units
   ByteUtilsBenchmark.testSizeOfUnsignedVarint                          thrpt   20  503633.665 ± 5214.083     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:IPC                      thrpt    2       4.608              insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:L1-dcache-loads          thrpt    2      13.002                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:branches                 thrpt    2       5.005                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarint:instructions             thrpt    2      31.080                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath                      thrpt   20  507438.241 ± 5268.695     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:CPI                  thrpt    2       0.209              clks/insn
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:L1-dcache-loads      thrpt    2      11.992                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:branches             thrpt    2       4.004                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMath:instructions         thrpt    2      32.059                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal                  thrpt   20  371946.626 ± 3105.947     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:IPC              thrpt    2       5.360              insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:L1-dcache-loads  thrpt    2      14.002                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:branches         thrpt    2       7.992                   #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginal:instructions     thrpt    2      48.931                   #/op
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (KAFKA-13619) zookeeper.sync.time.ms is no longer used

2022-02-03 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita closed KAFKA-13619.
--

> zookeeper.sync.time.ms is no longer used
> 
>
> Key: KAFKA-13619
> URL: https://issues.apache.org/jira/browse/KAFKA-13619
> Project: Kafka
>  Issue Type: Bug
>  Components: core, documentation
>Affects Versions: 2.0.0, 3.1.0
>Reporter: Tomonari Yamashita
>Assignee: Tomonari Yamashita
>Priority: Minor
>
> - zookeeper.sync.time.ms is no longer used. But it is present in the 
> documentation (1)
>  -- The implementation and documentation of zookeeper.sync.time.ms should be 
> removed.
>  -- As far as I can see, it was already out of use by v2.0.0.
>  - I've submitted a pull request: "KAFKA-13619: zookeeper.sync.time.ms is no 
> longer used [#11717|https://github.com/apache/kafka/pull/11717]";
> (1) 
> [https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread Tim Patterson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486744#comment-17486744
 ] 

Tim Patterson commented on KAFKA-13600:
---

Thanks [~cadonna]!
Yeah, I had a bit of a think and I think you're right; it also gets a bit weird 
when dealing with replicas.
You almost have to decide what's "caught up", assign the actives, remove 
those node/partitions from the candidates and then recalculate for the replicas, to 
handle the case where there's a large gap between the most caught-up and second 
most caught-up node.

[~vvcephei] 
> Is the situation that there's an active that happens to be processing quite 
> a bit ahead of the replicas, such that when the active goes offline, there's 
> no "caught-up" node, and instead of failing the task over to the 
> least-lagging node, we just assign it to a fresh node
Yeah, that's it!
Although what we also hit a couple of times is a variation on clusters with no 
replicas, where the active is restarted but has failed to checkpoint locally within 
a couple of minutes; when it comes back up it's seen as not being caught up, and 
so the task is assigned to a fresh(ish) node
(of course this only occurs when the cluster already wants to move that 
task to a new home due to a node being added/removed recently).



One thing I haven't really taken into consideration or thought about is clusters 
with more than one replica; I'm not entirely convinced it works there, although 
the unit tests do pass.
 

Did you mean submit as is, or create a minimal PR where I only try to 
address the flaw you've identified here:
[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baL92-L96]

I can certainly have a go at that (it was a few months ago that I patched this 
so it might take me a bit to wrap my head around it again lol).

Thanks
Tim

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalance, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag` and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught up" node.
> We've hit this a few times; having lots of state (~25 TB worth) and being 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the 
> reliability of our cluster.
> Our change is to simply use the most caught up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load balancing type behaviour of the existing code 
> but in practice doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being 
> within `acceptableRecoveryLag` of the most caught up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks)
> Ideally we don't want to maintain a fork of kafka streams going forward so 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code / test different algos in a production system 
> or anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799031745



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   I can get a decent bump in performance, almost to lookup table 
performance, by swapping the divide by 32 for a direct shift right. I'll need 
to work out the right incantation of casts to get a divide by 7 through shifts 
to give the correct answer.
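   For reference, a branch-free shape of that "math" version could look roughly like the
   following (a sketch only; the exact constants and casts the PR finally lands on may differ):
   
   ```java
   // Illustrative sketch: an unsigned varint needs ceil(bits / 7) bytes, where
   // bits = 32 - numberOfLeadingZeros(value), i.e. (38 - leadingZeros) / 7.
   public static int sizeOfUnsignedVarint(int value) {
       int leadingZeros = Integer.numberOfLeadingZeros(value);
       // Replace the divide by 7 with a multiply and shift: 74899 / 2^19 is a close
       // enough approximation of 1/7 for inputs in the range [6, 38].
       int sizeIgnoringZero = ((38 - leadingZeros) * 74899) >>> 19;
       // leadingZeros >>> 5 is 1 only when value == 0 (leadingZeros == 32),
       // so zero still encodes as a single byte.
       return sizeIgnoringZero + (leadingZeros >>> 5);
   }
   ```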




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-03 Thread GitBox


jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799030101



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +587,45 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number 
bound for the partition, and also have
-// the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
-"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+// do the rest of the work by processing outside the lock
+// close() is particularly expensive
+batch = deque.pollFirst();
+}
 
-batch.drained(now);
-}
+boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match the 
latest one
+// of the producer, we update it and reset the sequence. This 
should be
+// only done when all its in-flight batches have completed. 
This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we 
should not change the producer id and
+// sequence number, since this may introduce duplicates. In 
particular, the previous attempt
+// may actually have been accepted, and if we change the 
producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound for 
the partition, and also have
+// the transaction manager track the batch so as to ensure 
that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);

Review comment:
   OK, this makes sense. I agree it makes sense to push this

[GitHub] [kafka] dongjinleekr commented on pull request #11720: KAFKA-13625: Fix inconsistency in dynamic application log levels

2022-02-03 Thread GitBox


dongjinleekr commented on pull request #11720:
URL: https://github.com/apache/kafka/pull/11720#issuecomment-1029440873


   Hi @dajac,
   
   Oh yes. As I mentioned in 
[KIP-817](https://cwiki.apache.org/confluence/display/KAFKA/KIP-817%3A+Fix+inconsistency+in+dynamic+application+log+levels),
 I already checked 
[KIP-412](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels).
 But IMHO, omitting `OFF` was a mistake; as you can see in the 
[motivation section in the 
proposal](https://cwiki.apache.org/confluence/display/KAFKA/KIP-817%3A+Fix+inconsistency+in+dynamic+application+log+levels),
 I found that `OFF` is helpful when debugging, for the loggers like 
`kafka.request.logger`.
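   
   For reference, broker log levels can already be changed dynamically through the Admin API
   from KIP-412; a minimal sketch (broker id and bootstrap address are assumptions) that raises
   `kafka.request.logger` to DEBUG, with `OFF` being the extra value KIP-817 would allow:
   
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   
   public class LoggerLevelExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
   
           try (Admin admin = Admin.create(props)) {
               // BROKER_LOGGER targets the dynamic log levels of broker id "0".
               ConfigResource brokerLoggers = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0");
               AlterConfigOp setLevel = new AlterConfigOp(
                   new ConfigEntry("kafka.request.logger", "DEBUG"), AlterConfigOp.OpType.SET);
               Map<ConfigResource, Collection<AlterConfigOp>> request =
                   Collections.singletonMap(brokerLoggers, Collections.singletonList(setLevel));
               admin.incrementalAlterConfigs(request).all().get();
           }
       }
   }
   ```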


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] artemlivshits commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-03 Thread GitBox


artemlivshits commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r798988451



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long 
nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry> entry : 
this.batches.entrySet()) {
 Deque deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine 
outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path 
is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more 
expensive checks whenever possible.
-ProducerBatch batch = deque.peekFirst();
-if (batch != null) {
-TopicPartition part = entry.getKey();
-Node leader = cluster.leaderFor(part);
-if (leader == null) {
-// This is a partition for which leader is not known, 
but messages are available to send.
-// Note that entries are currently not removed from 
batches when deque is empty.
-unknownLeaderTopics.add(part.topic());
-} else if (!readyNodes.contains(leader) && !isMuted(part)) 
{
-long waitedTimeMs = batch.waitedTimeMs(nowMs);
-boolean backingOff = batch.attempts() > 0 && 
waitedTimeMs < retryBackoffMs;
-long timeToWaitMs = backingOff ? retryBackoffMs : 
lingerMs;
-boolean full = deque.size() > 1 || batch.isFull();
-boolean expired = waitedTimeMs >= timeToWaitMs;
-boolean transactionCompleting = transactionManager != 
null && transactionManager.isCompleting();
-boolean sendable = full
-|| expired
-|| exhausted
-|| closed
-|| flushInProgress()
-|| transactionCompleting;
-if (sendable && !backingOff) {
-readyNodes.add(leader);
-} else {
-long timeLeftMs = Math.max(timeToWaitMs - 
waitedTimeMs, 0);
-// Note that this results in a conservative 
estimate since an un-sendable partition may have
-// a leader that will later be found to have 
sendable data. However, this is good enough
-// since we'll just wake up and then sleep again 
for the remaining time.
-nextReadyCheckDelayMs = Math.min(timeLeftMs, 
nextReadyCheckDelayMs);
-}
-}
+batch = deque.peekFirst();

Review comment:
   Changes in the `ready` function LGTM
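   
   For readers following the diff, the core of the `ready()` change is the classic pattern of
   capturing only what is needed while holding the deque lock and evaluating it afterwards; a
   stripped-down sketch of that pattern (simplified names, not the actual Kafka code):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   final class ReadyCheckSketch {
   
       static final class Batch {
           final long createdMs;
           final int attempts;
           Batch(long createdMs, int attempts) {
               this.createdMs = createdMs;
               this.attempts = attempts;
           }
       }
   
       private final Deque<Batch> deque = new ArrayDeque<>();
   
       boolean partitionReady(long nowMs, long lingerMs, long retryBackoffMs) {
           final Batch first;
           final boolean full;
           synchronized (deque) {
               // Critical section: only read the shared structure, no derived computation.
               first = deque.peekFirst();
               full = deque.size() > 1;
           }
           if (first == null) {
               return false;
           }
           // The derived checks run outside the lock, so other threads can keep appending.
           long waitedTimeMs = nowMs - first.createdMs;
           boolean backingOff = first.attempts > 0 && waitedTimeMs < retryBackoffMs;
           boolean expired = waitedTimeMs >= lingerMs;
           return (full || expired) && !backingOff;
       }
   }
   ```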

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +587,45 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-

[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486714#comment-17486714
 ] 

John Roesler commented on KAFKA-13600:
--

Hi [~tim.patterson] , thanks for the report and the patch!

 

It sounds like you're reporting two things here:
 # a bug around the acceptable recovery lag.
 # an improvement on assignment balance

If we can discuss those things independently, then we can definitely merge the 
bugfix immediately. Depending on the impact of the improvement, it might also 
fall into the category of a simple ticket, or it might be more appropriate to 
have a KIP as [~cadonna] suggested.

Regarding the bug, I find it completely plausible that we have a bug, but I 
have to confess that I'm not 100% sure I understand the report. Is the 
situation that there's an active that happens to be processing quite a bit 
ahead of the replicas, such that when the active goes offline, there's no 
"caught-up" node, and instead of failing the task over to the least-lagging 
node, we just assign it to a fresh node? If that's it, then it is certainly not 
the desired behavior.

The notion of acceptableRecoveryLag was introduced because follower replicas 
will always lag the active task, by definition. We want task ownership to be 
able to swap over from the active to a warm-up when it's caught up, but it will 
never be 100% caught up (because it is a follower until it takes over). 
acceptableRecoveryLag is a way to define a small amount of lag that is 
"acceptable", so that when a warm-up is only lagging by that amount, we can 
consider it to be effectively caught up and move the active to the warm-up node.

As you can see, this has nothing at all to do with which nodes are eligible to 
take over when an active exits the cluster. In that case, it was always the 
intent that the most-caught-up node should take over active processing, 
regardless of its lag.

I've been squinting at our existing code, and also your patch 
([https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1]). 
It looks to me like the flaw in the existing implementation is essentially 
just here:

[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baL92-L96]
{code:java}
// if the desired client is not caught up, and there is another client that 
_is_ caught up, then
// we schedule a movement, so we can move the active task to the caught-up 
client. We'll try to
// assign a warm-up to the desired client so that we can move it later on.{code}
which should indeed be just like what you described:
{code:java}
// if the desired client is not caught up, and there is another client that 
_is_ more caught up,
// then we schedule a movement [to] move the active task to the [most] 
caught-up client. 
// We'll try to assign a warm-up to the desired client so that we can move it 
later on.{code}
On the other hand, we should not lose this important predicate to determine 
whether a task is considered "caught up":

[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-e50a755ba2a4d2f7306d1016d079018cba22f9f32993ef5dd64408d1a94d79acL245]
{code:java}
activeRunning(taskLag) || unbounded(acceptableRecoveryLag) || 
acceptable(acceptableRecoveryLag, taskLag) {code}
This captures a couple of subtleties in addition to the obvious "a task is 
caught up if it's under the acceptable recovery lag":
 # A running, active task doesn't have a real lag at all, but instead its "lag" 
is the sentinel value `-2`
 # You can disable the "warm up" phase completely by setting 
acceptableRecoveryLag to `Long.MAX_VALUE`, in which case, we ignore lags 
completely and consider all nodes to be caught up, even if they didn't report a 
lag at all.
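
Putting those subtleties together, the check reads roughly like this (an illustrative sketch, 
not the actual code; only the -2 sentinel and the Long.MAX_VALUE "unbounded" case are taken 
from the description above):
{code:java}
final class CaughtUpCheck {
    static final long SENTINEL_RUNNING_ACTIVE_LAG = -2L;

    static boolean caughtUp(final long taskLag, final long acceptableRecoveryLag) {
        return taskLag == SENTINEL_RUNNING_ACTIVE_LAG   // a running active task has no real lag
            || acceptableRecoveryLag == Long.MAX_VALUE  // warm-ups disabled: treat every node as caught up
            || taskLag <= acceptableRecoveryLag;        // within the configured acceptable recovery lag
    }
}
{code}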

 

One extra thing I like about your patch is this:

[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baR54-R56]
{code:java}
// Even if there is a more caught up client, as long as we're within allowable 
lag then
// its best just to stick with what we've got {code}
I agree that, if two nodes are within the acceptableRecoveryLag of each other, 
we should consider their lags to be effectively the same. That's something I 
wanted to do when we wrote this code, but couldn't figure out a good way to do 
it.

 

One thing I'd need more time on is the TaskMovementTest. At first glance, it 
looks like those changes are just about the slightly different method 
signature, but I'd want to be very sure that we're still testing the same 
invariants that we wanted to test.

Would you be willing to submit this bugfix as a PR so that we can formally 
review and merge it?

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 

[GitHub] [kafka] dajac commented on pull request #11720: KAFKA-13625: Fix inconsistency in dynamic application log levels

2022-02-03 Thread GitBox


dajac commented on pull request #11720:
URL: https://github.com/apache/kafka/pull/11720#issuecomment-1029400036


   @dongjinleekr Thanks for the PR. It seems that OFF was intentionally left 
out in 
[KIP-412](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels).
 See the rejected alternative section. Do you think that we should change our 
mind?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-03 Thread GitBox


guozhangwang commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1029397689


   > If by "first place" you mean 
https://github.com/apache/kafka/pull/11424#discussion_r796139442, then no, 
other way around actually. The TopologyConfig bug is the one that actually 
breaks compatibility, the TTD one actually doesn't really do anything -- if I 
understand the TTD correctly. See 
https://github.com/apache/kafka/pull/11424#discussion_r798443526
   
   Crystal! Thanks for the clarification @ableegoldman 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1

2022-02-03 Thread GitBox


mimaison commented on pull request #11672:
URL: https://github.com/apache/kafka/pull/11672#issuecomment-1029397144


   Thanks @tombentley. I've pushed an update that improves coverage of all 
mocks whenever possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1

2022-02-03 Thread GitBox


mimaison commented on a change in pull request #11672:
URL: https://github.com/apache/kafka/pull/11672#discussion_r798964433



##
File path: core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
##
@@ -134,36 +129,33 @@ class InterBrokerSendThreadTest {
 
 val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler)
 
-EasyMock.expect(networkClient.newClientRequest(
-  EasyMock.eq("1"),
-  EasyMock.same(handler.request),
-  EasyMock.anyLong(),
-  EasyMock.eq(true),
-  EasyMock.eq(requestTimeoutMs),
-  EasyMock.same(handler.handler)))
-  .andReturn(clientRequest)
+when(networkClient.newClientRequest(
+  ArgumentMatchers.eq("1"),
+  same(handler.request),
+  anyLong(),
+  ArgumentMatchers.eq(true),
+  ArgumentMatchers.eq(requestTimeoutMs),
+  same(handler.handler)))
+  .thenReturn(clientRequest)
 
-EasyMock.expect(networkClient.ready(node, time.milliseconds()))
-  .andReturn(false)
+when(networkClient.ready(node, time.milliseconds()))
+  .thenReturn(false)
 
-EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), 
EasyMock.anyLong()))
-  .andReturn(0)
+when(networkClient.connectionDelay(any[Node], anyLong()))
+  .thenReturn(0)
 
-EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
-  .andReturn(new util.ArrayList())
+when(networkClient.poll(anyLong(), anyLong()))
+  .thenReturn(new util.ArrayList[ClientResponse]())
 
-EasyMock.expect(networkClient.connectionFailed(node))
-  .andReturn(true)
+   when(networkClient.connectionFailed(node))
+  .thenReturn(true)
 
-EasyMock.expect(networkClient.authenticationException(node))
-  .andReturn(new AuthenticationException(""))
-
-EasyMock.replay(networkClient)
+when(networkClient.authenticationException(node))
+  .thenReturn(new AuthenticationException(""))
 
 sendThread.enqueue(handler)
 sendThread.doWork()
 
-EasyMock.verify(networkClient)

Review comment:
   Yeah it's tricky to get the exact same coverage with Mockito. I've tried 
to verify all the important calls, especially on the component being tested, 
and whenever possible verify all the mocks. 
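   
   As a concrete illustration of that trade-off, the EasyMock replay/verify pair is usually
   replaced by targeted `verify(...)` calls on the interactions that matter; a minimal Java
   sketch (not taken from this PR):
   
   ```java
   import static org.mockito.ArgumentMatchers.anyLong;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   
   import java.util.Collections;
   import org.apache.kafka.clients.KafkaClient;
   
   public class MockitoStyleExample {
       public void example() {
           KafkaClient networkClient = mock(KafkaClient.class);
   
           // Stub only the behaviour the code under test depends on ...
           when(networkClient.poll(anyLong(), anyLong())).thenReturn(Collections.emptyList());
   
           // ... exercise the component under test here (stand-in call for brevity) ...
           networkClient.poll(100L, 0L);
   
           // ... then verify just the interactions that matter, instead of a blanket verify-all.
           verify(networkClient).poll(anyLong(), anyLong());
       }
   }
   ```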
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13600:


Assignee: (was: John Roesler)

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalance, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag` and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught up" node.
> We've hit this a few times; having lots of state (~25 TB worth) and being 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the 
> reliability of our cluster.
> Our change is to simply use the most caught up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load balancing type behaviour of the existing code 
> but in practice doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being 
> within `acceptableRecoveryLag` of the most caught up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks)
> Ideally we don't want to maintain a fork of kafka streams going forward so 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code / test different algos in a production system 
> or anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13600:


Assignee: John Roesler

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Assignee: John Roesler
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalance, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag` and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught up" node.
> We've hit this a few times; having lots of state (~25 TB worth) and being 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the 
> reliability of our cluster.
> Our change is to simply use the most caught up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load balancing type behaviour of the existing code 
> but in practice doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being 
> within `acceptableRecoveryLag` of the most caught up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks)
> Ideally we don't want to maintain a fork of kafka streams going forward so 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code / test different algos in a production system 
> or anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13600:
-
Affects Version/s: 3.1.0

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Assignee: John Roesler
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalance, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag` and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught up" node.
> We've hit this a few times; having lots of state (~25 TB worth) and being 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the 
> reliability of our cluster.
> Our change is to simply use the most caught up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load balancing type behaviour of the existing code 
> but in practice doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being 
> within `acceptableRecoveryLag` of the most caught up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks)
> Ideally we don't want to maintain a fork of kafka streams going forward so 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code / test different algos in a production system 
> or anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guozhangwang commented on pull request #11447: KAFKA-13024

2022-02-03 Thread GitBox


guozhangwang commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-1029335322


   > I might be missing the full picture here as I am a bit new to the project, 
but, as I see it, the algorithm I described above does not suffer from this 
issue. There, we essentially extract if (record.key() == null || record.value() 
== null) { code from the operations (e.g. 
https://github.com/apache/kafka/pull/11447/files#diff-ad3728ff5f32ed1e88e11e46c0de970bcce01c39767fd3ae666e4a0bfbd97be3L87-L104),
 except we also don't check for that condition more than necessary.
   
   My rationale is that we do some optimizations at the end of the building 
phase which may move a repartition up in the topology, etc., which would 
effectively change the topology. Hence, we would not know whether all of the 
downstream operators after the repartition would be filtering null keys until 
the optimization that changes the topology is done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-03 Thread GitBox


guozhangwang commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r798901691



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder 
builder,
 
 final ThreadCache cache = new ThreadCache(
 logContext,
-Math.max(0, 
streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+Math.max(0, 
streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),

Review comment:
   > got it.. If i understood correctly, we need a new PR with all the 
changes in this PR and the new ones along with document changes, right?
   
   That sounds right to me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13642) We should have a command-line tool + API to display fenced / unfenced / etc. brokers

2022-02-03 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13642:


 Summary: We should have a command-line tool + API to display 
fenced / unfenced / etc. brokers
 Key: KAFKA-13642
 URL: https://issues.apache.org/jira/browse/KAFKA-13642
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


We should have a command-line tool + API to display fenced / unfenced / etc. 
brokers



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Ulrik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486529#comment-17486529
 ] 

Ulrik edited comment on KAFKA-13638 at 2/3/22, 6:28 PM:


Hi [~cadonna],

I forked the repo and added a test. Here's the commit 
https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6

The test was very quick and took about 900 ms.

Running the test on the trunk branch took 6 seconds.


was (Author: JIRAUSER284585):
Hi [~cadonna],

I forked the repo and added a test. Here's the commit: 
[https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6.]

The test was very quick and took about 900 ms.

Running the test on the trunk branch took 6 seconds.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.
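
(For illustration, a minimal sketch of the kind of fan-out topology described above; the topic 
name, fan-out factor and store name are assumptions, default serdes are assumed to be configured, 
and this is not the attached KafkaTest.java.)
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class FanOutTopologySketch {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("input-topic")
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(final String key, final String value) {
                    // Fan out: forward several output records per input record via context.forward().
                    for (int i = 0; i < 30; i++) {
                        context.forward(key + "-" + i, value);
                    }
                    return null; // everything was already forwarded
                }

                @Override
                public void close() { }
            })
            .toTable(Materialized.as("fan-out-store"));

        return builder.build();
    }
}
{code}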



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cmccabe merged pull request #11729: MINOR: fix control plane listener + kraft error message

2022-02-03 Thread GitBox


cmccabe merged pull request #11729:
URL: https://github.com/apache/kafka/pull/11729


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486584#comment-17486584
 ] 

Matthias J. Sax commented on KAFKA-13641:
-

The Java API faces the same issue – it's hard from a backward compatibility POV 
to change the signature.

Also the input `null`-value question is tricky: atm, to avoid ambiguity, the 
runtime drops all join input records with `null`-value (also with `null`-key) 
as malformed. Thus, if you get a `null` you always know that it's a left/outer 
join call with no matching join record from the other side.

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mohammad Yousuf Minhaj Zia
>Priority: Minor
>
> Since the `ValueJoiner` right parameter in `leftJoin`s and `outerJoin`s can be 
> nullable, I am wondering if we can wrap it in a Scala `Option`.
> However, there is also the concern that the left-hand side value can be null 
> in the case of tombstone messages, in which case the `Option` semantics can be 
> misleading. I still feel this could be a useful feature in reducing the 
> number of `NullPointerExceptions`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13641:

Language: scala needs-kip  (was: scala)

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mohammad Yousuf Minhaj Zia
>Priority: Minor
>
> Since the `ValueJoiner` right parameter in `leftJoin`s and `outerJoin`s can be 
> nullable, I am wondering if we can wrap it in a Scala `Option`.
> However, there is also the concern that the left-hand side value can be null 
> in the case of tombstone messages, in which case the `Option` semantics can be 
> misleading. I still feel this could be a useful feature in reducing the 
> number of `NullPointerExceptions`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486578#comment-17486578
 ] 

Bruno Cadonna commented on KAFKA-13638:
---

[~Lejon] Thanks a lot for the testing! I think we can rule out RocksDB then.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2022-02-03 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486576#comment-17486576
 ] 

Federico Valeri commented on KAFKA-12635:
-

I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0.

State of the source cluster after producing/consuming 1 million records:

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
    Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group

Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0    -            -     -
my-group  my-topic  1          335510          335510          0    -            -     -
my-group  my-topic  2          331890          331890          0    -            -     -
{code}

State of the target cluster after MM2 has done its job 
(sync.group.offsets.enabled = true, replication.policy.class = 
io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy):

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
    Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
    Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
    Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-group

Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          0               -332600  -            -     -
my-group  my-topic  1          335510          0               -335510  -            -     -
my-group  my-topic  2          331890          0               -331890  -            -     -
{code}

There is actually no need to set a custom value for retention.ms in order to 
trigger the issue.
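
For completeness, the MirrorMaker 2 configuration used for this reproduction was along the 
following lines (a sketch; the cluster aliases are assumptions, the ports match the describe 
output above):

{code}
# connect-mirror-maker.properties (sketch)
clusters = source, target
source.bootstrap.servers = localhost:9090
target.bootstrap.servers = localhost:9093

source->target.enabled = true
source->target.topics = my-topic

# translate and sync consumer group offsets to the target cluster
sync.group.offsets.enabled = true
replication.policy.class = io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy
{code}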

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Ning Zhang
>Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that were now deleted (like by a retention policy), or 
> if Mirrormaker replication is set to start at "latest". This bug causes the 
> target consumer group's lag for that partition to be negative and breaks 
> offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not literal offset, which in this 
> case would always be 0. 
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2022-02-03 Thread GitBox


mimaison commented on pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#issuecomment-1029158871


   @C0urante Thanks for the PR! It's really massive! Can we try to cut it in 
smaller chunks? That would make it a lot easier to review and allow merging bit 
by bit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Ulrik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486565#comment-17486565
 ] 

Ulrik commented on KAFKA-13638:
---

[~cadonna] Done. Average time for the test was the same as the other branch: 
around 900 ms

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.
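To make the scenario concrete, here is a minimal sketch of a topology along these lines, with assumed String types and topic names (a reconstruction for illustration, not the attached KafkaTest.java):

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class FanOutTopologySketch {

    // Forwards several records per input record via context.forward, as described above.
    static class FanOutTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            for (int i = 0; i < 30; i++) {
                context.forward(key + "-" + i, value);
            }
            return null; // everything was forwarded already
        }

        @Override
        public void close() { }
    }

    public static void build(final StreamsBuilder builder) {
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .transform(FanOutTransformer::new)
               // The forwarded records are stored in a RocksDB-backed KTable.
               .toTable(Materialized.as("output-store"));
    }
}
{code}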



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13293) Support client reload of JKS/PEM certificates

2022-02-03 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486563#comment-17486563
 ] 

Elliot West edited comment on KAFKA-13293 at 2/3/22, 4:15 PM:
--

FWIW we've implemented a custom {{SslEngineFactory}} here: 
https://github.com/apache/kafka/pull/11731

Would this be more generally useful as an interim solution? Or is there progress 
on the dynamic client configuration work?


was (Author: teabot):
FWIW we've implemented a custom {{SslEngineFactory}} here: 
https://github.com/apache/kafka/pull/11731

> Support client reload of JKS/PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Producer/Consumer clients do not currently automatically reload certificates 
> when the key stores are modified, or certificates expire. Currently one 
> supplies key chains when instantiating clients only - there is no mechanism 
> available to either directly reconfigure the client, or for the client to 
> observe changes to the original properties set reference used in 
> construction. Additionally, no work-arounds are documented that might give 
> users alternative strategies for dealing with expiring certificates. 
> Given that expiration and renewal of certificates is an industry standard 
> practice, it could be argued that the current client certificate 
> implementation is not fit for purpose. A mechanism should be provided such 
> that clients can automatically detect, load, and use updated key chains from 
> some abstracted source.
> Finally, it is suggested that in the short term Kafka documentation be 
> updated to describe any viable mechanism for updating client certs (perhaps 
> closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13293) Support client reload of JKS/PEM certificates

2022-02-03 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486563#comment-17486563
 ] 

Elliot West commented on KAFKA-13293:
-

FWIW we've implemented a custom {{SslEngineFactory}} here: 
https://github.com/apache/kafka/pull/11731

> Support client reload of JKS/PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Producer/Consumer clients do not currently automatically reload certificates 
> when the key stores are modified, or certificates expire. Currently one 
> supplies key chains when instantiating clients only - there is no mechanism 
> available to either directly reconfigure the client, or for the client to 
> observe changes to the original properties set reference used in 
> construction. Additionally, no work-arounds are documented that might give 
> users alternative strategies for dealing with expiring certificates. 
> Given that expiration and renewal of certificates is an industry standard 
> practice, it could be argued that the current client certificate 
> implementation is not fit for purpose. A mechanism should be provided such 
> that clients can automatically detect, load, and use updated key chains from 
> some abstracted source.
> Finally, it is suggested that in the short term Kafka documentation be 
> updated to describe any viable mechanism for updating client certs (perhaps 
> closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] teabot opened a new pull request #11731: KAFKA-13293: Reloading engine factory

2022-02-03 Thread GitBox


teabot opened a new pull request #11731:
URL: https://github.com/apache/kafka/pull/11731


   This PR adds an optional `SslEngineFactory` implementation that decorates 
and manages a delegate. It is able to recreate the delegate either on a 
schedule, or in response to a SSL configuration patch being applied. The 
purpose of this implementation is to permit the automatic reload of renewed 
client certificates.
   
   This implementation includes a supporting unit test.
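   
   For illustration only, here is a sketch of how a client could opt in to such a factory through the existing `ssl.engine.factory.class` setting; the factory class name below is hypothetical and merely stands in for a decorating implementation (key store passwords omitted):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   
   public class ReloadingSslClientSketch {
   
       public static KafkaConsumer<String, String> newConsumer() {
           final Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");
           props.put("security.protocol", "SSL");
           props.put("ssl.truststore.location", "/etc/kafka/client.truststore.jks");
           props.put("ssl.keystore.location", "/etc/kafka/client.keystore.jks");
           // Hypothetical decorating factory that rebuilds its delegate when the
           // key stores on disk change.
           props.put("ssl.engine.factory.class", "com.example.ReloadingSslEngineFactory");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");
           return new KafkaConsumer<>(props);
       }
   }
   ```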
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486557#comment-17486557
 ] 

Bruno Cadonna commented on KAFKA-13638:
---

[~Lejon] Could you also try with 
https://github.com/cadonna/kafka/tree/test-KAFKA-13638-6.27.3 that uses RocksDB 
6.27.3 with AK 2.8?

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Ulrik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486529#comment-17486529
 ] 

Ulrik edited comment on KAFKA-13638 at 2/3/22, 3:05 PM:


Hi [~cadonna],

I forked the repo and added a test. Here's the commit: 
[https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6.]

The test was very quick and took about 900 ms.

Running the test on the trunk branch took 6 seconds.


was (Author: JIRAUSER284585):
Hi [~cadonna],

I forked the repo and added a test. Here's the commit: 
[https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6.]

The test was very quick and took about 900 ms.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] tombentley commented on a change in pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1

2022-02-03 Thread GitBox


tombentley commented on a change in pull request #11672:
URL: https://github.com/apache/kafka/pull/11672#discussion_r796754101



##
File path: 
core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
##
@@ -85,19 +84,17 @@ class PartitionStateMachineTest {
 controllerContext.updatePartitionFullReplicaAssignment(partition, 
ReplicaAssignment(Seq(brokerId)))
 controllerContext.putPartitionState(partition, NewPartition)
 val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), 
controllerEpoch)
-EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition 
-> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
-  .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null, 
ResponseMetadata(0, 0
-
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-  partition, leaderIsrAndControllerEpoch, 
replicaAssignment(Seq(brokerId)), isNew = true))
-
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+when(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> 
leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
+  .thenReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null, 
ResponseMetadata(0, 0
 partitionStateMachine.handleStateChanges(
   partitions,
   OnlinePartition,
   Option(OfflinePartitionLeaderElectionStrategy(false))
 )
-EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+verify(mockControllerBrokerRequestBatch).newBatch()
+
verify(mockControllerBrokerRequestBatch).addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+  partition, leaderIsrAndControllerEpoch, 
replicaAssignment(Seq(brokerId)), isNew = true)
+
verify(mockControllerBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch)

Review comment:
   I know we mocked it, but shouldn't we also verify 
`createTopicPartitionStatesRaw` here?

##
File path: 
core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
##
@@ -165,22 +158,20 @@ class PartitionStateMachineTest {
 controllerContext.putPartitionLeadershipInfo(partition, 
leaderIsrAndControllerEpoch)
 
 val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
-  .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
+when(mockZkClient.getTopicPartitionStatesRaw(partitions))
+  .thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
 TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, 
ResponseMetadata(0, 0
 
 val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
 val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
-  .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
Right(updatedLeaderAndIsr)), Seq.empty))
-
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-  partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, 
controllerEpoch), replicaAssignment(Seq(brokerId)), isNew = false))
-
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+when(mockZkClient.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
+  .thenReturn(UpdateLeaderAndIsrResult(Map(partition -> 
Right(updatedLeaderAndIsr)), Seq.empty))
 
 partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(PreferredReplicaPartitionLeaderElectionStrategy))
-EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+verify(mockControllerBrokerRequestBatch).newBatch()

Review comment:
   Again, verify the `...Raw()` call?

##
File path: 
core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
##
@@ -165,22 +158,20 @@ class PartitionStateMachineTest {
 controllerContext.putPartitionLeadershipInfo(partition, 
leaderIsrAndControllerEpoch)
 
 val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
-  .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
+when(mockZkClient.getTopicPartitionStatesRaw(partitions))
+  .thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
 TopicPartitionSta

[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Ulrik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486529#comment-17486529
 ] 

Ulrik commented on KAFKA-13638:
---

Hi [~cadonna],

I forked the repo and added a test. Here's the commit: 
[https://github.com/ulejon/kafka/commit/fbf8d89231fb0201ee6a48767a8dcec4af8585f6.]

The test was very quick and took about 900 ms.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486510#comment-17486510
 ] 

Bruno Cadonna edited comment on KAFKA-13638 at 2/3/22, 2:09 PM:


Hi [~Lejon],

Could you try to run your test on the following branch?

https://github.com/cadonna/kafka/tree/test-KAFKA-13638

This is a 2.8 branch modified to run with RocksDB 6.19.3 the version used in AK 
3.0.0.


was (Author: cadonna):
Hi [~Lejon],

Could you try to run your test with on the following branch?

https://github.com/cadonna/kafka/tree/test-KAFKA-13638

This is a 2.8 branch modified to run with RocksDB 6.19.3 the version used in AK 
3.0.0.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-03 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486510#comment-17486510
 ] 

Bruno Cadonna commented on KAFKA-13638:
---

Hi [~Lejon],

Could you try to run your test with on the following branch?

https://github.com/cadonna/kafka/tree/test-KAFKA-13638

This is a 2.8 branch modified to run with RocksDB 6.19.3 the version used in AK 
3.0.0.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028979162


   Sure @dongjinleekr. Thank you so much for sharing the documentation. Sorry, I 
was not aware that -Dlog4j.configuration is the VM parameter it points to here, 
so I was confused by that comment. I get it now. Let me go through in detail the 
changes I have made to use the log4j2 properties by setting KAFKA_LOG4J_OPTS, 
check whether I have missed updating any of the files, and I will update 
you. Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028979162


   Sure @dongjinleekr. Thank you so much for sharing the documentation. Sorry, I 
was not aware that -Dlog4j.configuration is the VM parameter it points to, so I 
was confused by that comment. I get it now. Let me go through in detail the 
changes I have made to use the log4j2 properties by setting KAFKA_LOG4J_OPTS, 
check whether I have missed updating any of the files, and I will update you. 
Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028979162


   Sure @dongjinleekr. Thank you so much for sharing the documentation. Sorry, I 
was not aware that -Dlog4j.configuration is a VM parameter, so I was confused by 
that comment. I get it now. Let me go through in detail the changes I have made 
to use the log4j2 properties by setting KAFKA_LOG4J_OPTS, check whether I have 
missed updating any of the files, and I will update you. Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028979162


   Sure @dongjinleekr. Thank you so much for sharing the documentation. Let me 
go through in detail the changes I have made to use the log4j2 properties by 
setting KAFKA_LOG4J_OPTS and check whether I have missed updating any of the 
files. Will update you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa removed a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa removed a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028932370


   Where could I find it, @dongjinleekr? And can you help me understand 
what the VM parameter is used for and why it is required? Because I 
haven't heard of this VM parameter


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028965316


   @Indupa Please refer to [the 
documentation](https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2)
 and the log messages.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Indupriya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486434#comment-17486434
 ] 

Indupriya edited comment on KAFKA-9366 at 2/3/22, 12:46 PM:


Hi [~dongjin] 

Could you please help me with any approximate dates for the official release of 
the latest log4j build?


was (Author: JIRAUSER284624):
Hi [~dongjin] 

Could you please help us any approximate dates for official release for latest 
log4j build..? 

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Indupriya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486434#comment-17486434
 ] 

Indupriya commented on KAFKA-9366:
--

Hi [~dongjin] 

Could you please help us with any approximate dates for the official release of 
the latest log4j build?

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Indupriya (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9366 ]


Indupriya deleted comment on KAFKA-9366:
--

was (Author: JIRAUSER284624):
Hi [~dongjin] , Con you please confirm. this issue  KAFKA-12399 will resolve 
all the critical vulnerabilities along with CVE-2019-17571 ..?

 

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Indupriya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486380#comment-17486380
 ] 

Indupriya edited comment on KAFKA-9366 at 2/3/22, 12:34 PM:


Hi [~dongjin] , can you please confirm that this issue KAFKA-12399 will resolve 
all the critical vulnerabilities along with CVE-2019-17571?

 


was (Author: JIRAUSER284624):
Hi [~dongjin] ,  this issue  KAFKA-12399 will resolve all the critical 
vulnerabilities along with CVE-2019-17571 ..?

 

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Indupriya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486380#comment-17486380
 ] 

Indupriya edited comment on KAFKA-9366 at 2/3/22, 12:31 PM:


Hi [~dongjin] ,  this issue  KAFKA-12399 will resolve all the critical 
vulnerabilities along with CVE-2019-17571 ..?

 


was (Author: JIRAUSER284624):
Hi [~dongjin] , You mean The patch you have released ,will resolve all the 
critical vulnerabilities along with CVE-2019-17571 ..?

 

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Indupriya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486380#comment-17486380
 ] 

Indupriya edited comment on KAFKA-9366 at 2/3/22, 12:29 PM:


Hi [~dongjin] , you mean the patch you have released will resolve all the 
critical vulnerabilities along with CVE-2019-17571?

 


was (Author: JIRAUSER284624):
Can Anyone suggests fix for this vulnerabilities in kafka by the use of 
log4j-1.2.17.?

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-03 Thread Mohammad Yousuf Minhaj Zia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486427#comment-17486427
 ] 

Mohammad Yousuf Minhaj Zia commented on KAFKA-13641:


If I am not wrong, `ValueJoiner` is a Java interface that is essentially a 
function signature with generic parameters `value1` and `value2`. What I would 
like is to create a Scala wrapper such that:

```

type ValueLeftJoiner[A, B, C] = (A, Option[B]) => C

type ValueOuterJoiner[A, B, C] = (Option[A], B) => C

type ValueInnerJoiner[A, B, C] = (A, B) => C

```

 

or to deal with tombstones just make both inputs optional with a central 
ValueJoiner:

 

```

type ValueJoiner[A, B, C] = (Option[A], Option[B]) => C

```

 

Unfortunately this makes things quite inconsistent with the rest of the 
codebase, because a null could be returned or passed as a parameter virtually 
anywhere apart from joiners.

Wondering if we will ever make the explicitness of nullable fields better in 
Kafka?
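
For reference, a minimal Java sketch of the contract these aliases would wrap: in the existing DSL the second argument of a left-join `ValueJoiner` may be null, so callers null-check by hand today. The types and topic semantics below are assumed, purely for illustration.

```
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class LeftJoinNullCheckSketch {

    // The joiner's second argument is null when there is no matching table
    // record; this is exactly what an Option[B] wrapper would make explicit.
    public static KStream<String, String> join(final KStream<String, String> orders,
                                               final KTable<String, String> customers) {
        return orders.leftJoin(customers,
                (order, customer) -> customer == null
                        ? order + " (no matching customer)"
                        : order + " -> " + customer);
    }
}
```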

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mohammad Yousuf Minhaj Zia
>Priority: Minor
>
> Since `ValueJoiner` right parameter in `leftJoins`, `outerJoins` can be 
> nullable, I am wondering if can wrap them around Scala `Option`.
> However, there is also the concern that the left hand side value can be null 
> in the case of tombstone messages, in which the `Option` semantics can be 
> misleading. I still feel this could be a useful feature in reducing the 
> number of `NullPointerExceptions`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028932370


   Where could I find it, @dongjinleekr? And can you help me understand 
what the VM parameter is used for and why it is required? Because I 
haven't heard of this VM parameter


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11730: MINOR: Use ducktape version 0.7.17

2022-02-03 Thread GitBox


dajac merged pull request #11730:
URL: https://github.com/apache/kafka/pull/11730


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028885839


   @Indupa Which VM parameter are you using? -Dlog4j.configuration or 
-Dlog4j.configuration**File**?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-03 Thread GitBox


ableegoldman commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1028879209


   > My read on the code is that we only need to change the TopologyTestDriver, 
while the first place seems fine to me. Did I miss anything?
   
   If by "first place" you mean [the bug in the TopologyConfig 
class](https://github.com/apache/kafka/pull/11424#discussion_r796139442), then 
no, other way around actually. The TopologyConfig bug is the one that actually 
breaks compatibility, the TTD one actually doesn't really do anything -- if I 
understand the TTD correctly. See 
[this](https://github.com/apache/kafka/pull/11424#discussion_r798443526)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028876857


   Hi @dongjinleekr , yeah, I referred to this article earlier, but I did not 
understand the solution it suggests, i.e. setting -Dlog4j.configuration in the 
VM arguments. I have not seen where we are using these VM arguments.
   
How can I overcome this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-03 Thread GitBox


vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r798456223



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder 
builder,
 
 final ThreadCache cache = new ThreadCache(
 logContext,
-Math.max(0, 
streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+Math.max(0, 
streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),

Review comment:
   Got it. If I understood correctly, we need a new PR with all the 
changes in this PR plus the new ones, along with documentation changes, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-03 Thread GitBox


vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r798454215



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
 maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 log.info("Topology {} is overriding {} to {}", topologyName, 
BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
 } else {
-maxBufferedSize = 
globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+maxBufferedSize = 
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : 
-1;
 }
 
-if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
-cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-log.info("Topology {} is overriding {} to {}", topologyName, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) ||
+isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is using both {} and deprecated config 
{}. overriding {} to {}",
+topologyName,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+CACHE_MAX_BYTES_BUFFERING_CONFIG,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+cacheSize);
+} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+log.info("Topology {} is using deprecated config {}. 
overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+} else {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is overriding {} to {}", topologyName, 
STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+}
 } else {
-cacheSize = 
globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+cacheSize = 
globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:
   thanks @ableegoldman! I am a bit tied up at the moment, but I 
will make the changes in the next few days. As you said, I will introduce a new 
utility class and move these methods out.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Indupriya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486380#comment-17486380
 ] 

Indupriya commented on KAFKA-9366:
--

Can anyone suggest a fix for these vulnerabilities in Kafka given the use of 
log4j-1.2.17?

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028870380


   Hi @Indupa,
   
   Please refer 
[here](https://stackoverflow.com/questions/66545546/error-statuslogger-reconfiguration-failed-no-configuration-found-for-73d16e93).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-03 Thread Kvicii.Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486378#comment-17486378
 ] 

Kvicii.Yu commented on KAFKA-13641:
---

[~yzia2000] 

hi, I didn't understand what you mean. Could you tell me which class name#method 
name needs the Option added?

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mohammad Yousuf Minhaj Zia
>Priority: Minor
>
> Since `ValueJoiner` right parameter in `leftJoins`, `outerJoins` can be 
> nullable, I am wondering if can wrap them around Scala `Option`.
> However, there is also the concern that the left hand side value can be null 
> in the case of tombstone messages, in which the `Option` semantics can be 
> misleading. I still feel this could be a useful feature in reducing the 
> number of `NullPointerExceptions`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-03 Thread GitBox


Indupa commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028865528


   Hi @dongjinleekr , yeah, I have already made the changes to use the log4j2 
properties and set KAFKA_LOG4J_OPTS. But even after that, as I mentioned in my 
previous comment, I am getting the following exception.
   
   2022-02-02 05:57:17.158 [INF] [Kafka] Connecting to localhost:2181
   2022-02-02 05:57:27.571 [INF] [Kafka] WATCHER::
   2022-02-02 05:57:27.571 [INF] [Kafka] WatchedEvent state:SyncConnected 
type:None path:null
   2022-02-02 05:57:27.574 [INF] [Kafka] []
   2022-02-02 05:58:17.227 [ERR] [Kafka] ERROR StatusLogger Reconfiguration 
failed: No configuration found for '764c12b6' at 'null' in 'null'.
   
   I am still getting this exception and Kafka is not running. Can you help me 
understand what the cause of this exception is?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-03 Thread GitBox


ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r798443526



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder 
builder,
 
 final ThreadCache cache = new ThreadCache(
 logContext,
-Math.max(0, 
streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+Math.max(0, 
streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),

Review comment:
   > since it's a test case so it shouldn't really matter. Isn't that the 
case?
   
   Well if someone is using the TTD to write unit tests and those tests start 
to fail after they upgrade because the caching is different, I would say that's 
a compatibility change.
   
   Although I read the TTD's javadocs earlier and remembered that it actually 
processes records synchronously, which effectively means that the only thing 
that matters/affects the TTD results is whether the cache size is non-zero or 
has been set to 0 -- and setting it to 0 only matters if it's correctly set to 
0 in the TopologyConfig, not the value here. Which is a long way of saying that 
in hindsight, this config/bug doesn't really impact anything after all. In 
fact, imho we should probably just hard-code the TTD's ThreadCache cache size 
to 0 -- but let's not wrap that change into an already rather large PR in case 
there's something I'm not taking into account here.
   
   So tl;dr, for future reference we do still need to maintain backwards 
compatibility in the TTD since it's part of the public interface. But it just 
so happens that this particular bug doesn't actually break anything "real" or 
have any visible impact (at least AFAICT).
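   
   As an aside, a hedged sketch of how a unit test can make itself independent of this wrinkle by disabling caching explicitly (this assumes the new `statestore.cache.max.bytes` constant from this PR; topology construction omitted):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.TopologyTestDriver;
   
   public class DriverWithoutCachingSketch {
   
       public static TopologyTestDriver newDriver(final Topology topology) {
           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-sketch");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
           // Turn record caching off so results do not depend on which cache
           // config name the driver ends up reading.
           props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
           return new TopologyTestDriver(topology, props);
       }
   }
   ```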




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-03 Thread GitBox


ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r798399133



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
 maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 log.info("Topology {} is overriding {} to {}", topologyName, 
BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
 } else {
-maxBufferedSize = 
globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+maxBufferedSize = 
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : 
-1;
 }
 
-if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
-cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-log.info("Topology {} is overriding {} to {}", topologyName, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) ||
+isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is using both {} and deprecated config 
{}. overriding {} to {}",
+topologyName,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+CACHE_MAX_BYTES_BUFFERING_CONFIG,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+cacheSize);
+} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+log.info("Topology {} is using deprecated config {}. 
overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+} else {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is overriding {} to {}", topologyName, 
STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+}
 } else {
-cacheSize = 
globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+cacheSize = 
globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:
   Ah, I see the confusion. The `#isTopologyOverride` method checks whether 
the config has been overridden for the specific topology, ie has been set in 
the Properties passed in to `StreamsBuilder#build` -- it's not looking at what 
we call the `globalAppConfigs` which are the actual application configs: ie 
those passed in to the `KafkaStreams` constructor.
   
   So basically there are two sets of configs. The value should be taken as the 
first of these to be set by the user, in the following order:
   1) `statestore.cache.max.bytes` in `topologyOverrides` 
   2) `cache.max.bytes.buffering` in `topologyOverrides` 
   3)`statestore.cache.max.bytes` in `globalAppConfigs`
   4) `cache.max.bytes.buffering` in `globalAppConfigs`
   
   Essentially, using `#getTotalCacheSize` on the `topologyOverrides` if either 
of them is set (which this PR is doing) and on the `globalAppConfigs` if they 
are not (which is the regression here).
   
   On that note -- we also need to move `#getTotalCacheSize` out of 
StreamsConfig, because it's a public class and wasn't listed as a public API in 
the KIP (nor should it be, imo). I recommend creating a new static utility 
class for things like this, eg `StreamsConfigUtils` in the 
`org.apache.kafka.streams.internals` package. There are some other methods that 
would belong there, for example the `StreamThread` methods `#processingMode` 
and `#eosEnabled` should be moved as well
   
   Hope that all makes sense -- and let me know if you don't think you'll have the time to put out a full patch, in which case I or another Streams dev can help out 🙂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13583) Fix FetchRequestBetweenDifferentIbpTest flaky tests

2022-02-03 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13583.
-
Fix Version/s: 3.2.0
   3.1.1
 Reviewer: David Jacot
   Resolution: Fixed

> Fix FetchRequestBetweenDifferentIbpTest flaky tests
> ---
>
> Key: KAFKA-13583
> URL: https://issues.apache.org/jira/browse/KAFKA-13583
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: Kvicii.Yu
>Priority: Minor
> Fix For: 3.2.0, 3.1.1
>
>
> FetchRequestBetweenDifferentIbpTest's tests often fail with:
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <2> but was: <1>
>  at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>  at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
>  at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
>  at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
>  at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:510)
>  at 
> integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerSwitchingIBP(FetchRequestBetweenDifferentIbpTest.scala:113)
>  at 
> integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerOldToNewIBP(FetchRequestBetweenDifferentIbpTest.scala:87)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
> {noformat}
> This might be due to 
> [https://github.com/apache/kafka/commit/e8818e234a879d5ca45accba0121f43f45381f4a]
>  where we reduced the poll timeout.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac merged pull request #11699: Fix FetchRequestBetweenDifferentIbpTest flaky tests

2022-02-03 Thread GitBox


dajac merged pull request #11699:
URL: https://github.com/apache/kafka/pull/11699


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11710: MINOR: MiniTrogdorCluster mutates objects from other threads

2022-02-03 Thread GitBox


dajac merged pull request #11710:
URL: https://github.com/apache/kafka/pull/11710


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11727: MINOR:fix AbstractStickyAssignor doc

2022-02-03 Thread GitBox


dajac merged pull request #11727:
URL: https://github.com/apache/kafka/pull/11727


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13637) User default.api.timeout.ms config as default timeout for KafkaConsumer.endOffsets

2022-02-03 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13637.
-
Fix Version/s: 3.0.1
   3.2.0
   3.1.1
 Reviewer: David Jacot
   Resolution: Fixed

> User default.api.timeout.ms config as default timeout for 
> KafkaConsumer.endOffsets
> --
>
> Key: KAFKA-13637
> URL: https://issues.apache.org/jira/browse/KAFKA-13637
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
> Fix For: 3.0.1, 3.2.0, 3.1.1
>
>
> In KafkaConsumer, we use `request.timeout.ms` in `endOffsets` and 
> `default.api.timeout.ms` in `beginningOffsets`; we should use 
> `default.api.timeout.ms` for both.
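
For illustration, a caller-side sketch (broker address, topic, and class names are placeholders): both lookups also accept an explicit `Duration`, which bounds the calls the same way regardless of which default the no-timeout overloads fall back to.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetLookupExample {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final List<TopicPartition> partitions =
                Collections.singletonList(new TopicPartition("my-topic", 0));

            // Passing a Duration explicitly makes the timeout the same for both calls,
            // independent of request.timeout.ms vs default.api.timeout.ms defaults.
            final Map<TopicPartition, Long> beginning =
                consumer.beginningOffsets(partitions, Duration.ofSeconds(30));
            final Map<TopicPartition, Long> end =
                consumer.endOffsets(partitions, Duration.ofSeconds(30));

            System.out.println("beginning=" + beginning + ", end=" + end);
        }
    }
}
```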



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac merged pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets

2022-02-03 Thread GitBox


dajac merged pull request #11726:
URL: https://github.com/apache/kafka/pull/11726


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-03 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486321#comment-17486321
 ] 

Dongjin Lee commented on KAFKA-9366:


Hi [~noonbs], this issue will be resolved with KAFKA-12399. I expect it will be 
done in 3.2.0.

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data, which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions from 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

