[GitHub] [kafka] satishd closed pull request #10578: MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to clients module.
satishd closed pull request #10578: URL: https://github.com/apache/kafka/pull/10578
[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Travis Bischel updated KAFKA-12671:
-----------------------------------
    Priority: Blocker  (was: Critical)

> Out of order processing with a transactional producer can lead to a stuck LastStableOffset
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12671
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>            Reporter: Travis Bischel
>            Priority: Blocker
>              Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn requests, then the LastStableOffset can get stuck, which will block consuming in READ_COMMITTED mode.
>
> To transactionally produce, the standard flow is to InitProducerId, and then loop AddPartitionsToTxn -> Produce+ -> EndTxn. AddPartitionsToTxn is responsible for fencing and adding partitions to a transaction, and EndTxn is responsible for finishing the transaction. Producing itself is mostly uninvolved with the proper fencing / ending flow, but produce requests are required to land after AddPartitionsToTxn and before EndTxn.
>
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager to loosely manage transactions. The ProducerStateManager is completely independent of the TxnCoordinator, and its guarantees are relatively weak. The ProducerStateManager handles two types of "batches" being added: a data batch and a transaction marker. When a data batch is added, a "transaction" is begun and tied to the producer ID that is producing the batch. When a transaction marker is handled, the ProducerStateManager removes the transaction for the producer ID (roughly).
>
> EndTxn is what triggers transaction markers to be sent to the ProducerStateManager. In essence, EndTxn is the one part of the transactional producer flow that talks across both the TxnCoordinator and the ProducerStateManager.
>
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka after EndTxn, then the ProduceRequest will begin a new transaction in the ProducerStateManager. If the client was disconnecting, and the EndTxn was the final request issued, the new transaction created in the ProducerStateManager is orphaned and nothing can clean it up. The LastStableOffset then hangs on this stuck transaction.
>
> This same problem can be triggered by a produce request that is issued with a transactional ID outside of the context of a transaction at all (no AddPartitionsToTxn). This problem cannot be triggered by producing for so long that the transaction expires; the difference there is that the transaction coordinator bumps the epoch for the producer ID, so producing again with the old epoch does not work.
>
> Theoretically, we are supposed to have unlimited retries on produce requests, but in the context of wanting to abort everything and shut down, this is not always feasible. As it currently stands, I'm not sure there's a truly safe way to shut down _without_ flushing and receiving responses for every record produced, even if I want to abort everything and quit. The safest approach I can think of is to avoid issuing an EndTxn at all, and instead rely on Kafka to internally time the transaction out after a period of time.
>
> —
>
> For some context, here are my request logs from the client. Note that I write two ProduceRequests, read one, and then issue EndTxn (because I know I want to quit). The second ProduceRequest is read successfully before shutdown, but I ignore the results because I am shutting down. I've taken out logs related to consuming, but the order of the logs is unchanged:
>
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err:
> [DEBUG] read FindCoordinator v3; err:
> [DEBUG] wrote InitProducerID v4; err:
> [DEBUG] read InitProducerID v4; err:
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err:
> [DEBUG] read AddPartitionsToTxn v2; err:
> [DEBUG] wrote Produce v8; err:
> [DEBUG] read Produce v8; err:
> [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err:
> [DEBUG] wrote EndTxn v2; err:
> [DEBUG] read EndTxn v2; err:
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 1, successful_reads: 1, err: context canceled
> [DEBUG] read Produce v8; err:
> [DEBUG] produced; to:
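For readers following the flow described in the report, the client-side sequence maps onto the Java producer's transactional API roughly as below. This is a minimal sketch: the broker address, topic, and transactional.id are placeholders, and the InitProducerId, AddPartitionsToTxn, Produce, and EndTxn requests mentioned above are issued by the client underneath these calls.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("transactional.id", "example-transactional-id"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // issues InitProducerId (and FindCoordinator)
            producer.beginTransaction();
            // the first send to a partition triggers AddPartitionsToTxn, then Produce
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();  // issues EndTxn(commit); abortTransaction() would issue EndTxn(abort)
        }
    }
}
```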
[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Travis Bischel updated KAFKA-12671:
-----------------------------------
    Labels: Transactions  (was: )

> Out of order processing with a transactional producer can lead to a stuck LastStableOffset
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12671
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>            Reporter: Travis Bischel
>            Priority: Critical
>              Labels: Transactions
>
> (Issue description as quoted in the first KAFKA-12671 notification above.)
[GitHub] [kafka] ableegoldman commented on pull request #10635: KAFKA-9295: increase start stream timeout
ableegoldman commented on pull request #10635: URL: https://github.com/apache/kafka/pull/10635#issuecomment-833225272

Merged to trunk -- let's hope these good results continue. I'm going to close the ticket again so people are more likely to report it if they see things continue to break. Merged about 9:58pm PST on May 5th, so please disregard any test failures on builds kicked off prior to that time (and raise any new failures by reopening the ticket).
[GitHub] [kafka] ableegoldman merged pull request #10635: KAFKA-9295: increase start stream timeout
ableegoldman merged pull request #10635: URL: https://github.com/apache/kafka/pull/10635
[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339995#comment-17339995 ]

Travis Bischel commented on KAFKA-12671:
----------------------------------------

The only fix for this if it occurs is to restart a broker. This logic race condition is present in all versions from 0.11 onward.

> Out of order processing with a transactional producer can lead to a stuck LastStableOffset
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12671
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>            Reporter: Travis Bischel
>            Priority: Critical
>
> (Issue description as quoted in the first KAFKA-12671 notification above.)
[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Travis Bischel updated KAFKA-12671:
-----------------------------------
    Affects Version/s:     (was: 2.6.2)
                           (was: 2.6.1)
                           (was: 2.5.1)
                           (was: 2.3.1)
                           (was: 2.2.2)

> Out of order processing with a transactional producer can lead to a stuck LastStableOffset
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12671
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>            Reporter: Travis Bischel
>            Priority: Critical
>
> (Issue description as quoted in the first KAFKA-12671 notification above.)
[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Travis Bischel updated KAFKA-12671:
-----------------------------------
    Affects Version/s: 2.2.2
                       2.4.0
                       2.3.1
                       2.5.0
                       2.6.0
                       2.5.1
                       2.7.0
                       2.6.1
                       2.8.0
                       2.6.2

> Out of order processing with a transactional producer can lead to a stuck LastStableOffset
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12671
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.6.2
>            Reporter: Travis Bischel
>            Priority: Critical
>
> (Issue description as quoted in the first KAFKA-12671 notification above.)
[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on pull request #10509: URL: https://github.com/apache/kafka/pull/10509#issuecomment-833207399

Thank you very much, @ableegoldman ! Let's make it better together! :) I'll address the rest of the comments in another PR. And I'll also refine my other PR (https://github.com/apache/kafka/pull/10552) for the general assign later. Will let you know.
[GitHub] [kafka] showuon commented on pull request #10610: MINOR: replace deprecated Class.newInstance() to new one
showuon commented on pull request #10610: URL: https://github.com/apache/kafka/pull/10610#issuecomment-833206456

@rhauch , could you help review this simple PR? Thanks.
[GitHub] [kafka] showuon commented on pull request #10635: KAFKA-9295: increase start stream timeout
showuon commented on pull request #10635: URL: https://github.com/apache/kafka/pull/10635#issuecomment-833204958

@ableegoldman , the failed tests are all flaky (3 `RaftClusterTest` failures, and 1 `testMetricsDuringTopicCreateDelete`, tracked in KAFKA-9009). And most importantly, no failed `shouldInnerJoinMultiPartitionQueryable` test!
```
Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
Build / JDK 11 and Scala 2.13 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
```
[jira] [Commented] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
[ https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339976#comment-17339976 ]

Luke Chen commented on KAFKA-9009:
----------------------------------

Failed again.
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10635/1/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/Build___JDK_11_and_Scala_2_13___testMetricsDuringTopicCreateDelete__/]

{code:java}
java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1
    at scala.Predef$.assert(Predef.scala:280)
    at kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:121)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
{code}

> Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9009
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9009
>             Project: Kafka
>          Issue Type: Test
>          Components: core
>    Affects Versions: 2.5.0, 2.6.0
>            Reporter: Bill Bejeck
>            Priority: Major
>              Labels: flaky-test
>
> Failure seen in [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25644/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/]
>
> {noformat}
> Error Message
> java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1
> Stacktrace
> java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1
>     at scala.Predef$.assert(Predef.scala:170)
>     at kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:123)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>     at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
[GitHub] [kafka] dengziming commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure
dengziming commented on pull request #9577: URL: https://github.com/apache/kafka/pull/9577#issuecomment-833187678

ping @mumrah to have a look.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r627027404

## File path: core/src/main/scala/kafka/server/FetchSession.scala

@@ -314,22 +321,24 @@ class SessionErrorContext(val error: Errors,
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {}
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator)
+    FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator, Collections.emptyMap())

Review comment:
   ah good catch on this.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r627027261

## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java

@@ -319,12 +355,25 @@ public int maxBytes() {
         return data.maxBytes();
     }
-    public Map fetchData() {
-        return fetchData;
+    // For versions 13+, throws UnknownTopicIdException if the topic ID was unknown to the server.
+    public Map fetchData(Map topicNames) throws UnknownTopicIdException {
+        if (version() < 13)
+            return fetchData;
+        return toPartitionDataMap(data.topics(), topicNames);
     }
-    public List toForget() {
-        return toForget;
+    // For versions 13+, throws UnknownTopicIdException if the topic ID was unknown to the server.
+    public List forgottenTopics(Map topicNames) throws UnknownTopicIdException {
+        if (version() >= 13) {
+            data.forgottenTopicsData().forEach(forgottenTopic -> {
+                String name = topicNames.get(forgottenTopic.topicId());
+                if (name == null) {
+                    throw new UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown to the server", forgottenTopic.topicId()));
+                }
+                forgottenTopic.setTopic(topicNames.getOrDefault(forgottenTopic.topicId(), ""));

Review comment:
   I originally did this when dealing with unresolved partitions. I was wondering if it would be better to not create a second data structure. If creating another structure (as done before) is not a problem, we can go back to that.
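The resolution pattern under discussion (look each topic ID up in a broker-supplied ID-to-name map, and fail the request with `UnknownTopicIdException` when the ID is missing) can be sketched as follows. This is a simplified illustration rather than the actual `FetchRequest` code; the class, method, and parameter names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownTopicIdException;

// Illustrative sketch: every topic ID carried in a v13+ fetch request must resolve to a
// topic name on the broker; an unknown ID fails the whole request.
public class TopicIdResolutionSketch {
    static List<TopicPartition> resolveForgottenTopics(Map<Uuid, List<Integer>> forgottenByTopicId,
                                                       Map<Uuid, String> topicNames) {
        List<TopicPartition> resolved = new ArrayList<>();
        forgottenByTopicId.forEach((topicId, partitions) -> {
            String name = topicNames.get(topicId);
            if (name == null)
                throw new UnknownTopicIdException(
                    String.format("Topic Id %s in FetchRequest was unknown to the server", topicId));
            for (int partition : partitions)
                resolved.add(new TopicPartition(name, partition));
        });
        return resolved;
    }
}
```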
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r627026623

## File path: core/src/main/scala/kafka/server/KafkaApis.scala

@@ -820,20 +838,30 @@ class KafkaApis(val requestChannel: RequestChannel,
     def createResponse(throttleTimeMs: Int): FetchResponse = {
       // Down-convert messages for each partition if required
       val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-      unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-        val error = Errors.forCode(unconvertedPartitionData.errorCode)
-        if (error != Errors.NONE)
-          debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-            s"on partition $tp failed due to ${error.exceptionName}")
-        convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+      unconvertedFetchResponse.data().responses().forEach { topicResponse =>
+        if (topicResponse.topic() != "") {

Review comment:
   Realized this was no longer the case and removed in the most recent commit.
[GitHub] [kafka] ableegoldman merged pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman merged pull request #10509: URL: https://github.com/apache/kafka/pull/10509
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627021459 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set allTopics, * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: * - * 1. Reassign as many previously owned partitions as possible, up to the maxQuota - * 2. Fill remaining members up to minQuota - * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - *from the over-full consumers at max capacity - * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - *should just distribute one partition each to all consumers at min capacity + * 1. Reassign previously owned partitions: + * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list + * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions + * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if + * we're still under the number of expected max capacity members + * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * * @param partitionsPerTopic The number of partitions for each subscribed topic * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * - * @return Map from each member to the list of partitions assigned to them. + * @returnMap from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug("performing constrained assign. 
partitionsPerTopic: {}, consumerToOwnedPartitions: {}", +partitionsPerTopic, consumerToOwnedPartitions); +} Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMembersHavingMorePartitions = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List assignedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp :
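The quota arithmetic that the rewritten `constrainedAssign` relies on (visible in the diff quoted above) reduces to a few integer computations. The following self-contained sketch uses hypothetical partition and consumer counts to show just that bookkeeping.

```java
// Sketch of the minQuota / maxQuota bookkeeping behind the constrained sticky assignment:
// with P partitions and C consumers, every consumer gets at least floor(P/C) partitions,
// at most ceil(P/C), and exactly (P mod C) consumers receive the larger share.
public class ConstrainedAssignQuotaSketch {
    public static void main(String[] args) {
        int totalPartitionsCount = 7;   // hypothetical: e.g. one topic with 7 partitions
        int numberOfConsumers = 3;      // hypothetical consumer count

        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
        // the number of members expected to end up with maxQuota partitions
        int expectedNumMembersWithExtraPartition = totalPartitionsCount % numberOfConsumers;

        System.out.printf("minQuota=%d maxQuota=%d membersWithExtra=%d%n",
            minQuota, maxQuota, expectedNumMembersWithExtraPartition); // prints 2, 3, 1
    }
}
```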
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627020968

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

(Quotes the same `constrainedAssign` hunk shown in the earlier review comment on this pull request.)
[GitHub] [kafka] shayelkin opened a new pull request #10636: MINOR: Bump Jersey deps to 2.34 due to CVE-2021-28168
shayelkin opened a new pull request #10636: URL: https://github.com/apache/kafka/pull/10636

The version of the Eclipse Jersey library pulled in as a dependency, 2.31, has a known vulnerability, CVE-2021-28168 (https://github.com/advisories/GHSA-c43q-5hpj-4crv). This replaces it with 2.34, which is fully compatible with 2.31 apart from bug and vulnerability fixes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627020382

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

(Quotes the same `constrainedAssign` hunk shown in the earlier review comment on this pull request.)
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627019548

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

(Quotes the same `constrainedAssign` hunk shown in the earlier review comment on this pull request.)
[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r625422142 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time, val topicPart = element.getKey val respData = element.getValue val cachedPart = session.partitionMap.find(new CachedPartition(topicPart)) -val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected) -if (mustRespond) { + +// If we have an situation where there is a valid ID on the partition, but it does not match Review comment: an situation => a situation ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -314,22 +321,24 @@ class SessionErrorContext(val error: Errors, override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {} override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { -FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator) +FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator, Collections.emptyMap()) Review comment: Hmm, it seems that we can't pass in an empty topicIds since partition iterator is not empty? ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -226,4 +284,4 @@ private static FetchResponseData toMessage(Errors error, .setSessionId(sessionId) .setResponses(topicResponseList); } -} \ No newline at end of file +} Review comment: no need for extra new line. ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -319,12 +355,25 @@ public int maxBytes() { return data.maxBytes(); } -public Map fetchData() { +// For versions 13+, throws UnknownTopicIdException if the topic ID was unknown to the server. +public Map fetchData(Map topicNames) throws UnknownTopicIdException { Review comment: Since toPartitionDataMap() handles all versions, could we just simply call toPartitionDataMap()? Then, I am not sure if we need to call toPartitionDataMap() in the constructor. ## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ## @@ -276,7 +284,12 @@ class ReplicaAlterLogDirsThread(name: String, } else { // Set maxWait and minBytes to 0 because the response should return immediately if // the future log has caught up with the current log of the partition - val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes) + val version: Short = if (ApiKeys.FETCH.latestVersion >= 13 && topics.size() != topicIdsInRequest.size()) +12 + else +ApiKeys.FETCH.latestVersion Review comment: The calculation of version is duplicated between here and ReplicaFetcherThread. Could we share them somehow? ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -80,14 +89,26 @@ public Errors error() { return Errors.forCode(data.errorCode()); } -public LinkedHashMap responseData() { +public LinkedHashMap responseData(Map topicNames, short version) { +return toResponseDataMap(topicNames, version); + +} + +// TODO: Should be replaced or cleaned up. The idea is that in KafkaApis we need to reconstruct responseData even though we could have just passed in and out a map. +// With topic IDs, recreating the map takes a little more time since we have to get the topic name from the topic ID to name map. 
+// The refactor somewhat helps in KafkaApis where we already have the topic names, but we have to recompute the map using topic IDs instead of just returning what we have. +// Can be replaced when we remove toMessage and change sizeOf as a part of KAFKA-12410. +// Used when we can guarantee responseData is populated with all possible partitions +// This occurs when we have a response version < 13 or we built the FetchResponse with +// responseDataMap as a parameter and we have the same topic IDs available. +public LinkedHashMap resolvedResponseData() { if (responseData == null) { synchronized (this) { if (responseData == null) { responseData = new LinkedHashMap<>(); data.responses().forEach(topicResponse -> -topicResponse.partitions().forEach(partition -> -responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition)) +
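One of the review comments above asks whether the fetch-version calculation duplicated between ReplicaAlterLogDirsThread and ReplicaFetcherThread could be shared. A minimal Java sketch of what such a shared helper might look like (the class and method names here are hypothetical, not part of the PR):

```java
import org.apache.kafka.common.protocol.ApiKeys;

// Hypothetical helper capturing the rule quoted in the diff: fetch versions 13+
// address partitions by topic ID, so fall back to version 12 whenever at least
// one topic in the request has no known ID.
final class FetchVersionUtil {
    private FetchVersionUtil() { }

    static short chooseFetchVersion(int topicCount, int topicsWithIdCount) {
        short latest = ApiKeys.FETCH.latestVersion();
        if (latest >= 13 && topicCount != topicsWithIdCount) {
            return 12;
        }
        return latest;
    }
}
```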
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627016997 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set allTopics, * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: * - * 1. Reassign as many previously owned partitions as possible, up to the maxQuota - * 2. Fill remaining members up to minQuota - * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - *from the over-full consumers at max capacity - * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - *should just distribute one partition each to all consumers at min capacity + * 1. Reassign previously owned partitions: + * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list + * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions + * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if + * we're still under the number of expected max capacity members + * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * * @param partitionsPerTopic The number of partitions for each subscribed topic * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * - * @return Map from each member to the list of partitions assigned to them. + * @returnMap from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug("performing constrained assign. 
partitionsPerTopic: {}, consumerToOwnedPartitions: {}", +partitionsPerTopic, consumerToOwnedPartitions); +} Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMembersHavingMorePartitions = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List assignedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp :
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627015270 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set allTopics, * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: * - * 1. Reassign as many previously owned partitions as possible, up to the maxQuota - * 2. Fill remaining members up to minQuota - * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - *from the over-full consumers at max capacity - * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - *should just distribute one partition each to all consumers at min capacity + * 1. Reassign previously owned partitions: + * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list + * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions + * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if + * we're still under the number of expected max capacity members + * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * * @param partitionsPerTopic The number of partitions for each subscribed topic * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * - * @return Map from each member to the list of partitions assigned to them. + * @returnMap from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug("performing constrained assign. 
partitionsPerTopic: {}, consumerToOwnedPartitions: {}", +partitionsPerTopic, consumerToOwnedPartitions); +} Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMembersHavingMorePartitions = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List assignedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp :
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627016286 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set allTopics, * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: * - * 1. Reassign as many previously owned partitions as possible, up to the maxQuota - * 2. Fill remaining members up to minQuota - * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - *from the over-full consumers at max capacity - * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - *should just distribute one partition each to all consumers at min capacity + * 1. Reassign previously owned partitions: + * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list + * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions + * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if + * we're still under the number of expected max capacity members + * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * * @param partitionsPerTopic The number of partitions for each subscribed topic * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * - * @return Map from each member to the list of partitions assigned to them. + * @returnMap from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug("performing constrained assign. 
partitionsPerTopic: {}, consumerToOwnedPartitions: {}", +partitionsPerTopic, consumerToOwnedPartitions); +} Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMembersHavingMorePartitions = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List assignedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp :
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627014485 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set allTopics, * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: * - * 1. Reassign as many previously owned partitions as possible, up to the maxQuota - * 2. Fill remaining members up to minQuota - * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - *from the over-full consumers at max capacity - * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - *should just distribute one partition each to all consumers at min capacity + * 1. Reassign previously owned partitions: + * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list + * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions + * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if + * we're still under the number of expected max capacity members + * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * * @param partitionsPerTopic The number of partitions for each subscribed topic * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * - * @return Map from each member to the list of partitions assigned to them. + * @returnMap from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug("performing constrained assign. 
partitionsPerTopic: {}, consumerToOwnedPartitions: {}", +partitionsPerTopic, consumerToOwnedPartitions); +} Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMembersHavingMorePartitions = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List assignedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp :
[GitHub] [kafka] showuon commented on pull request #10635: KAFKA-9295: increase start stream timeout
showuon commented on pull request #10635: URL: https://github.com/apache/kafka/pull/10635#issuecomment-833152754 This should be merged soon, and I'm confident increasing the timeout will reduce the number of failures a lot. Let's wait for the Jenkins build. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627012042 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set allTopics, * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: * - * 1. Reassign as many previously owned partitions as possible, up to the maxQuota - * 2. Fill remaining members up to minQuota - * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - *from the over-full consumers at max capacity - * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - *should just distribute one partition each to all consumers at min capacity + * 1. Reassign previously owned partitions: + * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list + * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions + * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if + * we're still under the number of expected max capacity members + * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * * @param partitionsPerTopic The number of partitions for each subscribed topic * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * - * @return Map from each member to the list of partitions assigned to them. + * @returnMap from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug("performing constrained assign. 
partitionsPerTopic: {}, consumerToOwnedPartitions: {}", +partitionsPerTopic, consumerToOwnedPartitions); +} Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; Review comment: Just a nit -- and to clarify up front, if you agree with this let's still hold off on doing it here so this PR can finally be merged, as I figure any nits can be addressed in your general assign PR: It's still a bit unclear what this value will be sued for when you first see it, maybe we can work in the word `minQuota` somewhere in the name? Eg `expectedNumMembersWithMoreThanMinQuotaPartitions`, or for a slightly shorter example `numConsumersAssignedOverMinQuota`, or something between or similar to those FYI I'm also ok with it as-is if you prefer the current name -- just wanted to throw out some other suggestions. I'll trust you to pick whatever name feels right -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
wcarlson5 commented on pull request #10634: URL: https://github.com/apache/kafka/pull/10634#issuecomment-833150214 @ableegoldman yep it looks like I am going to have to go back to the drawing board for either the test or the impl. I will look at it tomorrow -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10635: KAFKA-9295: increase start stream timeout
ableegoldman commented on pull request #10635: URL: https://github.com/apache/kafka/pull/10635#issuecomment-833149684 Just ping me when the build passes, you know the drill. By the way, even if it does fail at another point later on in this test, I'd like to go ahead and merge this anyway. Hopefully increasing the timeout will at least reduce the number of failures, of which there have been quite a few, and will make it easier to investigate the remaining problems. But let me know if you'd prefer that I wait in case that does happen. Though I'm feeling optimistic it won't. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10635: KAFKA-9295: increase start stream timeout
showuon commented on pull request #10635: URL: https://github.com/apache/kafka/pull/10635#issuecomment-833148107 @ableegoldman , please take a look. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #10635: KAFKA-9295: increase start stream timeout
showuon opened a new pull request #10635: URL: https://github.com/apache/kafka/pull/10635 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dosvath commented on pull request #10375: KAFKA-12522: Cast SMT should allow null value records to pass through
dosvath commented on pull request #10375: URL: https://github.com/apache/kafka/pull/10375#issuecomment-833132393 @ewencp @mjsax one more ping on this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
ableegoldman commented on pull request #10634: URL: https://github.com/apache/kafka/pull/10634#issuecomment-833125376 @wcarlson5 there's a failure that I'm guessing is related: `streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
ableegoldman commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626990213 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() { return dbOptions.writeBufferManager(); } +@Override +public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) { +dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes); +return this; +} + +@Override +public long maxWriteBatchGroupSizeBytes() { +return dbOptions.maxWriteBatchGroupSizeBytes(); +} + +@Override +public Options oldDefaults(final int majorVersion, final int minorVersion) { +columnFamilyOptions.oldDefaults(majorVersion, minorVersion); +return this; +} + +@Override +public Options optimizeForSmallDb(final Cache cache) { +return super.optimizeForSmallDb(cache); +} + +@Override +public AbstractCompactionFilter> compactionFilter() { +return columnFamilyOptions.compactionFilter(); +} + +@Override +public AbstractCompactionFilterFactory> compactionFilterFactory() { +return columnFamilyOptions.compactionFilterFactory(); +} + +@Override +public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) { +dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec); +return this; +} + +@Override +public int statsPersistPeriodSec() { +return dbOptions.statsPersistPeriodSec(); +} + +@Override +public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) { +dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize); +return this; +} + +@Override +public long statsHistoryBufferSize() { +return dbOptions.statsHistoryBufferSize(); +} + +@Override +public Options setStrictBytesPerSync(final boolean strictBytesPerSync) { +dbOptions.setStrictBytesPerSync(strictBytesPerSync); +return this; +} + +@Override +public boolean strictBytesPerSync() { +return dbOptions.strictBytesPerSync(); +} + +@Override +public Options setListeners(final List listeners) { +dbOptions.setListeners(listeners); +return this; +} + +@Override +public List listeners() { +return dbOptions.listeners(); +} + +@Override +public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) { +dbOptions.setEnablePipelinedWrite(enablePipelinedWrite); +return this; +} + +@Override +public boolean enablePipelinedWrite() { +return dbOptions.enablePipelinedWrite(); +} + +@Override +public Options setUnorderedWrite(final boolean unorderedWrite) { +dbOptions.setUnorderedWrite(unorderedWrite); +return this; +} + +@Override +public boolean unorderedWrite() { +return dbOptions.unorderedWrite(); +} + +@Override +public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) { + dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen); +return this; +} + +@Override +public boolean skipCheckingSstFileSizesOnDbOpen() { +return dbOptions.skipCheckingSstFileSizesOnDbOpen(); +} + +@Override +public Options setWalFilter(final AbstractWalFilter walFilter) { +dbOptions.setWalFilter(walFilter); Review comment: Yep, in RocksDBStore we set `wOptions.setDisableWAL(true)` -- it's not on the DB/CF options, instead it's a configuration on a separate WriteOptions class. So we do indeed enforce that it's disabled -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
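The comment above notes that the WAL is disabled through WriteOptions rather than through the DB or column-family options. A minimal RocksDB (Java) sketch of that distinction, using a placeholder database path:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

// Disabling the WAL is a per-write setting on WriteOptions, not a DB/CF option.
public class DisableWalExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-example")) {
            // Every put issued with these WriteOptions skips the write-ahead log.
            db.put(writeOptions, "key".getBytes(), "value".getBytes());
        }
    }
}
```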
[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
ableegoldman commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626989130 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() { return dbOptions.writeBufferManager(); } +@Override +public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) { +dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes); +return this; +} + +@Override +public long maxWriteBatchGroupSizeBytes() { +return dbOptions.maxWriteBatchGroupSizeBytes(); +} + +@Override +public Options oldDefaults(final int majorVersion, final int minorVersion) { +columnFamilyOptions.oldDefaults(majorVersion, minorVersion); +return this; +} + +@Override +public Options optimizeForSmallDb(final Cache cache) { +return super.optimizeForSmallDb(cache); +} + +@Override +public AbstractCompactionFilter> compactionFilter() { +return columnFamilyOptions.compactionFilter(); +} + +@Override +public AbstractCompactionFilterFactory> compactionFilterFactory() { +return columnFamilyOptions.compactionFilterFactory(); +} + +@Override +public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) { +dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec); +return this; +} + +@Override +public int statsPersistPeriodSec() { +return dbOptions.statsPersistPeriodSec(); +} + +@Override +public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) { +dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize); +return this; +} + +@Override +public long statsHistoryBufferSize() { +return dbOptions.statsHistoryBufferSize(); +} + +@Override +public Options setStrictBytesPerSync(final boolean strictBytesPerSync) { +dbOptions.setStrictBytesPerSync(strictBytesPerSync); +return this; +} + +@Override +public boolean strictBytesPerSync() { +return dbOptions.strictBytesPerSync(); +} + +@Override +public Options setListeners(final List listeners) { +dbOptions.setListeners(listeners); +return this; +} + +@Override +public List listeners() { +return dbOptions.listeners(); +} + +@Override +public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) { +dbOptions.setEnablePipelinedWrite(enablePipelinedWrite); +return this; +} + +@Override +public boolean enablePipelinedWrite() { +return dbOptions.enablePipelinedWrite(); +} + +@Override +public Options setUnorderedWrite(final boolean unorderedWrite) { +dbOptions.setUnorderedWrite(unorderedWrite); +return this; +} + +@Override +public boolean unorderedWrite() { +return dbOptions.unorderedWrite(); +} + +@Override +public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) { + dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen); +return this; +} + +@Override +public boolean skipCheckingSstFileSizesOnDbOpen() { +return dbOptions.skipCheckingSstFileSizesOnDbOpen(); +} + +@Override +public Options setWalFilter(final AbstractWalFilter walFilter) { +dbOptions.setWalFilter(walFilter); Review comment: I thought we did actively disable it, although I'll see if I can find where/whether this is done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339937#comment-17339937 ] Luke Chen commented on KAFKA-9295: -- Let me create another PR for it later. > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
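The failure quoted above comes out of a wait-until-timeout test utility, and the fix under discussion amounts to giving such a loop a larger timeout. As a rough illustration of the pattern (a generic sketch, not Kafka's IntegrationTestUtils or TestUtils code):

{code:java}
import java.util.function.BooleanSupplier;

// Generic retry-until-timeout helper in the spirit of the utilities named in the stack trace.
public final class WaitUntil {
    private WaitUntil() { }

    public static void waitForCondition(BooleanSupplier condition, long timeoutMs, String failureMessage)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return; // condition met before the deadline
            }
            Thread.sleep(100L); // poll interval
        }
        throw new AssertionError(failureMessage);
    }
}
{code}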
[jira] [Resolved] (KAFKA-8531) Change default replication factor config
[ https://issues.apache.org/jira/browse/KAFKA-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8531. Resolution: Fixed > Change default replication factor config > > > Key: KAFKA-8531 > URL: https://issues.apache.org/jira/browse/KAFKA-8531 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Blocker > Labels: kip > Fix For: 3.0.0 > > > With KAFKA-8305, AdminClient allows to create topics based on the broker > default replication factor. > Kafka Streams sets `replication.factor` to 1 by default atm, to give a good > out-of-the-box user experience. The problem is, that people may need to > change the config if they push an application to production. > We should change the default to `-1` to exploit the new AdminClient feature. > This won't impact the out-of-the-box experience and may avoids the need to > change the setting when pushing an application to production. > KIP-733: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-733%3A+change+Kafka+Streams+default+replication+factor+config] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax merged pull request #10532: KAFKA-8531: Change default replication factor config
mjsax merged pull request #10532: URL: https://github.com/apache/kafka/pull/10532 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
ableegoldman commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626176038 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -396,6 +396,21 @@ public void close() { log.info("Skipping to close non-initialized store {}", entry.getKey()); } } +for (final StateStore store : globalStateStores) { Review comment: Heh, @guozhangwang and I reviewed at the same time. I didn't notice that `globalStores` would not be populated until restoration, whereas `globalStateStores` is populated in the constructor. Imo we should just populate `globalStores` in the constructor as well, but I guess that won't be necessary if @guozhangwang does a quick followup to consolidate them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
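For readers following the discussion, a simplified Java sketch of the close-all-global-stores loop being talked about, assuming the store collection is populated in the constructor (illustrative only, not the actual GlobalStateManagerImpl code):

```java
import java.util.List;
import org.apache.kafka.streams.processor.StateStore;

// Close every registered global store, skipping any that never finished initializing.
public final class GlobalStoreCloser {
    public static void closeAll(List<StateStore> globalStateStores) {
        for (StateStore store : globalStateStores) {
            if (store.isOpen()) {
                store.close();
            } else {
                // mirrors the "Skipping to close non-initialized store" log message quoted above
                System.out.println("Skipping to close non-initialized store " + store.name());
            }
        }
    }
}
```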
[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
ableegoldman commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626971805 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() { return dbOptions.writeBufferManager(); } +@Override +public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) { +dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes); +return this; +} + +@Override +public long maxWriteBatchGroupSizeBytes() { +return dbOptions.maxWriteBatchGroupSizeBytes(); +} + +@Override +public Options oldDefaults(final int majorVersion, final int minorVersion) { +columnFamilyOptions.oldDefaults(majorVersion, minorVersion); +return this; +} + +@Override +public Options optimizeForSmallDb(final Cache cache) { +return super.optimizeForSmallDb(cache); +} + +@Override +public AbstractCompactionFilter> compactionFilter() { +return columnFamilyOptions.compactionFilter(); +} + +@Override +public AbstractCompactionFilterFactory> compactionFilterFactory() { +return columnFamilyOptions.compactionFilterFactory(); +} + +@Override +public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) { +dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec); +return this; +} + +@Override +public int statsPersistPeriodSec() { +return dbOptions.statsPersistPeriodSec(); +} + +@Override +public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) { +dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize); +return this; +} + +@Override +public long statsHistoryBufferSize() { +return dbOptions.statsHistoryBufferSize(); +} + +@Override +public Options setStrictBytesPerSync(final boolean strictBytesPerSync) { +dbOptions.setStrictBytesPerSync(strictBytesPerSync); +return this; +} + +@Override +public boolean strictBytesPerSync() { +return dbOptions.strictBytesPerSync(); +} + +@Override +public Options setListeners(final List listeners) { +dbOptions.setListeners(listeners); +return this; +} + +@Override +public List listeners() { +return dbOptions.listeners(); +} + +@Override +public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) { +dbOptions.setEnablePipelinedWrite(enablePipelinedWrite); +return this; +} + +@Override +public boolean enablePipelinedWrite() { +return dbOptions.enablePipelinedWrite(); +} + +@Override +public Options setUnorderedWrite(final boolean unorderedWrite) { +dbOptions.setUnorderedWrite(unorderedWrite); +return this; +} + +@Override +public boolean unorderedWrite() { +return dbOptions.unorderedWrite(); +} + +@Override +public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) { + dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen); +return this; +} + +@Override +public boolean skipCheckingSstFileSizesOnDbOpen() { +return dbOptions.skipCheckingSstFileSizesOnDbOpen(); +} + +@Override +public Options setWalFilter(final AbstractWalFilter walFilter) { +dbOptions.setWalFilter(walFilter); +return this; +} + +@Override +public WalFilter walFilter() { +return dbOptions.walFilter(); +} + +@Override +public Options setAllowIngestBehind(final boolean allowIngestBehind) { +dbOptions.setAllowIngestBehind(allowIngestBehind); +return this; +} + +@Override +public boolean allowIngestBehind() { +return dbOptions.allowIngestBehind(); +} + +@Override +public Options setPreserveDeletes(final 
boolean preserveDeletes) { +dbOptions.setPreserveDeletes(preserveDeletes); +return this; +} + +@Override +public boolean preserveDeletes() { +return dbOptions.preserveDeletes(); +} + +@Override +public Options setTwoWriteQueues(final boolean twoWriteQueues) { +dbOptions.setTwoWriteQueues(twoWriteQueues); +return this; +} + +@Override +public boolean twoWriteQueues() { +return dbOptions.twoWriteQueues(); +} + +@Override +public Options setManualWalFlush(final boolean manualWalFlush) { +dbOptions.setManualWalFlush(manualWalFlush); +return this; +} + +@Override +public boolean manualWalFlush() { +return dbOptions.manualWalFlush(); +} + +@Override +public Options
[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
ableegoldman commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626971137 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() { return dbOptions.writeBufferManager(); } +@Override +public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) { +dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes); +return this; +} + +@Override +public long maxWriteBatchGroupSizeBytes() { +return dbOptions.maxWriteBatchGroupSizeBytes(); +} + +@Override +public Options oldDefaults(final int majorVersion, final int minorVersion) { +columnFamilyOptions.oldDefaults(majorVersion, minorVersion); +return this; +} + +@Override +public Options optimizeForSmallDb(final Cache cache) { +return super.optimizeForSmallDb(cache); +} + +@Override +public AbstractCompactionFilter> compactionFilter() { +return columnFamilyOptions.compactionFilter(); +} + +@Override +public AbstractCompactionFilterFactory> compactionFilterFactory() { +return columnFamilyOptions.compactionFilterFactory(); +} + +@Override +public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) { +dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec); +return this; +} + +@Override +public int statsPersistPeriodSec() { +return dbOptions.statsPersistPeriodSec(); +} + +@Override +public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) { +dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize); +return this; +} + +@Override +public long statsHistoryBufferSize() { +return dbOptions.statsHistoryBufferSize(); +} + +@Override +public Options setStrictBytesPerSync(final boolean strictBytesPerSync) { +dbOptions.setStrictBytesPerSync(strictBytesPerSync); +return this; +} + +@Override +public boolean strictBytesPerSync() { +return dbOptions.strictBytesPerSync(); +} + +@Override +public Options setListeners(final List listeners) { +dbOptions.setListeners(listeners); +return this; +} + +@Override +public List listeners() { +return dbOptions.listeners(); +} + +@Override +public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) { +dbOptions.setEnablePipelinedWrite(enablePipelinedWrite); +return this; +} + +@Override +public boolean enablePipelinedWrite() { +return dbOptions.enablePipelinedWrite(); +} + +@Override +public Options setUnorderedWrite(final boolean unorderedWrite) { +dbOptions.setUnorderedWrite(unorderedWrite); +return this; +} + +@Override +public boolean unorderedWrite() { +return dbOptions.unorderedWrite(); +} + +@Override +public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) { + dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen); +return this; +} + +@Override +public boolean skipCheckingSstFileSizesOnDbOpen() { +return dbOptions.skipCheckingSstFileSizesOnDbOpen(); +} + +@Override +public Options setWalFilter(final AbstractWalFilter walFilter) { +dbOptions.setWalFilter(walFilter); +return this; +} + +@Override +public WalFilter walFilter() { +return dbOptions.walFilter(); +} + +@Override +public Options setAllowIngestBehind(final boolean allowIngestBehind) { +dbOptions.setAllowIngestBehind(allowIngestBehind); +return this; +} + +@Override +public boolean allowIngestBehind() { +return dbOptions.allowIngestBehind(); +} + +@Override +public Options setPreserveDeletes(final 
boolean preserveDeletes) { +dbOptions.setPreserveDeletes(preserveDeletes); +return this; +} + +@Override +public boolean preserveDeletes() { +return dbOptions.preserveDeletes(); +} + +@Override +public Options setTwoWriteQueues(final boolean twoWriteQueues) { +dbOptions.setTwoWriteQueues(twoWriteQueues); +return this; +} + +@Override +public boolean twoWriteQueues() { +return dbOptions.twoWriteQueues(); +} + +@Override +public Options setManualWalFlush(final boolean manualWalFlush) { +dbOptions.setManualWalFlush(manualWalFlush); +return this; +} + +@Override +public boolean manualWalFlush() { +return dbOptions.manualWalFlush(); +} + +@Override +public Options
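The long quoted block above is part of an adapter that exposes a single Options facade while forwarding each setting to either the DBOptions or the ColumnFamilyOptions delegate. A trimmed-down sketch of that delegation pattern, using hypothetical interfaces rather than the real RocksDB classes:

```java
// Hypothetical, trimmed-down interfaces standing in for RocksDB's DBOptions / ColumnFamilyOptions.
interface DbOpts { void setStatsPersistPeriodSec(int sec); }
interface CfOpts { void setMaxWriteBufferNumber(int n); }

// Each setter forwards to the appropriate delegate and returns `this` so calls still chain.
final class OptionsAdapter {
    private final DbOpts dbOptions;
    private final CfOpts columnFamilyOptions;

    OptionsAdapter(DbOpts dbOptions, CfOpts columnFamilyOptions) {
        this.dbOptions = dbOptions;
        this.columnFamilyOptions = columnFamilyOptions;
    }

    // DB-level setting: forward to dbOptions.
    public OptionsAdapter setStatsPersistPeriodSec(int statsPersistPeriodSec) {
        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
        return this;
    }

    // Column-family-level setting: forward to columnFamilyOptions.
    public OptionsAdapter setMaxWriteBufferNumber(int maxWriteBufferNumber) {
        columnFamilyOptions.setMaxWriteBufferNumber(maxWriteBufferNumber);
        return this;
    }
}
```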
[jira] [Created] (KAFKA-12756) Update Zookeeper to 3.6.3 or higher
Boojapho created KAFKA-12756: Summary: Update Zookeeper to 3.6.3 or higher Key: KAFKA-12756 URL: https://issues.apache.org/jira/browse/KAFKA-12756 Project: Kafka Issue Type: Task Affects Versions: 2.8.0, 2.7.0 Reporter: Boojapho Zookeeper 3.6.3 or higher provides a security fix for [CVE-2021-21409](https://nvd.nist.gov/vuln/detail/CVE-2021-21409) which should be included in Apache Kafka to eliminate the vulnerability. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9177) Pause completed partitions on restore consumer
[ https://issues.apache.org/jira/browse/KAFKA-9177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339336#comment-17339336 ] Andrey Polyakov edited comment on KAFKA-9177 at 5/5/21, 10:01 PM: -- It's possible this got missed as part of KAFKA-9113? I'm seeing many messages like this per second in our 2.6.1 Kafka Streams application logs (and also 2.6.2, 2.7.0, and 2.8.0): {code} {"timestamp":{"seconds":1620165908,"nanos":76900},"thread":"myapp-StreamThread-1","severity":"DEBUG","loggerName":"org.apache.kafka.streams.processor.internals.StoreChangelogReader","message":"stream-thread [myapp-StreamThread-1] Finished restoring all changelogs []"} {code} was (Author: apolyakov): It's possible this got missed as part of KAFKA-9113? I'm seeing many messages like this per second in our 2.6.1 Kafka Streams application logs: {code} {"timestamp":{"seconds":1620165908,"nanos":76900},"thread":"myapp-StreamThread-1","severity":"DEBUG","loggerName":"org.apache.kafka.streams.processor.internals.StoreChangelogReader","message":"stream-thread [myapp-StreamThread-1] Finished restoring all changelogs []"} {code} > Pause completed partitions on restore consumer > -- > > Key: KAFKA-9177 > URL: https://issues.apache.org/jira/browse/KAFKA-9177 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > > > The StoreChangelogReader is responsible for tracking and restoring active > tasks, but once a store has finished restoring it will continue polling for > records on that partition. > Ordinarily this doesn't make a difference as a store is not completely > restored until its entire changelog has been read, so there are no more > records for poll to return anyway. But if the restoring state is actually an > optimized source KTable, the changelog is just the source topic and poll will > keep returning records for that partition until all stores have been restored. > Note that this isn't a correctness issue since it's just the restore > consumer, but it is wasteful to be polling for records and throwing them > away. We should pause completed partitions in StoreChangelogReader so we > don't slow down the restore consumer in reading from the unfinished changelog > topics, and avoid wasted network. -- This message was sent by Atlassian Jira (v8.3.4#803005)
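The improvement described in the ticket boils down to calling pause() on the restore consumer for changelog partitions that have finished restoring, so poll() stops fetching from them. A sketch of that idea using the consumer API (illustrative only; the real logic lives in StoreChangelogReader):

{code:java}
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Pause any currently-assigned changelog partitions that have completed restoration.
public final class RestorePauser {
    public static void pauseCompleted(Consumer<byte[], byte[]> restoreConsumer,
                                      Collection<TopicPartition> completedChangelogs) {
        Set<TopicPartition> assignedAndCompleted = restoreConsumer.assignment().stream()
                .filter(completedChangelogs::contains)
                .collect(Collectors.toSet());
        restoreConsumer.pause(assignedAndCompleted);
    }
}
{code}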
[GitHub] [kafka] rhauch merged pull request #10014: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes
rhauch merged pull request #10014: URL: https://github.com/apache/kafka/pull/10014 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339862#comment-17339862 ] Randall Hauch commented on KAFKA-10340: --- I cherry-picked the original PR (https://github.com/apache/kafka/pull/10016) to the `2.8` branch (now that it's not frozen) and updated the fixed versions. This completes all of the planned work for this issue. > Source connectors should report error when trying to produce records to > non-existent topics instead of hanging forever > -- > > Key: KAFKA-10340 > URL: https://issues.apache.org/jira/browse/KAFKA-10340 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0 >Reporter: Arjun Satish >Assignee: Chris Egerton >Priority: Major > Fix For: 3.0.0, 2.7.1, 2.6.2, 2.8.1 > > > Currently, a source connector will blindly attempt to write a record to a > Kafka topic. When the topic does not exist, its creation is controlled by the > {{auto.create.topics.enable}} config on the brokers. When auto.create is > disabled, the producer.send() call on the Connect worker will hang > indefinitely (due to the "infinite retries" configuration for said producer). > In setups where this config is usually disabled, the source connector simply > appears to hang and not produce any output. > It is desirable to either log an info or an error message (or inform the user > somehow) that the connector is simply stuck waiting for the destination topic > to be created. When the worker has permissions to inspect the broker > settings, it can use the {{listTopics}} and {{describeConfigs}} API in > AdminClient to check if the topic exists, the broker can > {{auto.create.topics.enable}} topics, and if these cases do not exist, either > throw an error. > With the recently merged > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], > this becomes even more specific a corner case: when topic creation settings > are enabled, the worker should handle the corner case where topic creation is > disabled, {{auto.create.topics.enable}} is set to false and topic does not > exist. -- This message was sent by Atlassian Jira (v8.3.4#803005)
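As a rough illustration of the check suggested in the issue description, the sketch below uses the Admin client to see whether the destination topic exists and whether the brokers allow auto-creation before any records are dispatched; the class name, broker-id handling, and error type are assumptions for the example, not the actual Connect worker logic.
{code:java}
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Hypothetical pre-flight check: fail fast (or at least log) instead of letting
// producer.send() block forever on a missing destination topic.
final class TopicPreflightCheck {

    static void check(final Admin admin, final String topic, final String brokerId)
            throws ExecutionException, InterruptedException {
        final Set<String> existing = admin.listTopics().names().get();
        if (existing.contains(topic)) {
            return; // topic is there, nothing to warn about
        }

        final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
        final Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
        final ConfigEntry autoCreate = config.get("auto.create.topics.enable");

        if (autoCreate == null || !Boolean.parseBoolean(autoCreate.value())) {
            throw new IllegalStateException("Topic '" + topic + "' does not exist and the broker will not "
                    + "auto-create it; produced records would block indefinitely.");
        }
    }
}
{code}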
[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-10340: -- Fix Version/s: 2.8.1 > Source connectors should report error when trying to produce records to > non-existent topics instead of hanging forever > -- > > Key: KAFKA-10340 > URL: https://issues.apache.org/jira/browse/KAFKA-10340 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0 >Reporter: Arjun Satish >Assignee: Chris Egerton >Priority: Major > Fix For: 3.0.0, 2.7.1, 2.6.2, 2.8.1 > > > Currently, a source connector will blindly attempt to write a record to a > Kafka topic. When the topic does not exist, its creation is controlled by the > {{auto.create.topics.enable}} config on the brokers. When auto.create is > disabled, the producer.send() call on the Connect worker will hang > indefinitely (due to the "infinite retries" configuration for said producer). > In setups where this config is usually disabled, the source connector simply > appears to hang and not produce any output. > It is desirable to either log an info or an error message (or inform the user > somehow) that the connector is simply stuck waiting for the destination topic > to be created. When the worker has permissions to inspect the broker > settings, it can use the {{listTopics}} and {{describeConfigs}} API in > AdminClient to check if the topic exists, the broker can > {{auto.create.topics.enable}} topics, and if these cases do not exist, either > throw an error. > With the recently merged > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], > this becomes even more specific a corner case: when topic creation settings > are enabled, the worker should handle the corner case where topic creation is > disabled, {{auto.create.topics.enable}} is set to false and topic does not > exist. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12717) Remove internal converter config properties
[ https://issues.apache.org/jira/browse/KAFKA-12717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339833#comment-17339833 ] Chris Egerton commented on KAFKA-12717: --- Filed [https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Removal+of+Connect%27s+internal+converter+properties] for these changes. > Remove internal converter config properties > --- > > Key: KAFKA-12717 > URL: https://issues.apache.org/jira/browse/KAFKA-12717 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Labels: needs-kip > > KAFKA-5540 / > [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig] > deprecated but did not officially remove Connect's internal converter worker > config properties. With the upcoming 3.0 release, we can make the > backwards-incompatible change of completely removing these properties once > and for all. > > One migration path for users who may still be running Connect clusters with > different internal converters can be: > # Stop all workers on the cluster > # For each internal topic (config, offsets, and status): > ## Create a new topic to take the place of the existing one > ## For every message in the existing topic: > ### Deserialize the message's key and value using the Connect cluster's old > internal key and value converters > ### Serialize the message's key and value using the [JSON > converter|https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java] > with schemas disabled (by setting the {{schemas.enable}} property to > {{false}}) > ### Write a message with the new key and value to the new internal topic > # Reconfigure each Connect worker to use the newly-created internal topics > from step 2 > # Start all workers on the cluster -- This message was sent by Atlassian Jira (v8.3.4#803005)
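To make step 2 of the migration path concrete, here is a small sketch of re-serializing a schemaless value with the JSON converter configured with {{schemas.enable=false}}; the topic name and payload are placeholders, and a real migration would also need to convert record keys and preserve ordering.
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.json.JsonConverter;

// Sketch of the re-serialization step: configure a JsonConverter with schemas disabled
// and produce the bytes that would be written to the new internal topic. The topic name
// and payload are placeholders, not real internal-topic contents.
public class ReserializeWithJsonConverter {

    public static void main(final String[] args) {
        final JsonConverter converter = new JsonConverter();

        final Map<String, Object> converterConfig = new HashMap<>();
        converterConfig.put("schemas.enable", "false"); // the property named in the migration path
        converter.configure(converterConfig, false);    // false = configure as a value converter

        // Pretend this map is what the old internal converter produced after deserialization:
        final Map<String, Object> value = Collections.singletonMap("connector.class", "ExampleConnector");

        // Schema is null because schemas are disabled; the result is plain JSON without an envelope.
        final byte[] serialized = converter.fromConnectData("connect-configs-new", null, value);
        System.out.println(new String(serialized, StandardCharsets.UTF_8));
    }
}
{code}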
[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339825#comment-17339825 ] A. Sophie Blee-Goldman commented on KAFKA-9295: --- [~showuon] are you interested in submitting another PR to increase the timeout of startApplicationAndWaitUntilRunning? If not, or if you can't get to it just yet, you can unassign yourself from the ticket in case someone else is able to pick this up before you have time :) > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
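For reference, a hedged sketch of what raising that timeout could look like in the test code; the 120-second value is only an example and not an agreed-upon number.
{code:java}
import java.time.Duration;
import java.util.List;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

// Illustrative only: start all KafkaStreams instances under test and allow more time
// for them to reach RUNNING before the assertion fails.
final class StartWithLongerTimeout {

    static void start(final List<KafkaStreams> streamsList) throws Exception {
        // 120 seconds is a placeholder; the point is simply to give slow CI machines more
        // headroom than the current timeout before declaring the test failed.
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsList, Duration.ofSeconds(120));
    }
}
{code}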
[GitHub] [kafka] chia7712 commented on pull request #10632: MINOR: fix streams_broker_compatibility_test.py
chia7712 commented on pull request #10632: URL: https://github.com/apache/kafka/pull/10632#issuecomment-832903232
> I'm just going to go ahead and merge this so we can get the system tests fixed ASAP. Hope you don't mind
thanks for merging this patch :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #10632: MINOR: fix streams_broker_compatibility_test.py
ableegoldman merged pull request #10632: URL: https://github.com/apache/kafka/pull/10632 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10632: MINOR: fix streams_broker_compatibility_test.py
ableegoldman commented on pull request #10632: URL: https://github.com/apache/kafka/pull/10632#issuecomment-832901885 @chia7712 I'm just going to go ahead and merge this so we can get the system tests fixed ASAP. Hope you don't mind, and thank you again for the PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #10238: KAFKA-10340: Backport proactively close producer when cancelling source tasks
rhauch commented on pull request #10238: URL: https://github.com/apache/kafka/pull/10238#issuecomment-832898998 Closed without merging. Instead, I cherry-picked the commit from #10016 to the `2.8` branch now that 2.8.0 is out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch closed pull request #10238: KAFKA-10340: Backport proactively close producer when cancelling source tasks
rhauch closed pull request #10238: URL: https://github.com/apache/kafka/pull/10238 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on pull request #10631: MINOR: Stop using hamcrest in system tests
lct45 commented on pull request #10631: URL: https://github.com/apache/kafka/pull/10631#issuecomment-832873282 Kicked off https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4492/. If it passes, this fix is good enough for now; if it fails, then the missing dependencies are bigger than just hamcrest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied
guozhangwang commented on pull request #10613: URL: https://github.com/apache/kafka/pull/10613#issuecomment-832870539 Merged to trunk, thanks @spena -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied
guozhangwang merged pull request #10613: URL: https://github.com/apache/kafka/pull/10613 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-12755: Assignee: Colin McCabe > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several reasons for > this. One is that it’s too big -- it would be better as several gradle > modules. Gradle is good about compiling multiple modules in parallel, but if > you have one really big module, you lose that parallelism. Another issue > with the core module is that it’s written in Scala, and the Scala compiler > takes longer than the Java one. > A lot of server-side code is in the “clients” module. From there, it ends up > on the CLASSPATH of producers, consumers, and admin clients. This has a lot > of bad effects: it bloats the size of the clients jar, and allows downstream > projects to peek at code that should be isolated to the broker. > A lot of tools can’t be put into the “tools” module because they depend on > classes that are in “core”. And tools can’t have a core dependency, because > that would impose a core dependency on connect as well. > One example of this problem is StorageTool and ClusterTool. These tools > ended up getting written in Scala and put in the “core” module, rather than > the “tools” module. > Our long-term goal is to migrate from Scala to Java, and the monolithic core > module is an obstacle to that. > *Proposed Fixes* > Rename the “metadata” module to “controller” to reflect the fact that it > contains the controller > Make the "controller" module depend on "raft" rather than the other way > around ("raft" used to depend on "metadata") This reflects the fact that the > controller consumes the API provided by the raft module. (There is a > separate PR to do this.) > Create a new “server-common” module for common code which is shared by > several server modules, but not needed for clients. > Remove the dependency between "connect" and "tools" > Create a new “server-tools“ module which depends on “core” > *The Server-Common Module* > The server-common module should contain: > * Pluggable APIs that are used only in the server (not in any client) > * The KIP-405 tiered storage APIs > * Authorizer APIs > * CreateTopicPolicy, AlterConfigPolicy, etc. > * Common Java utility code that is used in the server, but not used in the > client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r626749507
## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -661,11 +661,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 val versionId = request.header.apiVersion
 val clientId = request.header.clientId
 val fetchRequest = request.body[FetchRequest]
+val (topicIds, topicNames) =
+ if (fetchRequest.version() >= 13)
+metadataCache.topicIdInfo()
+ else
+(Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, String]())
+
 val fetchContext = fetchManager.newContext(
Review comment: I ended up deciding to end the session and throw a top level error when we have an unknown topic ID. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
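Purely as a schematic illustration of the behaviour described in the review comment (and not the actual KafkaApis or fetch-session code), resolving topic IDs and failing the whole request on an unknown ID could be shaped roughly like this:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.Uuid;

// Schematic only: resolve the topic IDs from a fetch request to names. A present result
// would be treated as "end the fetch session and return a single top-level error" rather
// than reporting a per-partition error.
final class TopicIdResolution {

    static Optional<Uuid> findUnknownTopicId(final List<Uuid> requestedTopicIds,
                                             final Map<Uuid, String> topicNames) {
        return requestedTopicIds.stream()
                .filter(id -> !topicNames.containsKey(id))
                .findFirst();
    }
}
{code}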
[GitHub] [kafka] guozhangwang commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
guozhangwang commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-832855351 I'm re-triggering the unit tests again, @cadonna lmk if you think one green run is sufficient (i.e. whether in the past a single run was likely to hit the virtual function issue). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
guozhangwang commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626735704
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ##
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws Exception {
 assertFalse(globalStore.isOpen());
 }
-@Test
-public void shouldTransitionToDeadOnClose() throws Exception {
Review comment: Thx! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12755: - Description: *Problems* The core module takes a long time to compile. There are several reasons for this. One is that it’s too big -- it would be better as several gradle modules. Gradle is good about compiling multiple modules in parallel, but if you have one really big module, you lose that parallelism. Another issue with the core module is that it’s written in Scala, and the Scala compiler takes longer than the Java one. A lot of server-side code is in the “clients” module. From there, it ends up on the CLASSPATH of producers, consumers, and admin clients. This has a lot of bad effects: it bloats the size of the clients jar, and allows downstream projects to peek at code that should be isolated to the broker. A lot of tools can’t be put into the “tools” module because they depend on classes that are in “core”. And tools can’t have a core dependency, because that would impose a core dependency on connect as well. One example of this problem is StorageTool and ClusterTool. These tools ended up getting written in Scala and put in the “core” module, rather than the “tools” module. Our long-term goal is to migrate from Scala to Java, and the monolithic core module is an obstacle to that. *Proposed Fixes* Rename the “metadata” module to “controller” to reflect the fact that it contains the controller Make the "controller" module depend on "raft" rather than the other way around ("raft" used to depend on "metadata") This reflects the fact that the controller consumes the API provided by the raft module. (There is a separate PR to do this.) Create a new “server-common” module for common code which is shared by several server modules, but not needed for clients. Remove the dependency between "connect" and "tools" Create a new “server-tools“ module which depends on “core” *The Server-Common Module* The server-common module should contain: * Pluggable APIs that are used only in the server (not in any client) * The KIP-405 tiered storage APIs * Authorizer APIs * CreateTopicPolicy, AlterConfigPolicy, etc. * Common Java utility code that is used in the server, but not used in the client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. was: *Problems* The core module takes a long time to compile. There are several reasons for this. One is that it’s too big -- it would be better as several gradle modules. Gradle is good about compiling multiple modules in parallel, but if you have one really big module, you lose that parallelism. Another issue with the core module is that it’s written in Scala, and the Scala compiler takes longer than the Java one. A lot of server-side code is in the “clients” module. From there, it ends up on the CLASSPATH of producers, consumers, and admin clients. This has a lot of bad effects: it bloats the size of the clients jar, and allows downstream projects to peek at code that should be isolated to the broker. A lot of tools can’t be put into the “tools” module because they depend on classes that are in “core”. And tools can’t have a core dependency, because that would impose a core dependency on connect as well. One example of this problem is StorageTool and ClusterTool. These tools ended up getting written in Scala and put in the “core” module, rather than the “tools” module. 
*Proposed Fixes* Rename the “metadata” module to “controller” to reflect the fact that it contains the controller Make the "controller" module depend on "raft" rather than the other way around ("raft" used to depend on "metadata") This reflects the fact that the controller consumes the API provided by the raft module. (There is a separate PR to do this.) Create a new “server-common” module for common code which is shared by several server modules, but not needed for clients. Remove the dependency between "connect" and "tools" Create a new “server-tools“ module which depends on “core” *The Server-Common Module* The server-common module should contain: * Pluggable APIs that are used only in the server (not in any client) * The KIP-405 tiered storage APIs * Authorizer APIs * CreateTopicPolicy, AlterConfigPolicy, etc. * Common Java utility code that is used in the server, but not used in the client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several reasons
[jira] [Commented] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339766#comment-17339766 ] Kowshik Prakasam commented on KAFKA-12755: -- cc [~satishd] with whom we discussed some of these today. > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several reasons for > this. One is that it’s too big -- it would be better as several gradle > modules. Gradle is good about compiling multiple modules in parallel, but if > you have one really big module, you lose that parallelism. Another issue > with the core module is that it’s written in Scala, and the Scala compiler > takes longer than the Java one. > A lot of server-side code is in the “clients” module. From there, it ends up > on the CLASSPATH of producers, consumers, and admin clients. This has a lot > of bad effects: it bloats the size of the clients jar, and allows downstream > projects to peek at code that should be isolated to the broker. > A lot of tools can’t be put into the “tools” module because they depend on > classes that are in “core”. And tools can’t have a core dependency, because > that would impose a core dependency on connect as well. > One example of this problem is StorageTool and ClusterTool. These tools > ended up getting written in Scala and put in the “core” module, rather than > the “tools” module. > *Proposed Fixes* > Rename the “metadata” module to “controller” to reflect the fact that it > contains the controller > Make the "controller" module depend on "raft" rather than the other way > around ("raft" used to depend on "metadata") This reflects the fact that the > controller consumes the API provided by the raft module. (There is a > separate PR to do this.) > Create a new “server-common” module for common code which is shared by > several server modules, but not needed for clients. > Remove the dependency between "connect" and "tools" > Create a new “server-tools“ module which depends on “core” > *The Server-Common Module* > The server-common module should contain: > * Pluggable APIs that are used only in the server (not in any client) > * The KIP-405 tiered storage APIs > * Authorizer APIs > * CreateTopicPolicy, AlterConfigPolicy, etc. > * Common Java utility code that is used in the server, but not used in the > client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12755: - Description: *Problems* The core module takes a long time to compile. There are several reasons for this. One is that it’s too big -- it would be better as several gradle modules. Gradle is good about compiling multiple modules in parallel, but if you have one really big module, you lose that parallelism. Another issue with the core module is that it’s written in Scala, and the Scala compiler takes longer than the Java one. A lot of server-side code is in the “clients” module. From there, it ends up on the CLASSPATH of producers, consumers, and admin clients. This has a lot of bad effects: it bloats the size of the clients jar, and allows downstream projects to peek at code that should be isolated to the broker. A lot of tools can’t be put into the “tools” module because they depend on classes that are in “core”. And tools can’t have a core dependency, because that would impose a core dependency on connect as well. One example of this problem is StorageTool and ClusterTool. These tools ended up getting written in Scala and put in the “core” module, rather than the “tools” module. *Proposed Fixes* Rename the “metadata” module to “controller” to reflect the fact that it contains the controller Make the "controller" module depend on "raft" rather than the other way around ("raft" used to depend on "metadata") This reflects the fact that the controller consumes the API provided by the raft module. (There is a separate PR to do this.) Create a new “server-common” module for common code which is shared by several server modules, but not needed for clients. Remove the dependency between "connect" and "tools" Create a new “server-tools“ module which depends on “core” *The Server-Common Module* The server-common module should contain: * Pluggable APIs that are used only in the server (not in any client) * The KIP-405 tiered storage APIs * Authorizer APIs * CreateTopicPolicy, AlterConfigPolicy, etc. * Common Java utility code that is used in the server, but not used in the client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. was: *Problems* The core module takes a long time to compile. There are several reasons for this. One is that it’s too big -- it would be better as several gradle modules. Gradle is good about compiling multiple modules in parallel, but if you have one really big module, you lose that parallelism. Another issue with the core module is that it’s written in Scala, and the Scala compiler takes longer than the Java one. A lot of server-side code is in the “clients” module. From there, it ends up on the CLASSPATH of producers, consumers, and admin clients. This has a lot of bad effects: it bloats the size of the clients jar, and allows downstream projects to peek at code that should be isolated to the broker. A lot of tools can’t be put into the “tools” module because they depend on classes that are in “core”. And tools can’t have a core dependency, because that would impose a core dependency on connect as well. One example of this problem is StorageTool and ClusterTool. These tools ended up getting written in Scala and put in the “core” module, rather than the “tools” module. 
*Proposed Fixes* Rename the “metadata” module to “controller” to reflect the fact that it contains the controller Make the "controller" module depend on "raft" rather than the other way around ("raft" used to depend on "metadata") This reflects the fact that the controller consumes the API provided by the raft module. (There is a separate PR to do this.) Create a new “server-common” module for common code which is shared by several server modules, but not needed for clients. Remove the dependency between "connect" and "tools" Create a new “server-tools“ module which depends on “core” *The Server-Common Module* The server-common module should contain: * Pluggable APIs that are used only in the server (not in any client) * The KIP-405 tiered storage APIs * Authorizer APIs * CreateTopicPolicy, AlterConfigPolicy, etc. * Common Java utility code that is used in the server, but not used in the client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several reasons for > this. One is that it’s too big -- it would be better as several gradle > modules. Gradle is good
[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12755: - Description: *Problems* The core module takes a long time to compile. There are several reasons for this. One is that it’s too big -- it would be better as several gradle modules. Gradle is good about compiling multiple modules in parallel, but if you have one really big module, you lose that parallelism. Another issue with the core module is that it’s written in Scala, and the Scala compiler takes longer than the Java one. A lot of server-side code is in the “clients” module. From there, it ends up on the CLASSPATH of producers, consumers, and admin clients. This has a lot of bad effects: it bloats the size of the clients jar, and allows downstream projects to peek at code that should be isolated to the broker. A lot of tools can’t be put into the “tools” module because they depend on classes that are in “core”. And tools can’t have a core dependency, because that would impose a core dependency on connect as well. One example of this problem is StorageTool and ClusterTool. These tools ended up getting written in Scala and put in the “core” module, rather than the “tools” module. *Proposed Fixes* Rename the “metadata” module to “controller” to reflect the fact that it contains the controller Make the "controller" module depend on "raft" rather than the other way around ("raft" used to depend on "metadata") This reflects the fact that the controller consumes the API provided by the raft module. (There is a separate PR to do this.) Create a new “server-common” module for common code which is shared by several server modules, but not needed for clients. Remove the dependency between "connect" and "tools" Create a new “server-tools“ module which depends on “core” *The Server-Common Module* The server-common module should contain: * Pluggable APIs that are used only in the server (not in any client) * The KIP-405 tiered storage APIs * Authorizer APIs * CreateTopicPolicy, AlterConfigPolicy, etc. * Common Java utility code that is used in the server, but not used in the client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. was: *Problems* The core module takes a long time to compile. There are several reasons for this. One is that it’s too big -- it would be better as several gradle modules. Gradle is good about compiling multiple modules in parallel, but if you have one really big module, you lose that parallelism. Another issue with the core module is that it’s written in Scala, and the Scala compiler takes longer than the Java one. A lot of server-side code is in the “clients” module. From there, it ends up on the CLASSPATH of producers, consumers, and admin clients. This has a lot of bad effects: it bloats the size of the clients jar, and allows downstream projects to peek at code that should be isolated to the broker. A lot of tools can’t be put into the “tools” module because they depend on classes that are in “core”. And tools can’t have a core dependency, because that would impose a core dependency on connect as well. One example of this problem is StorageTool and ClusterTool. These tools ended up getting written in Scala and put in the “core” module, rather than the “tools” module. 
*Proposed Fixes* Rename the “metadata” module to “controller” to reflect the fact that it contains the controller Make the "controller" module depend on "raft" rather than the other way around ("raft" used to depend on "metadata") This reflects the fact that the controller consumes the API provided by the raft module. (There is a separate PR to do this.) Create a new “server-common” module for common code which is shared by several server modules, but not needed for clients. Remove the dependency between "connect" and "tools" Create a new “server-tools“ module which depends on “core” *The Server-Common Module* The server-common module should contain: Pluggable APIs that are used only in the server (not in any client) The KIP-405 tiered storage APIs Authorizer APIs CreateTopicPolicy, AlterConfigPolicy, etc. Common Java utility code that is used in the server, but not used in the client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several reasons for > this.
[jira] [Commented] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339765#comment-17339765 ] Colin McCabe commented on KAFKA-12755: -- out.jpg reflects the current dependencies out2.jpg reflects the proposed dependencies > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several > reasons for this. One is that it’s too big -- it would be better as several > gradle modules. Gradle is good about compiling multiple modules in parallel, > but if you have one really big module, you lose that parallelism. Another > issue with the core module is that it’s written in Scala, and the Scala > compiler takes longer than the Java one. > A lot of server-side code is in the “clients” module. From there, it > ends up on the CLASSPATH of producers, consumers, and admin clients. This > has a lot of bad effects: it bloats the size of the clients jar, and allows > downstream projects to peek at code that should be isolated to the broker. > A lot of tools can’t be put into the “tools” module because they depend > on classes that are in “core”. And tools can’t have a core dependency, > because that would impose a core dependency on connect as well. > One example of this problem is StorageTool and ClusterTool. These > tools ended up getting written in Scala and put in the “core” module, rather > than the “tools” module. > *Proposed Fixes* > Rename the “metadata” module to “controller” to reflect the fact that > it contains the controller > Make the "controller" module depend on "raft" rather than the other way > around ("raft" used to depend on "metadata") This reflects the fact that the > controller consumes the API provided by the raft module. (There is a > separate PR to do this.) > Create a new “server-common” module for common code which is shared by > several server modules, but not needed for clients. > Remove the dependency between "connect" and "tools" > Create a new “server-tools“ module which depends on “core” > *The Server-Common Module* > The server-common module should contain: > Pluggable APIs that are used only in the server (not in any client) > The KIP-405 tiered storage APIs > Authorizer APIs > CreateTopicPolicy, AlterConfigPolicy, etc. > Common Java utility code that is used in the server, but not used in > the client, such as > ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12755: - Attachment: out.jpg > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several > reasons for this. One is that it’s too big -- it would be better as several > gradle modules. Gradle is good about compiling multiple modules in parallel, > but if you have one really big module, you lose that parallelism. Another > issue with the core module is that it’s written in Scala, and the Scala > compiler takes longer than the Java one. > A lot of server-side code is in the “clients” module. From there, it > ends up on the CLASSPATH of producers, consumers, and admin clients. This > has a lot of bad effects: it bloats the size of the clients jar, and allows > downstream projects to peek at code that should be isolated to the broker. > A lot of tools can’t be put into the “tools” module because they depend > on classes that are in “core”. And tools can’t have a core dependency, > because that would impose a core dependency on connect as well. > One example of this problem is StorageTool and ClusterTool. These > tools ended up getting written in Scala and put in the “core” module, rather > than the “tools” module. > *Proposed Fixes* > Rename the “metadata” module to “controller” to reflect the fact that > it contains the controller > Make the "controller" module depend on "raft" rather than the other way > around ("raft" used to depend on "metadata") This reflects the fact that the > controller consumes the API provided by the raft module. (There is a > separate PR to do this.) > Create a new “server-common” module for common code which is shared by > several server modules, but not needed for clients. > Remove the dependency between "connect" and "tools" > Create a new “server-tools“ module which depends on “core” > *The Server-Common Module* > The server-common module should contain: > Pluggable APIs that are used only in the server (not in any client) > The KIP-405 tiered storage APIs > Authorizer APIs > CreateTopicPolicy, AlterConfigPolicy, etc. > Common Java utility code that is used in the server, but not used in > the client, such as > ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12755: - Attachment: out2.jpg > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Attachments: out.jpg, out2.jpg > > > *Problems* > The core module takes a long time to compile. There are several > reasons for this. One is that it’s too big -- it would be better as several > gradle modules. Gradle is good about compiling multiple modules in parallel, > but if you have one really big module, you lose that parallelism. Another > issue with the core module is that it’s written in Scala, and the Scala > compiler takes longer than the Java one. > A lot of server-side code is in the “clients” module. From there, it > ends up on the CLASSPATH of producers, consumers, and admin clients. This > has a lot of bad effects: it bloats the size of the clients jar, and allows > downstream projects to peek at code that should be isolated to the broker. > A lot of tools can’t be put into the “tools” module because they depend > on classes that are in “core”. And tools can’t have a core dependency, > because that would impose a core dependency on connect as well. > One example of this problem is StorageTool and ClusterTool. These > tools ended up getting written in Scala and put in the “core” module, rather > than the “tools” module. > *Proposed Fixes* > Rename the “metadata” module to “controller” to reflect the fact that > it contains the controller > Make the "controller" module depend on "raft" rather than the other way > around ("raft" used to depend on "metadata") This reflects the fact that the > controller consumes the API provided by the raft module. (There is a > separate PR to do this.) > Create a new “server-common” module for common code which is shared by > several server modules, but not needed for clients. > Remove the dependency between "connect" and "tools" > Create a new “server-tools“ module which depends on “core” > *The Server-Common Module* > The server-common module should contain: > Pluggable APIs that are used only in the server (not in any client) > The KIP-405 tiered storage APIs > Authorizer APIs > CreateTopicPolicy, AlterConfigPolicy, etc. > Common Java utility code that is used in the server, but not used in > the client, such as > ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules
[ https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12755: - Description: *Problems* The core module takes a long time to compile. There are several reasons for this. One is that it’s too big -- it would be better as several gradle modules. Gradle is good about compiling multiple modules in parallel, but if you have one really big module, you lose that parallelism. Another issue with the core module is that it’s written in Scala, and the Scala compiler takes longer than the Java one. A lot of server-side code is in the “clients” module. From there, it ends up on the CLASSPATH of producers, consumers, and admin clients. This has a lot of bad effects: it bloats the size of the clients jar, and allows downstream projects to peek at code that should be isolated to the broker. A lot of tools can’t be put into the “tools” module because they depend on classes that are in “core”. And tools can’t have a core dependency, because that would impose a core dependency on connect as well. One example of this problem is StorageTool and ClusterTool. These tools ended up getting written in Scala and put in the “core” module, rather than the “tools” module. *Proposed Fixes* Rename the “metadata” module to “controller” to reflect the fact that it contains the controller Make the "controller" module depend on "raft" rather than the other way around ("raft" used to depend on "metadata") This reflects the fact that the controller consumes the API provided by the raft module. (There is a separate PR to do this.) Create a new “server-common” module for common code which is shared by several server modules, but not needed for clients. Remove the dependency between "connect" and "tools" Create a new “server-tools“ module which depends on “core” *The Server-Common Module* The server-common module should contain: Pluggable APIs that are used only in the server (not in any client) The KIP-405 tiered storage APIs Authorizer APIs CreateTopicPolicy, AlterConfigPolicy, etc. Common Java utility code that is used in the server, but not used in the client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. > Add server-common, server-tools gradle modules > -- > > Key: KAFKA-12755 > URL: https://issues.apache.org/jira/browse/KAFKA-12755 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > > *Problems* > The core module takes a long time to compile. There are several > reasons for this. One is that it’s too big -- it would be better as several > gradle modules. Gradle is good about compiling multiple modules in parallel, > but if you have one really big module, you lose that parallelism. Another > issue with the core module is that it’s written in Scala, and the Scala > compiler takes longer than the Java one. > A lot of server-side code is in the “clients” module. From there, it > ends up on the CLASSPATH of producers, consumers, and admin clients. This > has a lot of bad effects: it bloats the size of the clients jar, and allows > downstream projects to peek at code that should be isolated to the broker. > A lot of tools can’t be put into the “tools” module because they depend > on classes that are in “core”. And tools can’t have a core dependency, > because that would impose a core dependency on connect as well. > One example of this problem is StorageTool and ClusterTool. These > tools ended up getting written in Scala and put in the “core” module, rather > than the “tools” module. 
> *Proposed Fixes* > Rename the “metadata” module to “controller” to reflect the fact that > it contains the controller > Make the "controller" module depend on "raft" rather than the other way > around ("raft" used to depend on "metadata") This reflects the fact that the > controller consumes the API provided by the raft module. (There is a > separate PR to do this.) > Create a new “server-common” module for common code which is shared by > several server modules, but not needed for clients. > Remove the dependency between "connect" and "tools" > Create a new “server-tools“ module which depends on “core” > *The Server-Common Module* > The server-common module should contain: > Pluggable APIs that are used only in the server (not in any client) > The KIP-405 tiered storage APIs > Authorizer APIs > CreateTopicPolicy, AlterConfigPolicy, etc. > Common Java utility code that is used in the server, but not used in > the client, such as > ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc. -- This
[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
cadonna commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-832838475 The builds failed due to known flaky tests, but not due to SIGABRT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
wcarlson5 commented on pull request #10634: URL: https://github.com/apache/kafka/pull/10634#issuecomment-832838663 @rodesai @abbccdda @ableegoldman I had to make a couple of changes to the task metadata to improve when the end offset is updated. Now we capture it during the poll phase, which should give us the highest offset Streams has seen at any point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
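For context, a rough sketch of how the end offsets exposed through TaskMetadata might be read from a running application; the accessor names assume the KIP-715-style API this work refines and may differ from the final released signatures.
{code:java}
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

// Illustrative only: walk the thread/task metadata of a running KafkaStreams instance and
// print the end offsets tracked per input partition. Accessor names are assumptions based
// on the API under discussion and may not match the released signatures.
final class PrintTaskEndOffsets {

    static void print(final KafkaStreams streams) {
        for (final ThreadMetadata thread : streams.localThreadsMetadata()) {
            for (final TaskMetadata task : thread.activeTasks()) {
                final Map<TopicPartition, Long> endOffsets = task.endOffsets();
                endOffsets.forEach((partition, offset) ->
                        System.out.println(task.taskId() + " " + partition + " end offset: " + offset));
            }
        }
    }
}
{code}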
[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339760#comment-17339760 ] Bruno Cadonna commented on KAFKA-9295: -- Failed multiple times for a PR (https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10568/5/testReport/) {code:java} java.lang.AssertionError: Application did not reach a RUNNING state for all streams instances. Non-running instances: {org.apache.kafka.streams.KafkaStreams@9cbb96e=REBALANCING, org.apache.kafka.streams.KafkaStreams@2e11b5=REBALANCING} at org.junit.Assert.fail(Assert.java:89) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:904) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:197) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185) {code} > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12755) Add server-common, server-tools gradle modules
Colin McCabe created KAFKA-12755: Summary: Add server-common, server-tools gradle modules Key: KAFKA-12755 URL: https://issues.apache.org/jira/browse/KAFKA-12755 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 opened a new pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
wcarlson5 opened a new pull request #10634: URL: https://github.com/apache/kafka/pull/10634 Improve endOffsets for TaskMetadata; also add an integration test for the TaskMetadata offset collections. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #10630: MINOR: Stop logging raw record contents above TRACE level in WorkerSourceTask
C0urante commented on pull request #10630: URL: https://github.com/apache/kafka/pull/10630#issuecomment-832831207 Thanks @tombentley--good point, I agree that the record content is probably not necessary there and think it's safest to remove the record content from the message. I've altered it to reference only the topic and partition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10154) Issue in updating metadata if not exists during sending message to different topics
[ https://issues.apache.org/jira/browse/KAFKA-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339746#comment-17339746 ] Oleksandr Tomchakov commented on KAFKA-10154: - Hi, I've created test-case: [https://github.com/apache/kafka/compare/trunk...altomch:bug/KAFKA-10154-reproducer] deadlock is here: !Screenshot 2021-05-05 at 19.07.26.png|width=508,height=355! > Issue in updating metadata if not exists during sending message to different > topics > --- > > Key: KAFKA-10154 > URL: https://issues.apache.org/jira/browse/KAFKA-10154 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.5.0 >Reporter: Dipti Gupta >Priority: Major > Attachments: Screenshot 2021-05-05 at 19.07.26.png > > > Project with following behaviour at : > [https://github.com/DiptiGupta/kafka-producer-issue] > > I took reference to this fixed issue > https://issues.apache.org/jira/browse/KAFKA-8623 > But on latest version, > I'm getting following exception during sending messages to different topics > i.e. Topic1 and Topic2. > It's causing exception for once when metadata for *`Topic2`* doesn't exist. > > {code:java} > org.springframework.kafka.KafkaException: Send failed; nested exception is > org.apache.kafka.common.errors.TimeoutException: Topic Topic2 not present in > metadata after 1 ms.org.springframework.kafka.KafkaException: Send > failed; nested exception is org.apache.kafka.common.errors.TimeoutException: > Topic Topic2 not present in metadata after 1 ms. at > org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) > ~[spring-kafka-2.5.1.RELEASE.jar:2.5.1.RELEASE]{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10154) Issue in updating metadata if not exists during sending message to different topics
[ https://issues.apache.org/jira/browse/KAFKA-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Tomchakov updated KAFKA-10154: Attachment: Screenshot 2021-05-05 at 19.07.26.png > Issue in updating metadata if not exists during sending message to different > topics > --- > > Key: KAFKA-10154 > URL: https://issues.apache.org/jira/browse/KAFKA-10154 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.5.0 >Reporter: Dipti Gupta >Priority: Major > Attachments: Screenshot 2021-05-05 at 19.07.26.png > > > Project with following behaviour at : > [https://github.com/DiptiGupta/kafka-producer-issue] > > I took reference to this fixed issue > https://issues.apache.org/jira/browse/KAFKA-8623 > But on latest version, > I'm getting following exception during sending messages to different topics > i.e. Topic1 and Topic2. > It's causing exception for once when metadata for *`Topic2`* doesn't exist. > > {code:java} > org.springframework.kafka.KafkaException: Send failed; nested exception is > org.apache.kafka.common.errors.TimeoutException: Topic Topic2 not present in > metadata after 1 ms.org.springframework.kafka.KafkaException: Send > failed; nested exception is org.apache.kafka.common.errors.TimeoutException: > Topic Topic2 not present in metadata after 1 ms. at > org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) > ~[spring-kafka-2.5.1.RELEASE.jar:2.5.1.RELEASE]{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
Walker Carlson created KAFKA-12754: -- Summary: TaskMetadata endOffsets does not update when the offsets are read Key: KAFKA-12754 URL: https://issues.apache.org/jira/browse/KAFKA-12754 Project: Kafka Issue Type: Bug Components: streams Reporter: Walker Carlson Assignee: Walker Carlson The high water mark in StreamTask is not updated optimally. Also, it would be good to give the metadata offsets an initial value of -1 instead of an empty map, so that the set of TopicPartitions won't change. -- This message was sent by Atlassian Jira (v8.3.4#803005)
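A minimal sketch of the initialization idea described above, under the assumption that the offsets live in a plain Map keyed by TopicPartition; the class and method names are illustrative, not the actual Streams code.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

class TaskOffsetsSketch {
    // Hypothetical helper: seed every assigned partition with -1 so the set
    // of TopicPartitions exposed via TaskMetadata never changes; only the
    // values change once real offsets are read.
    static Map<TopicPartition, Long> initialEndOffsets(final Set<TopicPartition> partitions) {
        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        for (final TopicPartition partition : partitions) {
            endOffsets.put(partition, -1L); // -1 means "not yet read"
        }
        return endOffsets;
    }
}
{code}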
[jira] [Created] (KAFKA-12753) Add configuration to prevent MM2 from automatically creating topics on target cluster
Dave Beech created KAFKA-12753: -- Summary: Add configuration to prevent MM2 from automatically creating topics on target cluster Key: KAFKA-12753 URL: https://issues.apache.org/jira/browse/KAFKA-12753 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Dave Beech In a scenario where Kafka topics and configuration are usually created and managed by an infrastructure-as-code approach (Ansible, Terraform, etc.), it might be desirable to prevent MM2 from creating or syncing any resources by itself. We already provide config options to disable refreshing/syncing of topics, but currently have no control over their initial creation. Adding new config parameters for this would fill the gap. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12730) A single Kerberos login failure fails all future connections from Java 9 onwards
[ https://issues.apache.org/jira/browse/KAFKA-12730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-12730: --- Fix Version/s: 2.8.1 2.7.2 2.6.3 2.5.2 > A single Kerberos login failure fails all future connections from Java 9 > onwards > > > Key: KAFKA-12730 > URL: https://issues.apache.org/jira/browse/KAFKA-12730 > Project: Kafka > Issue Type: Bug > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.0, 2.5.2, 2.6.3, 2.7.2, 2.8.1 > > > The refresh thread for Kerberos performs re-login by logging out and then > logging in again. If login fails, we retry after a backoff. Every iteration > of the loop performs loginContext.logout() and loginContext.login(). If login > fails, we end up with two consecutive logouts. This used to work, but from > Java 9 onwards, this results in a NullPointerException due to > https://bugs.openjdk.java.net/browse/JDK-8173069. We should check if logout > is required before attempting logout. -- This message was sent by Atlassian Jira (v8.3.4#803005)
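A minimal sketch of the guard described above, assuming a plain JAAS LoginContext; the helper is illustrative only and is not the actual fix in Kafka's Kerberos refresh thread.
{code:java}
import javax.security.auth.Subject;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;

class ReloginSketch {
    // Illustrative only: log out solely when the previous login left principals
    // behind, so a failed login is not followed by a second consecutive logout
    // (which hits the JDK-8173069 NullPointerException from Java 9 onwards).
    static void relogin(final LoginContext loginContext) throws LoginException {
        final Subject subject = loginContext.getSubject();
        final boolean logoutRequired = subject != null && !subject.getPrincipals().isEmpty();
        if (logoutRequired) {
            loginContext.logout();
        }
        loginContext.login(); // may throw; the next retry re-checks before logging out
    }
}
{code}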
[jira] [Updated] (KAFKA-10727) Kafka clients throw AuthenticationException during Kerberos re-login
[ https://issues.apache.org/jira/browse/KAFKA-10727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-10727: --- Fix Version/s: 2.7.2 2.6.3 2.5.2 > Kafka clients throw AuthenticationException during Kerberos re-login > > > Key: KAFKA-10727 > URL: https://issues.apache.org/jira/browse/KAFKA-10727 > Project: Kafka > Issue Type: Bug >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.5.2, 2.8.0, 2.6.3, 2.7.2 > > > During Kerberos re-login, we log out and login again. There is a timing issue > where the principal in the Subject has been cleared, but a new one hasn't > been populated yet. We need to ensure that we don't throw > AuthenticationException in this case to avoid Kafka clients > (consumer/producer etc.) failing instead of retrying. -- This message was sent by Atlassian Jira (v8.3.4#803005)
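A sketch of the idea, under the assumption that an empty Kerberos principal set is the transient re-login state described above; the check below is illustrative and not the actual client code.
{code:java}
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;

class ReloginWindowSketch {
    // Illustrative check: while re-login is clearing and repopulating the
    // Subject, the principal set is briefly empty. Callers can treat that as
    // a retriable condition instead of raising an AuthenticationException.
    static boolean principalTemporarilyMissing(final Subject subject) {
        return subject != null && subject.getPrincipals(KerberosPrincipal.class).isEmpty();
    }
}
{code}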
[jira] [Created] (KAFKA-12752) CVE-2021-28168 upgrade jersey to 2.34 or 3.02
John Stacy created KAFKA-12752: -- Summary: CVE-2021-28168 upgrade jersey to 2.34 or 3.02 Key: KAFKA-12752 URL: https://issues.apache.org/jira/browse/KAFKA-12752 Project: Kafka Issue Type: Bug Affects Versions: 2.8.0 Reporter: John Stacy [https://nvd.nist.gov/vuln/detail/CVE-2021-28168] CVE-2021-28168 affects jersey versions <=2.33, <=3.0.1. Upgrading to 2.34 or 3.02 should resolve the issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12751) ISRs remain in in-flight state if proposed state is same as actual state
Rajini Sivaram created KAFKA-12751: -- Summary: ISRs remain in in-flight state if proposed state is same as actual state Key: KAFKA-12751 URL: https://issues.apache.org/jira/browse/KAFKA-12751 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.8.0, 2.7.0, 2.7.1 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.7.2, 2.8.1 If the proposed ISR state in an AlterIsr request is the same as the actual state, the Controller returns a successful response without performing any updates. But the broker code that processes the response leaves the ISR state in the in-flight state without committing it. This prevents further ISR updates until the next leader election. -- This message was sent by Atlassian Jira (v8.3.4#803005)
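A rough sketch of the broker-side handling described in the report; the real code is Scala inside the partition/AlterIsr handling, so the Java names below are purely illustrative. The point is that a success response for a no-op update must still clear the in-flight flag.
{code:java}
import java.util.List;

// Illustrative pseudocode only, not the actual broker implementation.
final class IsrStateSketch {
    private volatile List<Integer> committedIsr;
    private volatile boolean isrUpdateInFlight = false;

    void onAlterIsrSuccess(final List<Integer> controllerIsr) {
        // Even when the controller made no change because the proposed state
        // already matched the committed state, the in-flight flag must be
        // reset; otherwise no further ISR updates are sent until the next
        // leader election, which is the bug described above.
        committedIsr = controllerIsr;
        isrUpdateInFlight = false;
    }
}
{code}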
[GitHub] [kafka] feyman2016 edited a comment on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 edited a comment on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-832764031 @jsancio Hi, could you help to take a look? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-832764031 @jsancio Hi, could you help to take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12429) Serdes for all message types in internal topic which is used in default implementation for RLMM.
[ https://issues.apache.org/jira/browse/KAFKA-12429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-12429. - Fix Version/s: 3.0.0 Resolution: Fixed merged the PR to trunk > Serdes for all message types in internal topic which is used in default > implementation for RLMM. > > > Key: KAFKA-12429 > URL: https://issues.apache.org/jira/browse/KAFKA-12429 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Fix For: 3.0.0 > > > RLMM default implementation is based on storing all the metadata in an > internal topic. > We need serdes and format of the message types that need to be stored in the > topic. > You can see more details in the > [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao merged pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
junrao merged pull request #10271: URL: https://github.com/apache/kafka/pull/10271 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram opened a new pull request #10633: MINOR: Reset AlterIsr in-flight state for duplicate update requests
rajinisivaram opened a new pull request #10633: URL: https://github.com/apache/kafka/pull/10633 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] spena commented on pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied
spena commented on pull request #10613: URL: https://github.com/apache/kafka/pull/10613#issuecomment-832730649 Thanks @guozhangwang, that seems to be the culprit of the performance problem. In fact, when the performance tests finish, the script takes some time to complete cleaning; I assume it is taking time to release all the consumed memory. I will work on that fix in a follow-up PR, is that OK? I have addressed all the comments here, so this is ready to merge if you're OK with it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #10631: MINOR: Stop using hamcrest in system tests
lct45 commented on a change in pull request #10631: URL: https://github.com/apache/kafka/pull/10631#discussion_r626608131 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java ## @@ -806,20 +799,14 @@ public static boolean verifySync(final String broker, final Instant deadline) th } } -public static void assertThat(final AtomicBoolean pass, - final StringBuilder failures, - final String message, - final T actual, - final Matcher matcher) { -if (!matcher.matches(actual)) { +public static void assertThat(final AtomicBoolean pass, + final StringBuilder failures, + final String message, + final boolean passed) { +if (!passed) { if (failures != null) { -final Description description = new StringDescription(failures); -description.appendText("\n" + message) - .appendText("\nExpected: ") - .appendDescriptionOf(matcher) - .appendText("\n but: "); -matcher.describeMismatch(actual, description); -description.appendText("\n"); +final StringBuffer description = new StringBuffer(failures); +description.append("\n" + message); Review comment: I was following the earlier pattern of `matcher` here, but it doesn't look like we do anything with this description. I think it makes more sense to append this to the `failures` that we pass in. I'm not sure if `matcher` did that automatically or if this was just a gap? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
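A small sketch of the alternative raised in the review comment, assuming the surrounding test utility: append the failure message straight onto the `failures` builder that the caller passes in, rather than building a separate description that is never read. The `pass.set(false)` line is assumed to mirror the existing helper's behaviour, which is not shown in the quoted diff.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

final class AssertSketch {
    // Illustrative variant of the helper discussed above: the message is
    // recorded on the shared failures buffer itself, so it is not lost.
    static void assertThat(final AtomicBoolean pass,
                           final StringBuilder failures,
                           final String message,
                           final boolean passed) {
        if (!passed) {
            if (failures != null) {
                failures.append('\n').append(message);
            }
            pass.set(false); // assumed: flip the shared pass flag as the original helper does
        }
    }
}
{code}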
[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
cadonna commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-832644717 I increased the RocksDB version since I found a more recent version on maven central. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4
cadonna commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-832643004 OK, the other builds failed due to test failures, not due to SIGABRT. That is a good sign. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4
cadonna commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626511978 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ## @@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws Exception { assertFalse(globalStore.isOpen()); } -@Test -public void shouldTransitionToDeadOnClose() throws Exception { Review comment: Because test `shouldStopRunningWhenClosedByUser()` above is exactly the same test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12750) kafka.common.serialization.Serializer.serialize() method is ignoring Headers argument
Naresh created KAFKA-12750: -- Summary: kafka.common.serialization.Serializer.serialize() method is ignoring Headers argument Key: KAFKA-12750 URL: https://issues.apache.org/jira/browse/KAFKA-12750 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.7.0 Reporter: Naresh I am going through Kafka's built-in serializers and have found that there is a variant of the serialize method which accepts Headers, but the interface implementation doesn't use them. Is this intended? [https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java#L61] I do see that the other datatype-based implementations also don't use this variant. Is this method not supported by the Kafka protocol? -- This message was sent by Atlassian Jira (v8.3.4#803005)
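For context, the variant in question is a default method on the Serializer interface; as of the 2.7 sources linked above it simply ignores the headers and delegates to the headerless overload, roughly as sketched here (the interface name is changed to keep the sketch self-contained):
{code:java}
import org.apache.kafka.common.header.Headers;

public interface SerializerSketch<T> {

    byte[] serialize(String topic, T data);

    // The default implementation drops the headers and delegates to the
    // two-argument overload; serializers whose wire format depends on the
    // headers are expected to override this method.
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
}
{code}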
[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4
cadonna commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626510307 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ## @@ -383,8 +386,7 @@ public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest( } private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final String builtInMetricsVersion) { -final InternalMockProcessorContext context = createInternalMockProcessorContext(builtInMetricsVersion); -processor.init(context); +setup(builtInMetricsVersion, true); Review comment: I do not know. Will set it to `false`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4
cadonna commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626508070 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() { return dbOptions.writeBufferManager(); } +@Override +public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) { +dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes); +return this; +} + +@Override +public long maxWriteBatchGroupSizeBytes() { +return dbOptions.maxWriteBatchGroupSizeBytes(); +} + +@Override +public Options oldDefaults(final int majorVersion, final int minorVersion) { +columnFamilyOptions.oldDefaults(majorVersion, minorVersion); +return this; +} + +@Override +public Options optimizeForSmallDb(final Cache cache) { +return super.optimizeForSmallDb(cache); +} + +@Override +public AbstractCompactionFilter> compactionFilter() { +return columnFamilyOptions.compactionFilter(); +} + +@Override +public AbstractCompactionFilterFactory> compactionFilterFactory() { +return columnFamilyOptions.compactionFilterFactory(); +} + +@Override +public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) { +dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec); +return this; +} + +@Override +public int statsPersistPeriodSec() { +return dbOptions.statsPersistPeriodSec(); +} + +@Override +public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) { +dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize); +return this; +} + +@Override +public long statsHistoryBufferSize() { +return dbOptions.statsHistoryBufferSize(); +} + +@Override +public Options setStrictBytesPerSync(final boolean strictBytesPerSync) { +dbOptions.setStrictBytesPerSync(strictBytesPerSync); +return this; +} + +@Override +public boolean strictBytesPerSync() { +return dbOptions.strictBytesPerSync(); +} + +@Override +public Options setListeners(final List listeners) { +dbOptions.setListeners(listeners); +return this; +} + +@Override +public List listeners() { +return dbOptions.listeners(); +} + +@Override +public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) { +dbOptions.setEnablePipelinedWrite(enablePipelinedWrite); +return this; +} + +@Override +public boolean enablePipelinedWrite() { +return dbOptions.enablePipelinedWrite(); +} + +@Override +public Options setUnorderedWrite(final boolean unorderedWrite) { +dbOptions.setUnorderedWrite(unorderedWrite); +return this; +} + +@Override +public boolean unorderedWrite() { +return dbOptions.unorderedWrite(); +} + +@Override +public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) { + dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen); +return this; +} + +@Override +public boolean skipCheckingSstFileSizesOnDbOpen() { +return dbOptions.skipCheckingSstFileSizesOnDbOpen(); +} + +@Override +public Options setWalFilter(final AbstractWalFilter walFilter) { +dbOptions.setWalFilter(walFilter); +return this; +} + +@Override +public WalFilter walFilter() { +return dbOptions.walFilter(); +} + +@Override +public Options setAllowIngestBehind(final boolean allowIngestBehind) { +dbOptions.setAllowIngestBehind(allowIngestBehind); +return this; +} + +@Override +public boolean allowIngestBehind() { +return dbOptions.allowIngestBehind(); +} + +@Override +public Options setPreserveDeletes(final boolean 
preserveDeletes) { +dbOptions.setPreserveDeletes(preserveDeletes); +return this; +} + +@Override +public boolean preserveDeletes() { +return dbOptions.preserveDeletes(); +} + +@Override +public Options setTwoWriteQueues(final boolean twoWriteQueues) { +dbOptions.setTwoWriteQueues(twoWriteQueues); +return this; +} + +@Override +public boolean twoWriteQueues() { +return dbOptions.twoWriteQueues(); +} + +@Override +public Options setManualWalFlush(final boolean manualWalFlush) { +dbOptions.setManualWalFlush(manualWalFlush); +return this; +} + +@Override +public boolean manualWalFlush() { +return dbOptions.manualWalFlush(); +} + +@Override +public Options setCfPaths(final
[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4
cadonna commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626506480 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -99,35 +104,7 @@ public Env getEnv() { @Override public Options prepareForBulkLoad() { -/* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ - * - * Q: What's the fastest way to load data into RocksDB? - * - * A: A fast way to direct insert data to the DB: - * - * 1. using single writer thread and insert in sorted order - * 2. batch hundreds of keys into one write batch - * 3. use vector memtable - * 4. make sure options.max_background_flushes is at least 4 - * 5. before inserting the data, - * disable automatic compaction, - * set options.level0_file_num_compaction_trigger, - * options.level0_slowdown_writes_trigger - * and options.level0_stop_writes_trigger to very large. - * After inserting all the data, issue a manual compaction. - * - * 3-5 will be automatically done if you call Options::PrepareForBulkLoad() to your option - */ -// (1) not in our control -// (2) is done via bulk-loading API -// (3) skipping because, not done in actual PrepareForBulkLoad() code in https://github.com/facebook/rocksdb/blob/master/options/options.cc -//columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig()); -// (4-5) below: -dbOptions.setMaxBackgroundFlushes(4); -columnFamilyOptions.setDisableAutoCompactions(true); -columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30); -columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30); -columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30); Review comment: TBH, I do not understand why we set the options here instead of simply calling super.prepareForBulkLoad(). Also in the version that we currently use (5.18.4) the options are set like here. What value would it have to leave a comment with the RocksDB version? I guess, we will not downgrade RocksDB less than the version we currently use, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4
cadonna commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626501582 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() { return dbOptions.writeBufferManager(); } +@Override +public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) { +dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes); +return this; +} + +@Override +public long maxWriteBatchGroupSizeBytes() { +return dbOptions.maxWriteBatchGroupSizeBytes(); +} + +@Override +public Options oldDefaults(final int majorVersion, final int minorVersion) { +columnFamilyOptions.oldDefaults(majorVersion, minorVersion); +return this; +} + +@Override +public Options optimizeForSmallDb(final Cache cache) { +return super.optimizeForSmallDb(cache); +} + +@Override +public AbstractCompactionFilter> compactionFilter() { +return columnFamilyOptions.compactionFilter(); +} + +@Override +public AbstractCompactionFilterFactory> compactionFilterFactory() { +return columnFamilyOptions.compactionFilterFactory(); +} + +@Override +public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) { +dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec); +return this; +} + +@Override +public int statsPersistPeriodSec() { +return dbOptions.statsPersistPeriodSec(); +} + +@Override +public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) { +dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize); +return this; +} + +@Override +public long statsHistoryBufferSize() { +return dbOptions.statsHistoryBufferSize(); +} + +@Override +public Options setStrictBytesPerSync(final boolean strictBytesPerSync) { +dbOptions.setStrictBytesPerSync(strictBytesPerSync); +return this; +} + +@Override +public boolean strictBytesPerSync() { +return dbOptions.strictBytesPerSync(); +} + +@Override +public Options setListeners(final List listeners) { +dbOptions.setListeners(listeners); +return this; +} + +@Override +public List listeners() { +return dbOptions.listeners(); +} + +@Override +public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) { +dbOptions.setEnablePipelinedWrite(enablePipelinedWrite); +return this; +} + +@Override +public boolean enablePipelinedWrite() { +return dbOptions.enablePipelinedWrite(); +} + +@Override +public Options setUnorderedWrite(final boolean unorderedWrite) { +dbOptions.setUnorderedWrite(unorderedWrite); +return this; +} + +@Override +public boolean unorderedWrite() { +return dbOptions.unorderedWrite(); +} + +@Override +public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) { + dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen); +return this; +} + +@Override +public boolean skipCheckingSstFileSizesOnDbOpen() { +return dbOptions.skipCheckingSstFileSizesOnDbOpen(); +} + +@Override +public Options setWalFilter(final AbstractWalFilter walFilter) { +dbOptions.setWalFilter(walFilter); Review comment: I am not sure since users could theoretically activate the WAL in the config setter. We do not overwrite user settings as far as I can see. That is consistent with the rest of Streams where the users can overwrite parameters set by the DSL. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org