[GitHub] [kafka] satishd closed pull request #10578: MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to clients module.

2021-05-05 Thread GitBox


satishd closed pull request #10578:
URL: https://github.com/apache/kafka/pull/10578


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-05 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-12671:
---
Priority: Blocker  (was: Critical)

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Blocker
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] wrote EndTxn v2; err: 
> [DEBUG] read EndTxn v2; err: 
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, 
> id: 1, successful_reads: 1, err: context canceled
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 
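For reference, here is a minimal client-side sketch (not taken from the report) of the transactional flow the issue describes, using the standard Java producer API; the bootstrap address, transactional id, and topic name are placeholders. initTransactions maps to InitProducerId, the first send in a transaction triggers AddPartitionsToTxn, each send issues a Produce request, and commitTransaction/abortTransaction issue EndTxn; the race above arises when a Produce issued before EndTxn is handled by the broker after it.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");    // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // InitProducerId: obtain producer id + epoch
            producer.beginTransaction();
            // The first send to a partition in this transaction triggers AddPartitionsToTxn,
            // then each send issues a Produce request.
            producer.send(new ProducerRecord<>("example-topic", "key", "value-1"));
            producer.send(new ProducerRecord<>("example-topic", "key", "value-2"));
            // EndTxn (commit); abortTransaction() would issue EndTxn (abort) instead.
            producer.commitTransaction();
        }
    }
}
{code}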

[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-05 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-12671:
---
Labels: Transactions  (was: )

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Critical
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] wrote EndTxn v2; err: 
> [DEBUG] read EndTxn v2; err: 
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, 
> id: 1, successful_reads: 1, err: context canceled
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 

[GitHub] [kafka] ableegoldman commented on pull request #10635: KAFKA-9295: increase start stream timeout

2021-05-05 Thread GitBox


ableegoldman commented on pull request #10635:
URL: https://github.com/apache/kafka/pull/10635#issuecomment-833225272


   Merged to trunk -- let's hope these good results continue. I'm going to 
close the ticket again so people are more likely to report it if they see 
things continue to break.
   
   Merged about 9:58pm PST on May 5th, so please disregard any test failures on 
builds kicked off prior to that time (and raise any new failures by reopening 
the ticket)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10635: KAFKA-9295: increase start stream timeout

2021-05-05 Thread GitBox


ableegoldman merged pull request #10635:
URL: https://github.com/apache/kafka/pull/10635


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-05 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339995#comment-17339995
 ] 

Travis Bischel commented on KAFKA-12671:


The only fix for this, if it occurs, is to restart a broker.

This logic race condition is present in all versions from 0.11 onward.

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Critical
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] wrote EndTxn v2; err: 
> [DEBUG] read EndTxn v2; err: 
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, 
> id: 1, successful_reads: 1, err: context 
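To illustrate the consumer-side effect described in the issue (consumption blocking in READ_COMMITTED mode), here is a minimal sketch, not from the reporter, with placeholder group, topic, and bootstrap address. A read_committed consumer only reads up to the partition's LastStableOffset, so if the LSO is stuck behind an orphaned transaction the poll loop stops making progress even though committed data exists past it.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");     // only read up to the LSO
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));     // placeholder topic
            while (true) {
                // If the partition's LastStableOffset is stuck behind an orphaned transaction,
                // poll() returns nothing beyond the LSO even though committed records exist.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
{code}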

[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-05 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-12671:
---
Affects Version/s: (was: 2.6.2)
   (was: 2.6.1)
   (was: 2.5.1)
   (was: 2.3.1)
   (was: 2.2.2)

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Critical
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] wrote EndTxn v2; err: 
> [DEBUG] read EndTxn v2; err: 
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, 
> id: 1, 

[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-05 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-12671:
---
Affects Version/s: 2.2.2
   2.4.0
   2.3.1
   2.5.0
   2.6.0
   2.5.1
   2.7.0
   2.6.1
   2.8.0
   2.6.2

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 
> 2.8.0, 2.6.2
>Reporter: Travis Bischel
>Priority: Critical
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] wrote EndTxn v2; err: 

[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-833207399


   Thank you very much, @ableegoldman ! Let's make it better together! :)
   I'll address the rest of the comments in another PR. And I'll also refine my 
other PR (https://github.com/apache/kafka/pull/10552) for the general assign 
later. Will let you know. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10610: MINOR: replace deprecated Class.newInstance() to new one

2021-05-05 Thread GitBox


showuon commented on pull request #10610:
URL: https://github.com/apache/kafka/pull/10610#issuecomment-833206456


   @rhauch , could you help review this simple PR? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10635: KAFKA-9295: increase start stream timeout

2021-05-05 Thread GitBox


showuon commented on pull request #10635:
URL: https://github.com/apache/kafka/pull/10635#issuecomment-833204958


   @ableegoldman , the failed tests are all flaky (3 `RaftClusterTest`, and 1 
`testMetricsDuringTopicCreateDelete`, tracked in KAFKA-9009). And most 
importantly, no failed `shouldInnerJoinMultiPartitionQueryable` test!
   
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 11 and Scala 2.13 / 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete

2021-05-05 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339976#comment-17339976
 ] 

Luke Chen commented on KAFKA-9009:
--

Failed again.

[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10635/1/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/Build___JDK_11_and_Scala_2_13___testMetricsDuringTopicCreateDelete__/]

 
{code:java}
java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 
0: 1
at scala.Predef$.assert(Predef.scala:280)
at 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:121)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
{code}

> Flaky Test 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
> --
>
> Key: KAFKA-9009
> URL: https://issues.apache.org/jira/browse/KAFKA-9009
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: flaky-test
>
> Failure seen in 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25644/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/]
>  
> {noformat}
> Error Message: java.lang.AssertionError: assertion failed: 
> UnderReplicatedPartitionCount not 0: 1
> Stacktrace: java.lang.AssertionError: assertion failed: 
> UnderReplicatedPartitionCount not 0: 1
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:123)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   

[GitHub] [kafka] dengziming commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure

2021-05-05 Thread GitBox


dengziming commented on pull request #9577:
URL: https://github.com/apache/kafka/pull/9577#issuecomment-833187678


   ping @mumrah to have a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-05-05 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r627027404



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -314,22 +321,24 @@ class SessionErrorContext(val error: Errors,
   override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {}
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-FetchResponse.sizeOf(versionId, (new 
FetchSession.RESP_MAP).entrySet.iterator)
+FetchResponse.sizeOf(versionId, (new 
FetchSession.RESP_MAP).entrySet.iterator, Collections.emptyMap())

Review comment:
   ah good catch on this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-05-05 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r627027261



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -319,12 +355,25 @@ public int maxBytes() {
 return data.maxBytes();
 }
 
-public Map fetchData() {
-return fetchData;
+// For versions 13+, throws UnknownTopicIdException if the topic ID was 
unknown to the server.
+public Map fetchData(Map 
topicNames) throws UnknownTopicIdException {
+if (version() < 13)
+return fetchData;
+return toPartitionDataMap(data.topics(), topicNames);
 }
 
-public List toForget() {
-return toForget;
+// For versions 13+, throws UnknownTopicIdException if the topic ID was 
unknown to the server.
+public List forgottenTopics(Map topicNames) throws UnknownTopicIdException {
+if (version() >= 13) {
+data.forgottenTopicsData().forEach(forgottenTopic -> {
+String name = topicNames.get(forgottenTopic.topicId());
+if (name == null) {
+throw new UnknownTopicIdException(String.format("Topic Id 
%s in FetchRequest was unknown to the server", forgottenTopic.topicId()));
+}
+
forgottenTopic.setTopic(topicNames.getOrDefault(forgottenTopic.topicId(), ""));

Review comment:
   I originally did this when dealing with unresolved partitions. I was 
wondering if it would be better to not create a second data structure. If 
creating another structure (as done before) is not a problem, we can go back to 
that.
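For illustration, a minimal standalone sketch of the topic-ID-to-name resolution pattern the FetchRequest diff above follows for versions 13+; the helper name and sample values are assumptions, not code from the PR.

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownTopicIdException;

public class TopicIdResolutionSketch {
    // Resolve a topic ID against the server's id -> name mapping, failing the
    // lookup when the ID is not known, as the diff above does for fetch data
    // and forgotten topics.
    static String resolveTopicName(Uuid topicId, Map<Uuid, String> topicNames) {
        String name = topicNames.get(topicId);
        if (name == null) {
            throw new UnknownTopicIdException(
                String.format("Topic Id %s in FetchRequest was unknown to the server", topicId));
        }
        return name;
    }

    public static void main(String[] args) {
        Map<Uuid, String> topicNames = new HashMap<>();
        Uuid knownId = Uuid.randomUuid();
        topicNames.put(knownId, "example-topic");
        System.out.println(resolveTopicName(knownId, topicNames));  // prints example-topic
        // resolveTopicName(Uuid.randomUuid(), topicNames) would throw UnknownTopicIdException.
    }
}
{code}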




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-05-05 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r627026623



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -820,20 +838,30 @@ class KafkaApis(val requestChannel: RequestChannel,
   def createResponse(throttleTimeMs: Int): FetchResponse = {
 // Down-convert messages for each partition if required
 val convertedData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-unconvertedFetchResponse.responseData.forEach { (tp, 
unconvertedPartitionData) =>
-  val error = Errors.forCode(unconvertedPartitionData.errorCode)
-  if (error != Errors.NONE)
-debug(s"Fetch request with correlation id 
${request.header.correlationId} from client $clientId " +
-  s"on partition $tp failed due to ${error.exceptionName}")
-  convertedData.put(tp, maybeConvertFetchedData(tp, 
unconvertedPartitionData))
+unconvertedFetchResponse.data().responses().forEach { topicResponse =>
+  if (topicResponse.topic() != "") {

Review comment:
   Realized this was no longer the case and removed in the most recent 
commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman merged pull request #10509:
URL: https://github.com/apache/kafka/pull/10509


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627021459



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627020968



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] shayelkin opened a new pull request #10636: MINOR: Bump Jersey deps to 2.34 due to CVE-2021-28168

2021-05-05 Thread GitBox


shayelkin opened a new pull request #10636:
URL: https://github.com/apache/kafka/pull/10636


   The version of the Eclipse Jersey library brought in as a dependency,
   2.31, has a known vulnerability, CVE-2021-28168 
(https://github.com/advisories/GHSA-c43q-5hpj-4crv).
   
   This replaces it with 2.34, which is fully compatible with
   2.31 apart from bug and vulnerability fixes.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627020382



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627019548



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-05-05 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r625422142



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time,
 val topicPart = element.getKey
 val respData = element.getValue
 val cachedPart = session.partitionMap.find(new 
CachedPartition(topicPart))
-val mustRespond = cachedPart.maybeUpdateResponseData(respData, 
updateFetchContextAndRemoveUnselected)
-if (mustRespond) {
+
+// If we have an situation where there is a valid ID on the partition, 
but it does not match

Review comment:
   an situation => a situation

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -314,22 +321,24 @@ class SessionErrorContext(val error: Errors,
   override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {}
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-FetchResponse.sizeOf(versionId, (new 
FetchSession.RESP_MAP).entrySet.iterator)
+FetchResponse.sizeOf(versionId, (new 
FetchSession.RESP_MAP).entrySet.iterator, Collections.emptyMap())

Review comment:
   Hmm, it seems that we can't pass in an empty topicIds since partition 
iterator is not empty?

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -226,4 +284,4 @@ private static FetchResponseData toMessage(Errors error,
 .setSessionId(sessionId)
 .setResponses(topicResponseList);
 }
-}
\ No newline at end of file
+}

Review comment:
   no need for extra new line.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -319,12 +355,25 @@ public int maxBytes() {
 return data.maxBytes();
 }
 
-public Map fetchData() {
+// For versions 13+, throws UnknownTopicIdException if the topic ID was 
unknown to the server.
+public Map fetchData(Map 
topicNames) throws UnknownTopicIdException {

Review comment:
   Since toPartitionDataMap() handles all versions, could we just simply 
call toPartitionDataMap()? Then, I am not sure if we need to call 
toPartitionDataMap() in the constructor.
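
   To make the v13+ behaviour described above concrete, here is a minimal, hypothetical sketch of the ID-to-name resolution (the class, method, and map names are my own illustration, not the PR's code):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.errors.UnknownTopicIdException;

    final class TopicIdResolutionSketch {
        // Resolve a fetch entry keyed by topic ID back to a TopicPartition; per the
        // comment above, for v13+ an ID unknown to the server surfaces as UnknownTopicIdException.
        static TopicPartition resolve(Uuid topicId, int partition, Map<Uuid, String> topicNames) {
            String topic = topicNames.get(topicId);
            if (topic == null)
                throw new UnknownTopicIdException("Unknown topic ID " + topicId + " in fetch request");
            return new TopicPartition(topic, partition);
        }
    }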

##
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##
@@ -276,7 +284,12 @@ class ReplicaAlterLogDirsThread(name: String,
 } else {
   // Set maxWait and minBytes to 0 because the response should return 
immediately if
   // the future log has caught up with the current log of the partition
-  val requestBuilder = 
FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, 
requestMap).setMaxBytes(maxBytes)
+  val version: Short = if (ApiKeys.FETCH.latestVersion >= 13 && 
topics.size() != topicIdsInRequest.size())
+12
+  else
+ApiKeys.FETCH.latestVersion

Review comment:
   The calculation of version is duplicated between here and 
ReplicaFetcherThread. Could we share them somehow?
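
   One possible shape for such a shared helper, as a hedged sketch (the class name and its placement are hypothetical, not something the PR defines):

    import org.apache.kafka.common.protocol.ApiKeys;

    final class FetchVersionSketch {
        // Both fetcher threads could delegate here instead of duplicating the check:
        // fall back to version 12 while some of the requested topics still have no topic ID.
        static short fetchRequestVersion(int topicCount, int topicIdCount) {
            if (ApiKeys.FETCH.latestVersion() >= 13 && topicCount != topicIdCount)
                return (short) 12;
            return ApiKeys.FETCH.latestVersion();
        }
    }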

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -80,14 +89,26 @@ public Errors error() {
 return Errors.forCode(data.errorCode());
 }
 
-public LinkedHashMap 
responseData() {
+public LinkedHashMap 
responseData(Map topicNames, short version) {
+return toResponseDataMap(topicNames, version);
+
+}
+
+// TODO: Should be replaced or cleaned up. The idea is that in KafkaApis 
we need to reconstruct responseData even though we could have just passed in 
and out a map.
+//  With topic IDs, recreating the map takes a little more time since we 
have to get the topic name from the topic ID to name map.
+//  The refactor somewhat helps in KafkaApis where we already have the 
topic names, but we have to recompute the map using topic IDs instead of just 
returning what we have.
+//  Can be replaced when we remove toMessage and change sizeOf as a part 
of KAFKA-12410.
+// Used when we can guarantee responseData is populated with all possible 
partitions
+// This occurs when we have a response version < 13 or we built the 
FetchResponse with
+// responseDataMap as a parameter and we have the same topic IDs available.
+public LinkedHashMap 
resolvedResponseData() {
 if (responseData == null) {
 synchronized (this) {
 if (responseData == null) {
 responseData = new LinkedHashMap<>();
 data.responses().forEach(topicResponse ->
-topicResponse.partitions().forEach(partition ->
-responseData.put(new 
TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))
+

[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627016997



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627015270



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627016286



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627014485



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMembersHavingMorePartitions = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : 

[GitHub] [kafka] showuon commented on pull request #10635: KAFKA-9295: increase start stream timeout

2021-05-05 Thread GitBox


showuon commented on pull request #10635:
URL: https://github.com/apache/kafka/pull/10635#issuecomment-833152754


   This should be merged soon, and I'm confident increasing the timeout will 
reduce the number of failures a lot. Let's wait for the Jenkins build. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627012042



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
- *from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
- *should just distribute one partition each to all consumers at min 
capacity
+ * 1. Reassign previously owned partitions:
+ *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+ *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+ *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
  *
  * @param partitionsPerTopic  The number of partitions for each 
subscribed topic
  * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
  *
- * @return Map from each member to the list of partitions assigned to them.
+ * @returnMap from each member to the list of 
partitions assigned to them.
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;

Review comment:
   Just a nit -- and to clarify up front, if you agree with this let's 
still hold off on doing it here so this PR can finally be merged, as I figure 
any nits can be addressed in your general assign PR:
   
   It's still a bit unclear what this value will be used for when you first see 
it; maybe we can work the word `minQuota` into the name somewhere? E.g. 
`expectedNumMembersWithMoreThanMinQuotaPartitions`, or, for a slightly shorter 
option, `numConsumersAssignedOverMinQuota`, or something in between or similar to 
those.
   
   FYI I'm also ok with it as-is if you prefer the current name -- just wanted 
to throw out some other suggestions. I'll trust you to pick whatever name feels 
right  
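
   For what it's worth, a tiny self-contained sketch of the quota arithmetic this variable comes from (the concrete numbers are made up purely for illustration):

    public class QuotaMathSketch {
        public static void main(String[] args) {
            int totalPartitionsCount = 10;  // hypothetical: 10 partitions in total
            int numberOfConsumers = 3;      // hypothetical: 3 consumers

            int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); // 3
            int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);  // 4
            // the number of members expected to receive maxQuota, i.e. one more than minQuota
            int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; // 1

            // one consumer gets 4 partitions, the other two get 3 each: 4 + 3 + 3 = 10
            System.out.println(minQuota + " / " + maxQuota + " / " + expectedNumMembersHavingMorePartitions);
        }
    }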




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-05 Thread GitBox


wcarlson5 commented on pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#issuecomment-833150214


   @ableegoldman yep it looks like I am going to have to go back to the drawing 
board for either the test or the impl. I will look at it tomorrow


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10635: KAFKA-9295: increase start stream timeout

2021-05-05 Thread GitBox


ableegoldman commented on pull request #10635:
URL: https://github.com/apache/kafka/pull/10635#issuecomment-833149684


   Just ping me when the build passes, you know the drill  
   
   By the way, even if it does fail at another point later on in this test, I'd 
like to go ahead and merge this anyway. Hopefully increasing the timeout will 
at least reduce the number of failures, of which there have been quite a few, 
and will make it easier to investigate the remaining problems. But let me know 
if you'd prefer that I wait in case that does happen. Though I'm feeling 
optimistic it won't.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10635: KAFKA-9295: increase start stream timeout

2021-05-05 Thread GitBox


showuon commented on pull request #10635:
URL: https://github.com/apache/kafka/pull/10635#issuecomment-833148107


   @ableegoldman , please take a look. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #10635: KAFKA-9295: increase start stream timeout

2021-05-05 Thread GitBox


showuon opened a new pull request #10635:
URL: https://github.com/apache/kafka/pull/10635


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dosvath commented on pull request #10375: KAFKA-12522: Cast SMT should allow null value records to pass through

2021-05-05 Thread GitBox


dosvath commented on pull request #10375:
URL: https://github.com/apache/kafka/pull/10375#issuecomment-833132393


   @ewencp @mjsax one more ping on this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-05 Thread GitBox


ableegoldman commented on pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#issuecomment-833125376


   @wcarlson5 there's a failure that I'm guessing is related: 
`streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626990213



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
 return dbOptions.writeBufferManager();
 }
 
+@Override
+public Options setMaxWriteBatchGroupSizeBytes(final long 
maxWriteBatchGroupSizeBytes) {
+dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+return this;
+}
+
+@Override
+public long maxWriteBatchGroupSizeBytes() {
+return dbOptions.maxWriteBatchGroupSizeBytes();
+}
+
+@Override
+public Options oldDefaults(final int majorVersion, final int minorVersion) 
{
+columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+return this;
+}
+
+@Override
+public Options optimizeForSmallDb(final Cache cache) {
+return super.optimizeForSmallDb(cache);
+}
+
+@Override
+public AbstractCompactionFilter> 
compactionFilter() {
+return columnFamilyOptions.compactionFilter();
+}
+
+@Override
+public AbstractCompactionFilterFactory> compactionFilterFactory() {
+return columnFamilyOptions.compactionFilterFactory();
+}
+
+@Override
+public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+return this;
+}
+
+@Override
+public int statsPersistPeriodSec() {
+return dbOptions.statsPersistPeriodSec();
+}
+
+@Override
+public Options setStatsHistoryBufferSize(final long 
statsHistoryBufferSize) {
+dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+return this;
+}
+
+@Override
+public long statsHistoryBufferSize() {
+return dbOptions.statsHistoryBufferSize();
+}
+
+@Override
+public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+return this;
+}
+
+@Override
+public boolean strictBytesPerSync() {
+return dbOptions.strictBytesPerSync();
+}
+
+@Override
+public Options setListeners(final List listeners) {
+dbOptions.setListeners(listeners);
+return this;
+}
+
+@Override
+public List listeners() {
+return dbOptions.listeners();
+}
+
+@Override
+public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) 
{
+dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+return this;
+}
+
+@Override
+public boolean enablePipelinedWrite() {
+return dbOptions.enablePipelinedWrite();
+}
+
+@Override
+public Options setUnorderedWrite(final boolean unorderedWrite) {
+dbOptions.setUnorderedWrite(unorderedWrite);
+return this;
+}
+
+@Override
+public boolean unorderedWrite() {
+return dbOptions.unorderedWrite();
+}
+
+@Override
+public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean 
skipCheckingSstFileSizesOnDbOpen) {
+
dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+return this;
+}
+
+@Override
+public boolean skipCheckingSstFileSizesOnDbOpen() {
+return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+}
+
+@Override
+public Options setWalFilter(final AbstractWalFilter walFilter) {
+dbOptions.setWalFilter(walFilter);

Review comment:
   Yep, in RocksDBStore we set `wOptions.setDisableWAL(true)` -- it's not 
on the DB/CF options, instead it's a configuration on a separate WriteOptions 
class. So we do indeed enforce that it's disabled
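
   To illustrate the distinction (a hedged, standalone sketch against the public RocksDB Java API; the path is a placeholder and this is not a quote from RocksDBStore):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteOptions;

    public class DisableWalSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);           // DB/CF-level options: no WAL switch here
                 WriteOptions writeOptions = new WriteOptions().setDisableWAL(true); // the WAL is disabled per write
                 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-wal-sketch")) {
                db.put(writeOptions, "key".getBytes(), "value".getBytes());
            }
        }
    }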
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626989130



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
 return dbOptions.writeBufferManager();
 }
 
+@Override
+public Options setMaxWriteBatchGroupSizeBytes(final long 
maxWriteBatchGroupSizeBytes) {
+dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+return this;
+}
+
+@Override
+public long maxWriteBatchGroupSizeBytes() {
+return dbOptions.maxWriteBatchGroupSizeBytes();
+}
+
+@Override
+public Options oldDefaults(final int majorVersion, final int minorVersion) 
{
+columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+return this;
+}
+
+@Override
+public Options optimizeForSmallDb(final Cache cache) {
+return super.optimizeForSmallDb(cache);
+}
+
+@Override
+public AbstractCompactionFilter> 
compactionFilter() {
+return columnFamilyOptions.compactionFilter();
+}
+
+@Override
+public AbstractCompactionFilterFactory> compactionFilterFactory() {
+return columnFamilyOptions.compactionFilterFactory();
+}
+
+@Override
+public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+return this;
+}
+
+@Override
+public int statsPersistPeriodSec() {
+return dbOptions.statsPersistPeriodSec();
+}
+
+@Override
+public Options setStatsHistoryBufferSize(final long 
statsHistoryBufferSize) {
+dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+return this;
+}
+
+@Override
+public long statsHistoryBufferSize() {
+return dbOptions.statsHistoryBufferSize();
+}
+
+@Override
+public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+return this;
+}
+
+@Override
+public boolean strictBytesPerSync() {
+return dbOptions.strictBytesPerSync();
+}
+
+@Override
+public Options setListeners(final List listeners) {
+dbOptions.setListeners(listeners);
+return this;
+}
+
+@Override
+public List listeners() {
+return dbOptions.listeners();
+}
+
+@Override
+public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) 
{
+dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+return this;
+}
+
+@Override
+public boolean enablePipelinedWrite() {
+return dbOptions.enablePipelinedWrite();
+}
+
+@Override
+public Options setUnorderedWrite(final boolean unorderedWrite) {
+dbOptions.setUnorderedWrite(unorderedWrite);
+return this;
+}
+
+@Override
+public boolean unorderedWrite() {
+return dbOptions.unorderedWrite();
+}
+
+@Override
+public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean 
skipCheckingSstFileSizesOnDbOpen) {
+
dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+return this;
+}
+
+@Override
+public boolean skipCheckingSstFileSizesOnDbOpen() {
+return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+}
+
+@Override
+public Options setWalFilter(final AbstractWalFilter walFilter) {
+dbOptions.setWalFilter(walFilter);

Review comment:
   I thought we did actively disable it, although I'll see if I can find 
where/whether this is done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-05 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339937#comment-17339937
 ] 

Luke Chen commented on KAFKA-9295:
--

Let me create another PR for it later.

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8531) Change default replication factor config

2021-05-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8531.

Resolution: Fixed

> Change default replication factor config
> 
>
> Key: KAFKA-8531
> URL: https://issues.apache.org/jira/browse/KAFKA-8531
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> With KAFKA-8305, AdminClient allows topics to be created based on the broker 
> default replication factor.
> Kafka Streams sets `replication.factor` to 1 by default at the moment, to give a good 
> out-of-the-box user experience. The problem is that people may need to 
> change the config if they push an application to production.
> We should change the default to `-1` to exploit the new AdminClient feature. 
> This won't impact the out-of-the-box experience and may avoid the need to 
> change the setting when pushing an application to production.
> KIP-733: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-733%3A+change+Kafka+Streams+default+replication+factor+config]
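
As a rough illustration of what the new default means for an application's configuration (a hedged sketch; the application id and bootstrap server are placeholders, not from the ticket):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ReplicationFactorSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // -1 delegates the replication factor of internal topics to the broker default,
            // which is what this ticket proposes as the new Streams default.
            props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, -1);
            System.out.println(new StreamsConfig(props).getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG));
        }
    }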



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax merged pull request #10532: KAFKA-8531: Change default replication factor config

2021-05-05 Thread GitBox


mjsax merged pull request #10532:
URL: https://github.com/apache/kafka/pull/10532


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626176038



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -396,6 +396,21 @@ public void close() {
 log.info("Skipping to close non-initialized store {}", 
entry.getKey());
 }
 }
+for (final StateStore store : globalStateStores) {

Review comment:
   Heh, @guozhangwang and I reviewed at the same time. I didn't notice that 
`globalStores` would not be populated until restoration, whereas 
`globalStateStores` is populated in the constructor. Imo we should just 
populate `globalStores` in the constructor as well, but I guess that won't be 
necessary if @guozhangwang does a quick followup to consolidate them
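
   As a rough sketch of the defensive-close pattern being discussed (the list name and the isOpen() guard are my own illustration, not the actual GlobalStateManagerImpl code):

    import java.util.List;
    import org.apache.kafka.streams.processor.StateStore;

    final class CloseAllStoresSketch {
        // Close every store known since construction; a store that was never
        // initialized (or is already closed) is simply skipped.
        static void closeAll(List<StateStore> globalStateStores) {
            for (StateStore store : globalStateStores) {
                if (store.isOpen()) {
                    store.close();
                }
            }
        }
    }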




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626971805



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
 return dbOptions.writeBufferManager();
 }
 
+@Override
+public Options setMaxWriteBatchGroupSizeBytes(final long 
maxWriteBatchGroupSizeBytes) {
+dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+return this;
+}
+
+@Override
+public long maxWriteBatchGroupSizeBytes() {
+return dbOptions.maxWriteBatchGroupSizeBytes();
+}
+
+@Override
+public Options oldDefaults(final int majorVersion, final int minorVersion) 
{
+columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+return this;
+}
+
+@Override
+public Options optimizeForSmallDb(final Cache cache) {
+return super.optimizeForSmallDb(cache);
+}
+
+@Override
+public AbstractCompactionFilter> 
compactionFilter() {
+return columnFamilyOptions.compactionFilter();
+}
+
+@Override
+public AbstractCompactionFilterFactory> compactionFilterFactory() {
+return columnFamilyOptions.compactionFilterFactory();
+}
+
+@Override
+public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+return this;
+}
+
+@Override
+public int statsPersistPeriodSec() {
+return dbOptions.statsPersistPeriodSec();
+}
+
+@Override
+public Options setStatsHistoryBufferSize(final long 
statsHistoryBufferSize) {
+dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+return this;
+}
+
+@Override
+public long statsHistoryBufferSize() {
+return dbOptions.statsHistoryBufferSize();
+}
+
+@Override
+public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+return this;
+}
+
+@Override
+public boolean strictBytesPerSync() {
+return dbOptions.strictBytesPerSync();
+}
+
+@Override
+public Options setListeners(final List listeners) {
+dbOptions.setListeners(listeners);
+return this;
+}
+
+@Override
+public List listeners() {
+return dbOptions.listeners();
+}
+
+@Override
+public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) 
{
+dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+return this;
+}
+
+@Override
+public boolean enablePipelinedWrite() {
+return dbOptions.enablePipelinedWrite();
+}
+
+@Override
+public Options setUnorderedWrite(final boolean unorderedWrite) {
+dbOptions.setUnorderedWrite(unorderedWrite);
+return this;
+}
+
+@Override
+public boolean unorderedWrite() {
+return dbOptions.unorderedWrite();
+}
+
+@Override
+public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean 
skipCheckingSstFileSizesOnDbOpen) {
+
dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+return this;
+}
+
+@Override
+public boolean skipCheckingSstFileSizesOnDbOpen() {
+return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+}
+
+@Override
+public Options setWalFilter(final AbstractWalFilter walFilter) {
+dbOptions.setWalFilter(walFilter);
+return this;
+}
+
+@Override
+public WalFilter walFilter() {
+return dbOptions.walFilter();
+}
+
+@Override
+public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+dbOptions.setAllowIngestBehind(allowIngestBehind);
+return this;
+}
+
+@Override
+public boolean allowIngestBehind() {
+return dbOptions.allowIngestBehind();
+}
+
+@Override
+public Options setPreserveDeletes(final boolean preserveDeletes) {
+dbOptions.setPreserveDeletes(preserveDeletes);
+return this;
+}
+
+@Override
+public boolean preserveDeletes() {
+return dbOptions.preserveDeletes();
+}
+
+@Override
+public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+dbOptions.setTwoWriteQueues(twoWriteQueues);
+return this;
+}
+
+@Override
+public boolean twoWriteQueues() {
+return dbOptions.twoWriteQueues();
+}
+
+@Override
+public Options setManualWalFlush(final boolean manualWalFlush) {
+dbOptions.setManualWalFlush(manualWalFlush);
+return this;
+}
+
+@Override
+public boolean manualWalFlush() {
+return dbOptions.manualWalFlush();
+}
+
+@Override
+public Options 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626971137



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
 return dbOptions.writeBufferManager();
 }
 
+@Override
+public Options setMaxWriteBatchGroupSizeBytes(final long 
maxWriteBatchGroupSizeBytes) {
+dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+return this;
+}
+
+@Override
+public long maxWriteBatchGroupSizeBytes() {
+return dbOptions.maxWriteBatchGroupSizeBytes();
+}
+
+@Override
+public Options oldDefaults(final int majorVersion, final int minorVersion) {
+columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+return this;
+}
+
+@Override
+public Options optimizeForSmallDb(final Cache cache) {
+return super.optimizeForSmallDb(cache);
+}
+
+@Override
+public AbstractCompactionFilter<AbstractSlice<?>> compactionFilter() {
+return columnFamilyOptions.compactionFilter();
+}
+
+@Override
+public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+return columnFamilyOptions.compactionFilterFactory();
+}
+
+@Override
+public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+return this;
+}
+
+@Override
+public int statsPersistPeriodSec() {
+return dbOptions.statsPersistPeriodSec();
+}
+
+@Override
+public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+return this;
+}
+
+@Override
+public long statsHistoryBufferSize() {
+return dbOptions.statsHistoryBufferSize();
+}
+
+@Override
+public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+return this;
+}
+
+@Override
+public boolean strictBytesPerSync() {
+return dbOptions.strictBytesPerSync();
+}
+
+@Override
+public Options setListeners(final List<AbstractEventListener> listeners) {
+dbOptions.setListeners(listeners);
+return this;
+}
+
+@Override
+public List<AbstractEventListener> listeners() {
+return dbOptions.listeners();
+}
+
+@Override
+public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+return this;
+}
+
+@Override
+public boolean enablePipelinedWrite() {
+return dbOptions.enablePipelinedWrite();
+}
+
+@Override
+public Options setUnorderedWrite(final boolean unorderedWrite) {
+dbOptions.setUnorderedWrite(unorderedWrite);
+return this;
+}
+
+@Override
+public boolean unorderedWrite() {
+return dbOptions.unorderedWrite();
+}
+
+@Override
+public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+return this;
+}
+
+@Override
+public boolean skipCheckingSstFileSizesOnDbOpen() {
+return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+}
+
+@Override
+public Options setWalFilter(final AbstractWalFilter walFilter) {
+dbOptions.setWalFilter(walFilter);
+return this;
+}
+
+@Override
+public WalFilter walFilter() {
+return dbOptions.walFilter();
+}
+
+@Override
+public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+dbOptions.setAllowIngestBehind(allowIngestBehind);
+return this;
+}
+
+@Override
+public boolean allowIngestBehind() {
+return dbOptions.allowIngestBehind();
+}
+
+@Override
+public Options setPreserveDeletes(final boolean preserveDeletes) {
+dbOptions.setPreserveDeletes(preserveDeletes);
+return this;
+}
+
+@Override
+public boolean preserveDeletes() {
+return dbOptions.preserveDeletes();
+}
+
+@Override
+public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+dbOptions.setTwoWriteQueues(twoWriteQueues);
+return this;
+}
+
+@Override
+public boolean twoWriteQueues() {
+return dbOptions.twoWriteQueues();
+}
+
+@Override
+public Options setManualWalFlush(final boolean manualWalFlush) {
+dbOptions.setManualWalFlush(manualWalFlush);
+return this;
+}
+
+@Override
+public boolean manualWalFlush() {
+return dbOptions.manualWalFlush();
+}
+
+@Override
+public Options 

[jira] [Created] (KAFKA-12756) Update Zookeeper to 3.6.3 or higher

2021-05-05 Thread Boojapho (Jira)
Boojapho created KAFKA-12756:


 Summary: Update Zookeeper to 3.6.3 or higher
 Key: KAFKA-12756
 URL: https://issues.apache.org/jira/browse/KAFKA-12756
 Project: Kafka
  Issue Type: Task
Affects Versions: 2.8.0, 2.7.0
Reporter: Boojapho


Zookeeper 3.6.3 or higher provides a security fix for 
[CVE-2021-21409|https://nvd.nist.gov/vuln/detail/CVE-2021-21409] which should be 
included in Apache Kafka to eliminate the vulnerability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9177) Pause completed partitions on restore consumer

2021-05-05 Thread Andrey Polyakov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339336#comment-17339336
 ] 

Andrey Polyakov edited comment on KAFKA-9177 at 5/5/21, 10:01 PM:
--

It's possible this got missed as part of KAFKA-9113? I'm seeing many messages 
like this per second in our 2.6.1 Kafka Streams application logs (and also 
2.6.2, 2.7.0, and 2.8.0):
{code}
{"timestamp":{"seconds":1620165908,"nanos":76900},"thread":"myapp-StreamThread-1","severity":"DEBUG","loggerName":"org.apache.kafka.streams.processor.internals.StoreChangelogReader","message":"stream-thread
 [myapp-StreamThread-1] Finished restoring all changelogs []"}
{code}


was (Author: apolyakov):
It's possible this got missed as part of KAFKA-9113? I'm seeing many messages 
like this per second in our 2.6.1 Kafka Streams application logs:
{code}
{"timestamp":{"seconds":1620165908,"nanos":76900},"thread":"myapp-StreamThread-1","severity":"DEBUG","loggerName":"org.apache.kafka.streams.processor.internals.StoreChangelogReader","message":"stream-thread
 [myapp-StreamThread-1] Finished restoring all changelogs []"}
{code}


> Pause completed partitions on restore consumer
> --
>
> Key: KAFKA-9177
> URL: https://issues.apache.org/jira/browse/KAFKA-9177
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> The StoreChangelogReader is responsible for tracking and restoring active 
> tasks, but once a store has finished restoring it will continue polling for 
> records on that partition.
> Ordinarily this doesn't make a difference as a store is not completely 
> restored until its entire changelog has been read, so there are no more 
> records for poll to return anyway. But if the restoring state is actually an 
> optimized source KTable, the changelog is just the source topic and poll will 
> keep returning records for that partition until all stores have been restored.
> Note that this isn't a correctness issue since it's just the restore 
> consumer, but it is wasteful to be polling for records and throwing them 
> away. We should pause completed partitions in StoreChangelogReader so we 
> don't slow down the restore consumer in reading from the unfinished changelog 
> topics, and avoid wasted network.
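
For illustration, a minimal sketch of the pausing idea described above (not the actual StoreChangelogReader code; the restored/end offset maps are assumed to be tracked by the caller per changelog partition):

{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class RestorePauseSketch {

    // Pause every changelog partition that has caught up to its end offset so that
    // subsequent poll() calls on the restore consumer stop returning (and discarding)
    // records for it. Paused partitions can be resumed once restoration starts again.
    static void pauseCompleted(final Consumer<byte[], byte[]> restoreConsumer,
                               final Map<TopicPartition, Long> restoredOffsets,
                               final Map<TopicPartition, Long> endOffsets) {
        final Set<TopicPartition> completed = restoredOffsets.entrySet().stream()
            .filter(e -> e.getValue() >= endOffsets.getOrDefault(e.getKey(), Long.MAX_VALUE))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        restoreConsumer.pause(completed);
    }
}
{code}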



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch merged pull request #10014: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes

2021-05-05 Thread GitBox


rhauch merged pull request #10014:
URL: https://github.com/apache/kafka/pull/10014


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-05-05 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339862#comment-17339862
 ] 

Randall Hauch commented on KAFKA-10340:
---

I cherry-picked the original PR (https://github.com/apache/kafka/pull/10016) to 
the `2.8` branch (now that it's not frozen) and updated the fixed versions.

This completes all of the planned work for this issue.

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1, 2.6.2, 2.8.1
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists or whether the broker can 
> auto-create it ({{auto.create.topics.enable}}), and if neither is the case, 
> log a message or throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even more specific corner case: when topic creation settings 
> are enabled, the worker should handle the case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false, and the topic does not 
> exist.
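
As a rough sketch of the check suggested above (not the actual Connect worker code; the {{brokerId}} parameter and the exception used here are illustrative only):

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;

final class TopicExistenceCheck {

    // Fail fast if the destination topic does not exist and the broker will not
    // auto-create it, instead of letting producer.send() retry forever.
    static void ensureTopicWritable(final Admin admin, final String topic, final String brokerId)
            throws ExecutionException, InterruptedException {
        final Set<String> existing = admin.listTopics().names().get();
        if (existing.contains(topic)) {
            return;
        }
        final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
        final Config brokerConfig = admin.describeConfigs(Collections.singleton(broker))
            .all().get().get(broker);
        final ConfigEntry autoCreate = brokerConfig.get("auto.create.topics.enable");
        if (autoCreate == null || !Boolean.parseBoolean(autoCreate.value())) {
            throw new IllegalStateException("Topic '" + topic + "' does not exist and the broker "
                + "will not auto-create it; the source task would hang waiting for it.");
        }
    }
}
{code}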



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-05-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10340:
--
Fix Version/s: 2.8.1

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1, 2.6.2, 2.8.1
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists or whether the broker can 
> auto-create it ({{auto.create.topics.enable}}), and if neither is the case, 
> log a message or throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even more specific corner case: when topic creation settings 
> are enabled, the worker should handle the case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false, and the topic does not 
> exist.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12717) Remove internal converter config properties

2021-05-05 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339833#comment-17339833
 ] 

Chris Egerton commented on KAFKA-12717:
---

Filed 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Removal+of+Connect%27s+internal+converter+properties]
 for these changes.

> Remove internal converter config properties
> ---
>
> Key: KAFKA-12717
> URL: https://issues.apache.org/jira/browse/KAFKA-12717
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> KAFKA-5540 / 
> [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]
>  deprecated but did not officially remove Connect's internal converter worker 
> config properties. With the upcoming 3.0 release, we can make the 
> backwards-incompatible change of completely removing these properties once 
> and for all.
>  
> One migration path for users who may still be running Connect clusters with 
> different internal converters can be:
>  # Stop all workers on the cluster
>  # For each internal topic (config, offsets, and status):
>  ## Create a new topic to take the place of the existing one
>  ## For every message in the existing topic:
>  ### Deserialize the message's key and value using the Connect cluster's old 
> internal key and value converters
>  ### Serialize the message's key and value using the [JSON 
> converter|https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java]
>  with schemas disabled (by setting the {{schemas.enable}} property to 
> {{false}})
>  ### Write a message with the new key and value to the new internal topic
>  # Reconfigure each Connect worker to use the newly-created internal topics 
> from step 2
>  # Start all workers on the cluster
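
As an illustration of step 2, here is a minimal sketch of re-encoding a single record from an old internal topic with the JSON converter and schemas disabled. It is not an official migration tool; the "old" converters passed in are assumed to be whatever Converter implementations the cluster was previously configured with.

{code:java}
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

import java.util.Collections;

final class InternalTopicReencoder {

    private final JsonConverter jsonKeyConverter = new JsonConverter();
    private final JsonConverter jsonValueConverter = new JsonConverter();

    InternalTopicReencoder() {
        // schemas.enable=false matches what the worker expects for its internal topics
        jsonKeyConverter.configure(Collections.singletonMap("schemas.enable", "false"), true);
        jsonValueConverter.configure(Collections.singletonMap("schemas.enable", "false"), false);
    }

    // Deserialize one record with the cluster's old converters and re-serialize it
    // with the JSON converter, ready to be written to the new internal topic.
    byte[][] reencode(final Converter oldKeyConverter,
                      final Converter oldValueConverter,
                      final String topic,
                      final byte[] oldKey,
                      final byte[] oldValue) {
        final SchemaAndValue key = oldKeyConverter.toConnectData(topic, oldKey);
        final SchemaAndValue value = oldValueConverter.toConnectData(topic, oldValue);
        return new byte[][] {
            jsonKeyConverter.fromConnectData(topic, key.schema(), key.value()),
            jsonValueConverter.fromConnectData(topic, value.schema(), value.value())
        };
    }
}
{code}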



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339825#comment-17339825
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9295:
---

[~showuon] are you interested in submitting another PR to increase the timeout 
of startApplicationAndWaitUntilRunning? If not, or if you can't get to it just 
yet, you can unassign yourself from the ticket in case someone else is able to 
pick this up before you have time :) 
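
For reference, a minimal sketch of the kind of change being discussed (the two-minute value below is only an illustration, not an agreed-upon timeout):

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

import java.time.Duration;
import java.util.List;

final class StartupTimeoutSketch {

    // Give all KafkaStreams instances in the test longer to reach RUNNING,
    // to reduce flakiness on slow CI machines.
    static void startAndWait(final List<KafkaStreams> streamsList) throws Exception {
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsList, Duration.ofMinutes(2));
    }
}
{code}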

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10632: MINOR: fix streams_broker_compatibility_test.py

2021-05-05 Thread GitBox


chia7712 commented on pull request #10632:
URL: https://github.com/apache/kafka/pull/10632#issuecomment-832903232


   >  I'm just going to go ahead and merge this so we can get the system tests 
fixed ASAP. Hope you don't mind
   
   thanks for merging this patch :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10632: MINOR: fix streams_broker_compatibility_test.py

2021-05-05 Thread GitBox


ableegoldman merged pull request #10632:
URL: https://github.com/apache/kafka/pull/10632


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10632: MINOR: fix streams_broker_compatibility_test.py

2021-05-05 Thread GitBox


ableegoldman commented on pull request #10632:
URL: https://github.com/apache/kafka/pull/10632#issuecomment-832901885


   @chia7712 I'm just going to go ahead and merge this so we can get the system 
tests fixed ASAP. Hope you don't mind, and thank you again for the PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #10238: KAFKA-10340: Backport proactively close producer when cancelling source tasks

2021-05-05 Thread GitBox


rhauch commented on pull request #10238:
URL: https://github.com/apache/kafka/pull/10238#issuecomment-832898998


   Closed without merging. Instead, I cherry-picked the commit from #10016 to 
the `2.8` branch now that 2.8.0 is out.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch closed pull request #10238: KAFKA-10340: Backport proactively close producer when cancelling source tasks

2021-05-05 Thread GitBox


rhauch closed pull request #10238:
URL: https://github.com/apache/kafka/pull/10238


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #10631: MINOR: Stop using hamcrest in system tests

2021-05-05 Thread GitBox


lct45 commented on pull request #10631:
URL: https://github.com/apache/kafka/pull/10631#issuecomment-832873282


   Kicked off 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4492/; if it 
passes, this fix is good enough for now; if it fails, the missing 
dependencies are bigger than just hamcrest.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

2021-05-05 Thread GitBox


guozhangwang commented on pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#issuecomment-832870539


   Merged to trunk, thanks @spena 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

2021-05-05 Thread GitBox


guozhangwang merged pull request #10613:
URL: https://github.com/apache/kafka/pull/10613


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-12755:


Assignee: Colin McCabe

> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
> The core module takes a long time to compile.  There are several reasons for 
> this.  One is that it’s too big -- it would be better as several gradle 
> modules. Gradle is good about compiling multiple modules in parallel, but if 
> you have one really big module, you lose that parallelism.  Another issue 
> with the core module is that it’s written in Scala, and the Scala compiler 
> takes longer than the Java one.
> A lot of server-side code is in the “clients” module.  From there, it ends up 
> on the CLASSPATH of producers, consumers, and admin clients.  This has a lot 
> of bad effects: it bloats the size of the clients jar, and allows downstream 
> projects to peek at code that should be isolated to the broker.
> A lot of tools can’t be put into the “tools” module because they depend on 
> classes that are in “core”.  And tools can’t have a core dependency, because 
> that would impose a core dependency on connect as well.
> One example of this problem is StorageTool and ClusterTool.  These tools 
> ended up getting written in Scala and put in the “core” module, rather than 
> the “tools” module.
> Our long-term goal is to migrate from Scala to Java, and the monolithic core 
> module is an obstacle to that.
> *Proposed Fixes*
> Rename the “metadata” module to “controller” to reflect the fact that it 
> contains the controller
> Make the "controller" module depend on "raft" rather than the other way 
> around ("raft" used to depend on "metadata")  This reflects the fact that the 
> controller consumes the API provided by the raft module.  (There is a 
> separate PR to do this.)
> Create a new “server-common” module for common code which is shared by 
> several server modules, but not needed for clients.
> Remove the dependency between "connect" and "tools"
> Create a new “server-tools“ module which depends on “core”
> *The Server-Common Module*
> The server-common module should contain:
> * Pluggable APIs that are used only in the server (not in any client)
> * The KIP-405 tiered storage APIs
> * Authorizer APIs
> * CreateTopicPolicy, AlterConfigPolicy, etc.
> * Common Java utility code that is used in the server, but not used in the 
> client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-05-05 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r626749507



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -661,11 +661,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 val versionId = request.header.apiVersion
 val clientId = request.header.clientId
 val fetchRequest = request.body[FetchRequest]
+val (topicIds, topicNames) =
+  if (fetchRequest.version() >= 13)
+metadataCache.topicIdInfo()
+  else
+(Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, String]())
+
 val fetchContext = fetchManager.newContext(

Review comment:
   I ended up deciding to end the session and throw a top level error when 
we have an unknown topic ID.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


guozhangwang commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832855351


   I'm re-triggering the unit tests again. @cadonna let me know if you think one green 
run is sufficient (i.e. whether, in the past, we were likely to hit the virtual-function 
failure within a single run).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626735704



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws 
Exception {
 assertFalse(globalStore.isOpen());
 }
 
-@Test
-public void shouldTransitionToDeadOnClose() throws Exception {

Review comment:
   Thx!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12755:
-
Description: 
*Problems*
The core module takes a long time to compile.  There are several reasons for 
this.  One is that it’s too big -- it would be better as several gradle 
modules. Gradle is good about compiling multiple modules in parallel, but if 
you have one really big module, you lose that parallelism.  Another issue with 
the core module is that it’s written in Scala, and the Scala compiler takes 
longer than the Java one.

A lot of server-side code is in the “clients” module.  From there, it ends up 
on the CLASSPATH of producers, consumers, and admin clients.  This has a lot of 
bad effects: it bloats the size of the clients jar, and allows downstream 
projects to peek at code that should be isolated to the broker.

A lot of tools can’t be put into the “tools” module because they depend on 
classes that are in “core”.  And tools can’t have a core dependency, because 
that would impose a core dependency on connect as well.

One example of this problem is StorageTool and ClusterTool.  These tools ended 
up getting written in Scala and put in the “core” module, rather than the 
“tools” module.

Our long-term goal is to migrate from Scala to Java, and the monolithic core 
module is an obstacle to that.

*Proposed Fixes*
Rename the “metadata” module to “controller” to reflect the fact that it 
contains the controller

Make the "controller" module depend on "raft" rather than the other way around 
("raft" used to depend on "metadata")  This reflects the fact that the 
controller consumes the API provided by the raft module.  (There is a separate 
PR to do this.)

Create a new “server-common” module for common code which is shared by several 
server modules, but not needed for clients.

Remove the dependency between "connect" and "tools"

Create a new “server-tools“ module which depends on “core”

*The Server-Common Module*
The server-common module should contain:
* Pluggable APIs that are used only in the server (not in any client)
* The KIP-405 tiered storage APIs
* Authorizer APIs
* CreateTopicPolicy, AlterConfigPolicy, etc.
* Common Java utility code that is used in the server, but not used in the 
client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.


  was:
*Problems*
The core module takes a long time to compile.  There are several reasons for 
this.  One is that it’s too big -- it would be better as several gradle 
modules. Gradle is good about compiling multiple modules in parallel, but if 
you have one really big module, you lose that parallelism.  Another issue with 
the core module is that it’s written in Scala, and the Scala compiler takes 
longer than the Java one.

A lot of server-side code is in the “clients” module.  From there, it ends up 
on the CLASSPATH of producers, consumers, and admin clients.  This has a lot of 
bad effects: it bloats the size of the clients jar, and allows downstream 
projects to peek at code that should be isolated to the broker.

A lot of tools can’t be put into the “tools” module because they depend on 
classes that are in “core”.  And tools can’t have a core dependency, because 
that would impose a core dependency on connect as well.

One example of this problem is StorageTool and ClusterTool.  These tools ended 
up getting written in Scala and put in the “core” module, rather than the 
“tools” module.

*Proposed Fixes*
Rename the “metadata” module to “controller” to reflect the fact that it 
contains the controller

Make the "controller" module depend on "raft" rather than the other way around 
("raft" used to depend on "metadata")  This reflects the fact that the 
controller consumes the API provided by the raft module.  (There is a separate 
PR to do this.)

Create a new “server-common” module for common code which is shared by several 
server modules, but not needed for clients.

Remove the dependency between "connect" and "tools"

Create a new “server-tools“ module which depends on “core”

*The Server-Common Module*
The server-common module should contain:
* Pluggable APIs that are used only in the server (not in any client)
* The KIP-405 tiered storage APIs
* Authorizer APIs
* CreateTopicPolicy, AlterConfigPolicy, etc.
* Common Java utility code that is used in the server, but not used in the 
client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
> The core module takes a long time to compile.  There are several reasons 

[jira] [Commented] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Kowshik Prakasam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339766#comment-17339766
 ] 

Kowshik Prakasam commented on KAFKA-12755:
--

cc [~satishd]  with whom we discussed some of these today.

> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
> The core module takes a long time to compile.  There are several reasons for 
> this.  One is that it’s too big -- it would be better as several gradle 
> modules. Gradle is good about compiling multiple modules in parallel, but if 
> you have one really big module, you lose that parallelism.  Another issue 
> with the core module is that it’s written in Scala, and the Scala compiler 
> takes longer than the Java one.
> A lot of server-side code is in the “clients” module.  From there, it ends up 
> on the CLASSPATH of producers, consumers, and admin clients.  This has a lot 
> of bad effects: it bloats the size of the clients jar, and allows downstream 
> projects to peek at code that should be isolated to the broker.
> A lot of tools can’t be put into the “tools” module because they depend on 
> classes that are in “core”.  And tools can’t have a core dependency, because 
> that would impose a core dependency on connect as well.
> One example of this problem is StorageTool and ClusterTool.  These tools 
> ended up getting written in Scala and put in the “core” module, rather than 
> the “tools” module.
> *Proposed Fixes*
> Rename the “metadata” module to “controller” to reflect the fact that it 
> contains the controller
> Make the "controller" module depend on "raft" rather than the other way 
> around ("raft" used to depend on "metadata")  This reflects the fact that the 
> controller consumes the API provided by the raft module.  (There is a 
> separate PR to do this.)
> Create a new “server-common” module for common code which is shared by 
> several server modules, but not needed for clients.
> Remove the dependency between "connect" and "tools"
> Create a new “server-tools“ module which depends on “core”
> *The Server-Common Module*
> The server-common module should contain:
> * Pluggable APIs that are used only in the server (not in any client)
> * The KIP-405 tiered storage APIs
> * Authorizer APIs
> * CreateTopicPolicy, AlterConfigPolicy, etc.
> * Common Java utility code that is used in the server, but not used in the 
> client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12755:
-
Description: 
*Problems*
The core module takes a long time to compile.  There are several reasons for 
this.  One is that it’s too big -- it would be better as several gradle 
modules. Gradle is good about compiling multiple modules in parallel, but if 
you have one really big module, you lose that parallelism.  Another issue with 
the core module is that it’s written in Scala, and the Scala compiler takes 
longer than the Java one.

A lot of server-side code is in the “clients” module.  From there, it ends up 
on the CLASSPATH of producers, consumers, and admin clients.  This has a lot of 
bad effects: it bloats the size of the clients jar, and allows downstream 
projects to peek at code that should be isolated to the broker.

A lot of tools can’t be put into the “tools” module because they depend on 
classes that are in “core”.  And tools can’t have a core dependency, because 
that would impose a core dependency on connect as well.

One example of this problem is StorageTool and ClusterTool.  These tools ended 
up getting written in Scala and put in the “core” module, rather than the 
“tools” module.

*Proposed Fixes*
Rename the “metadata” module to “controller” to reflect the fact that it 
contains the controller

Make the "controller" module depend on "raft" rather than the other way around 
("raft" used to depend on "metadata")  This reflects the fact that the 
controller consumes the API provided by the raft module.  (There is a separate 
PR to do this.)

Create a new “server-common” module for common code which is shared by several 
server modules, but not needed for clients.

Remove the dependency between "connect" and "tools"

Create a new “server-tools“ module which depends on “core”

*The Server-Common Module*
The server-common module should contain:
* Pluggable APIs that are used only in the server (not in any client)
* The KIP-405 tiered storage APIs
* Authorizer APIs
* CreateTopicPolicy, AlterConfigPolicy, etc.
* Common Java utility code that is used in the server, but not used in the 
client, such as ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.


  was:
*Problems*
The core module takes a long time to compile.  There are several reasons for 
this.  One is that it’s too big -- it would be better as several gradle 
modules. Gradle is good about compiling multiple modules in parallel, but if 
you have one really big module, you lose that parallelism.  Another issue with 
the core module is that it’s written in Scala, and the Scala compiler takes 
longer than the Java one.

A lot of server-side code is in the “clients” module.  From there, it ends up 
on the CLASSPATH of producers, consumers, and admin clients.  This has a lot of 
bad effects: it bloats the size of the clients jar, and allows downstream 
projects to peek at code that should be isolated to the broker.

A lot of tools can’t be put into the “tools” module because they depend on 
classes that are in “core”.  And tools can’t have a core dependency, because 
that would impose a core dependency on connect as well.

One example of this problem is StorageTool and ClusterTool.  These tools ended 
up getting written in Scala and put in the “core” module, rather than the 
“tools” module.

*Proposed Fixes*
Rename the “metadata” module to “controller” to reflect the fact that it 
contains the controller

Make the "controller" module depend on "raft" rather than the other way around 
("raft" used to depend on "metadata")  This reflects the fact that the 
controller consumes the API provided by the raft module.  (There is a separate 
PR to do this.)

Create a new “server-common” module for common code which is shared by several 
server modules, but not needed for clients.

Remove the dependency between "connect" and "tools"

Create a new “server-tools“ module which depends on “core”

*The Server-Common Module*
The server-common module should contain:
* Pluggable APIs that are used only in the server (not in any client)
* The KIP-405 tiered storage APIs
* Authorizer APIs
* CreateTopicPolicy, AlterConfigPolicy, etc.
* Common Java utility code that is used in the server, but not used in the 
client, such as
ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
> The core module takes a long time to compile.  There are several reasons for 
> this.  One is that it’s too big -- it would be better as several gradle 
> modules. Gradle is good 

[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12755:
-
Description: 
*Problems*
The core module takes a long time to compile.  There are several reasons for 
this.  One is that it’s too big -- it would be better as several gradle 
modules. Gradle is good about compiling multiple modules in parallel, but if 
you have one really big module, you lose that parallelism.  Another issue with 
the core module is that it’s written in Scala, and the Scala compiler takes 
longer than the Java one.

A lot of server-side code is in the “clients” module.  From there, it ends up 
on the CLASSPATH of producers, consumers, and admin clients.  This has a lot of 
bad effects: it bloats the size of the clients jar, and allows downstream 
projects to peek at code that should be isolated to the broker.

A lot of tools can’t be put into the “tools” module because they depend on 
classes that are in “core”.  And tools can’t have a core dependency, because 
that would impose a core dependency on connect as well.

One example of this problem is StorageTool and ClusterTool.  These tools ended 
up getting written in Scala and put in the “core” module, rather than the 
“tools” module.

*Proposed Fixes*
Rename the “metadata” module to “controller” to reflect the fact that it 
contains the controller

Make the "controller" module depend on "raft" rather than the other way around 
("raft" used to depend on "metadata")  This reflects the fact that the 
controller consumes the API provided by the raft module.  (There is a separate 
PR to do this.)

Create a new “server-common” module for common code which is shared by several 
server modules, but not needed for clients.

Remove the dependency between "connect" and "tools"

Create a new “server-tools“ module which depends on “core”

*The Server-Common Module*
The server-common module should contain:
* Pluggable APIs that are used only in the server (not in any client)
* The KIP-405 tiered storage APIs
* Authorizer APIs
* CreateTopicPolicy, AlterConfigPolicy, etc.
* Common Java utility code that is used in the server, but not used in the 
client, such as
ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.


  was:
*Problems*
The core module takes a long time to compile.  There are several 
reasons for this.  One is that it’s too big -- it would be better as several 
gradle modules. Gradle is good about compiling multiple modules in parallel, 
but if you have one really big module, you lose that parallelism.  Another 
issue with the core module is that it’s written in Scala, and the Scala 
compiler takes longer than the Java one.
A lot of server-side code is in the “clients” module.  From there, it 
ends up on the CLASSPATH of producers, consumers, and admin clients.  This has 
a lot of bad effects: it bloats the size of the clients jar, and allows 
downstream projects to peek at code that should be isolated to the broker.
A lot of tools can’t be put into the “tools” module because they depend 
on classes that are in “core”.  And tools can’t have a core dependency, because 
that would impose a core dependency on connect as well.
One example of this problem is StorageTool and ClusterTool.  These 
tools ended up getting written in Scala and put in the “core” module, rather 
than the “tools” module.

*Proposed Fixes*
Rename the “metadata” module to “controller” to reflect the fact that 
it contains the controller
Make the "controller" module depend on "raft" rather than the other way 
around ("raft" used to depend on "metadata")  This reflects the fact that the 
controller consumes the API provided by the raft module.  (There is a separate 
PR to do this.)
Create a new “server-common” module for common code which is shared by 
several server modules, but not needed for clients.
Remove the dependency between "connect" and "tools"
Create a new “server-tools“ module which depends on “core”

*The Server-Common Module*
The server-common module should contain:
Pluggable APIs that are used only in the server (not in any client)
The KIP-405 tiered storage APIs
Authorizer APIs
CreateTopicPolicy, AlterConfigPolicy, etc.
Common Java utility code that is used in the server, but not used in 
the client, such as
ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
> The core module takes a long time to compile.  There are several reasons for 
> this.  

[jira] [Commented] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339765#comment-17339765
 ] 

Colin McCabe commented on KAFKA-12755:
--

out.jpg reflects the current dependencies

out2.jpg reflects the proposed dependencies

> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
>   The core module takes a long time to compile.  There are several 
> reasons for this.  One is that it’s too big -- it would be better as several 
> gradle modules. Gradle is good about compiling multiple modules in parallel, 
> but if you have one really big module, you lose that parallelism.  Another 
> issue with the core module is that it’s written in Scala, and the Scala 
> compiler takes longer than the Java one.
>   A lot of server-side code is in the “clients” module.  From there, it 
> ends up on the CLASSPATH of producers, consumers, and admin clients.  This 
> has a lot of bad effects: it bloats the size of the clients jar, and allows 
> downstream projects to peek at code that should be isolated to the broker.
>   A lot of tools can’t be put into the “tools” module because they depend 
> on classes that are in “core”.  And tools can’t have a core dependency, 
> because that would impose a core dependency on connect as well.
>   One example of this problem is StorageTool and ClusterTool.  These 
> tools ended up getting written in Scala and put in the “core” module, rather 
> than the “tools” module.
> *Proposed Fixes*
>   Rename the “metadata” module to “controller” to reflect the fact that 
> it contains the controller
>   Make the "controller" module depend on "raft" rather than the other way 
> around ("raft" used to depend on "metadata")  This reflects the fact that the 
> controller consumes the API provided by the raft module.  (There is a 
> separate PR to do this.)
>   Create a new “server-common” module for common code which is shared by 
> several server modules, but not needed for clients.
>   Remove the dependency between "connect" and "tools"
>   Create a new “server-tools“ module which depends on “core”
> *The Server-Common Module*
> The server-common module should contain:
>   Pluggable APIs that are used only in the server (not in any client)
>   The KIP-405 tiered storage APIs
>   Authorizer APIs
>   CreateTopicPolicy, AlterConfigPolicy, etc.
>   Common Java utility code that is used in the server, but not used in 
> the client, such as
> ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12755:
-
Attachment: out.jpg

> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
>   The core module takes a long time to compile.  There are several 
> reasons for this.  One is that it’s too big -- it would be better as several 
> gradle modules. Gradle is good about compiling multiple modules in parallel, 
> but if you have one really big module, you lose that parallelism.  Another 
> issue with the core module is that it’s written in Scala, and the Scala 
> compiler takes longer than the Java one.
>   A lot of server-side code is in the “clients” module.  From there, it 
> ends up on the CLASSPATH of producers, consumers, and admin clients.  This 
> has a lot of bad effects: it bloats the size of the clients jar, and allows 
> downstream projects to peek at code that should be isolated to the broker.
>   A lot of tools can’t be put into the “tools” module because they depend 
> on classes that are in “core”.  And tools can’t have a core dependency, 
> because that would impose a core dependency on connect as well.
>   One example of this problem is StorageTool and ClusterTool.  These 
> tools ended up getting written in Scala and put in the “core” module, rather 
> than the “tools” module.
> *Proposed Fixes*
>   Rename the “metadata” module to “controller” to reflect the fact that 
> it contains the controller
>   Make the "controller" module depend on "raft" rather than the other way 
> around ("raft" used to depend on "metadata")  This reflects the fact that the 
> controller consumes the API provided by the raft module.  (There is a 
> separate PR to do this.)
>   Create a new “server-common” module for common code which is shared by 
> several server modules, but not needed for clients.
>   Remove the dependency between "connect" and "tools"
>   Create a new “server-tools“ module which depends on “core”
> *The Server-Common Module*
> The server-common module should contain:
>   Pluggable APIs that are used only in the server (not in any client)
>   The KIP-405 tiered storage APIs
>   Authorizer APIs
>   CreateTopicPolicy, AlterConfigPolicy, etc.
>   Common Java utility code that is used in the server, but not used in 
> the client, such as
> ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12755:
-
Attachment: out2.jpg

> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
> Attachments: out.jpg, out2.jpg
>
>
> *Problems*
>   The core module takes a long time to compile.  There are several 
> reasons for this.  One is that it’s too big -- it would be better as several 
> gradle modules. Gradle is good about compiling multiple modules in parallel, 
> but if you have one really big module, you lose that parallelism.  Another 
> issue with the core module is that it’s written in Scala, and the Scala 
> compiler takes longer than the Java one.
>   A lot of server-side code is in the “clients” module.  From there, it 
> ends up on the CLASSPATH of producers, consumers, and admin clients.  This 
> has a lot of bad effects: it bloats the size of the clients jar, and allows 
> downstream projects to peek at code that should be isolated to the broker.
>   A lot of tools can’t be put into the “tools” module because they depend 
> on classes that are in “core”.  And tools can’t have a core dependency, 
> because that would impose a core dependency on connect as well.
>   One example of this problem is StorageTool and ClusterTool.  These 
> tools ended up getting written in Scala and put in the “core” module, rather 
> than the “tools” module.
> *Proposed Fixes*
>   Rename the “metadata” module to “controller” to reflect the fact that 
> it contains the controller
>   Make the "controller" module depend on "raft" rather than the other way 
> around ("raft" used to depend on "metadata")  This reflects the fact that the 
> controller consumes the API provided by the raft module.  (There is a 
> separate PR to do this.)
>   Create a new “server-common” module for common code which is shared by 
> several server modules, but not needed for clients.
>   Remove the dependency between "connect" and "tools"
>   Create a new “server-tools“ module which depends on “core”
> *The Server-Common Module*
> The server-common module should contain:
>   Pluggable APIs that are used only in the server (not in any client)
>   The KIP-405 tiered storage APIs
>   Authorizer APIs
>   CreateTopicPolicy, AlterConfigPolicy, etc.
>   Common Java utility code that is used in the server, but not used in 
> the client, such as
> ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12755:
-
Description: 
*Problems*
The core module takes a long time to compile.  There are several 
reasons for this.  One is that it’s too big -- it would be better as several 
gradle modules. Gradle is good about compiling multiple modules in parallel, 
but if you have one really big module, you lose that parallelism.  Another 
issue with the core module is that it’s written in Scala, and the Scala 
compiler takes longer than the Java one.
A lot of server-side code is in the “clients” module.  From there, it 
ends up on the CLASSPATH of producers, consumers, and admin clients.  This has 
a lot of bad effects: it bloats the size of the clients jar, and allows 
downstream projects to peek at code that should be isolated to the broker.
A lot of tools can’t be put into the “tools” module because they depend 
on classes that are in “core”.  And tools can’t have a core dependency, because 
that would impose a core dependency on connect as well.
One example of this problem is StorageTool and ClusterTool.  These 
tools ended up getting written in Scala and put in the “core” module, rather 
than the “tools” module.

*Proposed Fixes*
Rename the “metadata” module to “controller” to reflect the fact that 
it contains the controller
Make the "controller" module depend on "raft" rather than the other way 
around ("raft" used to depend on "metadata")  This reflects the fact that the 
controller consumes the API provided by the raft module.  (There is a separate 
PR to do this.)
Create a new “server-common” module for common code which is shared by 
several server modules, but not needed for clients.
Remove the dependency between "connect" and "tools"
Create a new “server-tools“ module which depends on “core”

*The Server-Common Module*
The server-common module should contain:
Pluggable APIs that are used only in the server (not in any client)
The KIP-405 tiered storage APIs
Authorizer APIs
CreateTopicPolicy, AlterConfigPolicy, etc.
Common Java utility code that is used in the server, but not used in 
the client, such as
ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.


> Add server-common, server-tools gradle modules
> --
>
> Key: KAFKA-12755
> URL: https://issues.apache.org/jira/browse/KAFKA-12755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>
> *Problems*
>   The core module takes a long time to compile.  There are several 
> reasons for this.  One is that it’s too big -- it would be better as several 
> gradle modules. Gradle is good about compiling multiple modules in parallel, 
> but if you have one really big module, you lose that parallelism.  Another 
> issue with the core module is that it’s written in Scala, and the Scala 
> compiler takes longer than the Java one.
>   A lot of server-side code is in the “clients” module.  From there, it 
> ends up on the CLASSPATH of producers, consumers, and admin clients.  This 
> has a lot of bad effects: it bloats the size of the clients jar, and allows 
> downstream projects to peek at code that should be isolated to the broker.
>   A lot of tools can’t be put into the “tools” module because they depend 
> on classes that are in “core”.  And tools can’t have a core dependency, 
> because that would impose a core dependency on connect as well.
>   One example of this problem is StorageTool and ClusterTool.  These 
> tools ended up getting written in Scala and put in the “core” module, rather 
> than the “tools” module.
> *Proposed Fixes*
>   Rename the “metadata” module to “controller” to reflect the fact that 
> it contains the controller
>   Make the "controller" module depend on "raft" rather than the other way 
> around ("raft" used to depend on "metadata")  This reflects the fact that the 
> controller consumes the API provided by the raft module.  (There is a 
> separate PR to do this.)
>   Create a new “server-common” module for common code which is shared by 
> several server modules, but not needed for clients.
>   Remove the dependency between "connect" and "tools"
>   Create a new “server-tools“ module which depends on “core”
> *The Server-Common Module*
> The server-common module should contain:
>   Pluggable APIs that are used only in the server (not in any client)
>   The KIP-405 tiered storage APIs
>   Authorizer APIs
>   CreateTopicPolicy, AlterConfigPolicy, etc.
>   Common Java utility code that is used in the server, but not used in 
> the client, such as
> ApiMessageAndVersion, KafkaEventQueue, VersionRange, etc.



--
This 

[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832838475


   The builds failed due to known flaky tests, but not due to SIGABRT.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-05 Thread GitBox


wcarlson5 commented on pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#issuecomment-832838663


   @rodesai @abbccdda @ableegoldman I had to make a couple of changes to the task 
metadata to improve when the end offsets are updated. Now we capture them at the 
poll phase, which should give us the highest offset Streams has seen at any point.
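
   A rough sketch of that idea, assuming the task keeps a per-partition end-offset 
map that is updated on each poll (the class, method, and map names here are 
illustrative, not the actual StreamTask code):

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public final class EndOffsetTracker {
    // Called once per poll: remember the highest offset seen for each partition,
    // so TaskMetadata#endOffsets() can report the latest offsets Streams has observed.
    public static void updateEndOffsetsOnPoll(final ConsumerRecords<byte[], byte[]> records,
                                              final Map<TopicPartition, Long> endOffsets) {
        for (final TopicPartition tp : records.partitions()) {
            final List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(tp);
            final long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            endOffsets.merge(tp, lastOffset, Math::max); // keep the highest offset seen so far
        }
    }
}
{code}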


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339760#comment-17339760
 ] 

Bruno Cadonna commented on KAFKA-9295:
--

Failed multiple times for a PR 
(https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10568/5/testReport/)
{code:java}
java.lang.AssertionError: Application did not reach a RUNNING state for all 
streams instances. Non-running instances: 
{org.apache.kafka.streams.KafkaStreams@9cbb96e=REBALANCING, 
org.apache.kafka.streams.KafkaStreams@2e11b5=REBALANCING}
at org.junit.Assert.fail(Assert.java:89)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:904)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:197)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185)
 {code}
 

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12755) Add server-common, server-tools gradle modules

2021-05-05 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12755:


 Summary: Add server-common, server-tools gradle modules
 Key: KAFKA-12755
 URL: https://issues.apache.org/jira/browse/KAFKA-12755
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 opened a new pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-05 Thread GitBox


wcarlson5 opened a new pull request #10634:
URL: https://github.com/apache/kafka/pull/10634


   Improve endOffsets for TaskMetadata also add an int test for TaskMetadata 
offset collections
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10630: MINOR: Stop logging raw record contents above TRACE level in WorkerSourceTask

2021-05-05 Thread GitBox


C0urante commented on pull request #10630:
URL: https://github.com/apache/kafka/pull/10630#issuecomment-832831207


   Thanks @tombentley, good point. I agree that the record content is probably 
not necessary there, and it's safest to remove it from the message. I've 
altered it to reference only the topic and partition.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10154) Issue in updating metadata if not exists during sending message to different topics

2021-05-05 Thread Oleksandr Tomchakov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17339746#comment-17339746
 ] 

Oleksandr Tomchakov commented on KAFKA-10154:
-

Hi, I've created test-case:
[https://github.com/apache/kafka/compare/trunk...altomch:bug/KAFKA-10154-reproducer]

deadlock is here:
!Screenshot 2021-05-05 at 19.07.26.png|width=508,height=355!
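
For reference, a minimal sketch of the scenario described in the issue (plain 
producer API, topic names as in the report; this is only an illustration, not the 
linked reproducer):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoTopicSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("Topic1", "key", "value"));
            // The first send to Topic2 triggers a metadata fetch for that topic;
            // the report describes this failing once with a TimeoutException
            // even though the topic exists.
            producer.send(new ProducerRecord<>("Topic2", "key", "value"));
        }
    }
}
{code}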

> Issue in updating metadata if not exists during sending message to different 
> topics
> ---
>
> Key: KAFKA-10154
> URL: https://issues.apache.org/jira/browse/KAFKA-10154
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0
>Reporter: Dipti Gupta
>Priority: Major
> Attachments: Screenshot 2021-05-05 at 19.07.26.png
>
>
> Project with following behaviour at : 
> [https://github.com/DiptiGupta/kafka-producer-issue]
>  
> I took reference to this fixed issue 
> https://issues.apache.org/jira/browse/KAFKA-8623
> But on latest version, 
> I'm getting following exception during sending messages to different topics 
> i.e. Topic1 and Topic2.
> It's causing exception for once when metadata for *`Topic2`* doesn't exist.
>  
> {code:java}
> org.springframework.kafka.KafkaException: Send failed; nested exception is 
> org.apache.kafka.common.errors.TimeoutException: Topic Topic2 not present in 
> metadata after 1 ms.org.springframework.kafka.KafkaException: Send 
> failed; nested exception is org.apache.kafka.common.errors.TimeoutException: 
> Topic Topic2 not present in metadata after 1 ms. at 
> org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) 
> ~[spring-kafka-2.5.1.RELEASE.jar:2.5.1.RELEASE]{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10154) Issue in updating metadata if not exists during sending message to different topics

2021-05-05 Thread Oleksandr Tomchakov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Tomchakov updated KAFKA-10154:

Attachment: Screenshot 2021-05-05 at 19.07.26.png

> Issue in updating metadata if not exists during sending message to different 
> topics
> ---
>
> Key: KAFKA-10154
> URL: https://issues.apache.org/jira/browse/KAFKA-10154
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0
>Reporter: Dipti Gupta
>Priority: Major
> Attachments: Screenshot 2021-05-05 at 19.07.26.png
>
>
> Project with following behaviour at : 
> [https://github.com/DiptiGupta/kafka-producer-issue]
>  
> I took reference to this fixed issue 
> https://issues.apache.org/jira/browse/KAFKA-8623
> But on latest version, 
> I'm getting following exception during sending messages to different topics 
> i.e. Topic1 and Topic2.
> It's causing exception for once when metadata for *`Topic2`* doesn't exist.
>  
> {code:java}
> org.springframework.kafka.KafkaException: Send failed; nested exception is 
> org.apache.kafka.common.errors.TimeoutException: Topic Topic2 not present in 
> metadata after 1 ms.org.springframework.kafka.KafkaException: Send 
> failed; nested exception is org.apache.kafka.common.errors.TimeoutException: 
> Topic Topic2 not present in metadata after 1 ms. at 
> org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) 
> ~[spring-kafka-2.5.1.RELEASE.jar:2.5.1.RELEASE]{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-05 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-12754:
--

 Summary: TaskMetadata endOffsets does not update when the offsets 
are read
 Key: KAFKA-12754
 URL: https://issues.apache.org/jira/browse/KAFKA-12754
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Walker Carlson
Assignee: Walker Carlson


The high water mark in StreamTask is not updated optimally. Also, it would be 
good to give the metadata offsets an initial value of -1 instead of an empty 
map; that way the set of TopicPartitions won't change.
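
A minimal sketch of the proposed initialization, assuming a helper that 
pre-populates the map for the task's input partitions (the class and helper names 
are illustrative, not the actual StreamTask code):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public final class TaskMetadataOffsets {
    // Pre-populate the end-offset map with -1 for every input partition so the
    // key set stays stable even before any offsets have been read.
    public static Map<TopicPartition, Long> initialEndOffsets(final Set<TopicPartition> inputPartitions) {
        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        for (final TopicPartition tp : inputPartitions) {
            endOffsets.put(tp, -1L); // -1 marks "offset not yet known"
        }
        return endOffsets;
    }
}
{code}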



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12753) Add configuration to prevent MM2 from automatically creating topics on target cluster

2021-05-05 Thread Dave Beech (Jira)
Dave Beech created KAFKA-12753:
--

 Summary: Add configuration to prevent MM2 from automatically 
creating topics on target cluster
 Key: KAFKA-12753
 URL: https://issues.apache.org/jira/browse/KAFKA-12753
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Dave Beech


In a scenario where Kafka topics and configuration are usually created and 
managed by an infrastructure-as-code approach (Ansible, Terraform, etc.), it 
might be desirable to prevent MM2 from creating or syncing any resources by 
itself.

We already provide config options to disable refreshing/syncing of topics, but 
currently have no control over their initial creation. Adding new config 
parameters for this would fill the gap.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12730) A single Kerberos login failure fails all future connections from Java 9 onwards

2021-05-05 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-12730:
---
Fix Version/s: 2.8.1
   2.7.2
   2.6.3
   2.5.2

> A single Kerberos login failure fails all future connections from Java 9 
> onwards
> 
>
> Key: KAFKA-12730
> URL: https://issues.apache.org/jira/browse/KAFKA-12730
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.0, 2.5.2, 2.6.3, 2.7.2, 2.8.1
>
>
> The refresh thread for Kerberos performs re-login by logging out and then 
> logging in again. If login fails, we retry after a backoff. Every iteration 
> of the loop performs loginContext.logout() and loginContext.login(). If login 
> fails, we end up with two consecutive logouts. This used to work, but from 
> Java 9 onwards, this results in a NullPointerException due to 
> https://bugs.openjdk.java.net/browse/JDK-8173069. We should check if logout 
> is required before attempting logout.
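
A minimal sketch of the guard described above, assuming a helper that checks 
whether the Subject still holds a logged-in principal (the class and method names 
are illustrative, not the actual KerberosLogin code):

{code:java}
import javax.security.auth.Subject;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;

public final class ReLoginSketch {
    // Re-login without risking two consecutive logouts: only log out if a
    // previous login actually left a populated Subject behind.
    public static void reLogin(final LoginContext loginContext) throws LoginException {
        if (isLoggedIn(loginContext)) {
            loginContext.logout();
        }
        loginContext.login(); // may throw; the refresh thread retries with backoff
    }

    private static boolean isLoggedIn(final LoginContext loginContext) {
        final Subject subject = loginContext.getSubject();
        return subject != null && !subject.getPrincipals().isEmpty();
    }
}
{code}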



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10727) Kafka clients throw AuthenticationException during Kerberos re-login

2021-05-05 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-10727:
---
Fix Version/s: 2.7.2
   2.6.3
   2.5.2

> Kafka clients throw AuthenticationException during Kerberos re-login
> 
>
> Key: KAFKA-10727
> URL: https://issues.apache.org/jira/browse/KAFKA-10727
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.5.2, 2.8.0, 2.6.3, 2.7.2
>
>
> During Kerberos re-login, we log out and login again. There is a timing issue 
> where the principal in the Subject has been cleared, but a new one hasn't 
> been populated yet. We need to ensure that we don't throw 
> AuthenticationException in this case to avoid Kafka clients 
> (consumer/producer etc.) failing instead of retrying.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12752) CVE-2021-28168 upgrade jersey to 2.34 or 3.02

2021-05-05 Thread John Stacy (Jira)
John Stacy created KAFKA-12752:
--

 Summary: CVE-2021-28168 upgrade jersey to 2.34 or 3.02
 Key: KAFKA-12752
 URL: https://issues.apache.org/jira/browse/KAFKA-12752
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0
Reporter: John Stacy


[https://nvd.nist.gov/vuln/detail/CVE-2021-28168]

CVE-2021-28168 affects Jersey versions <=2.33 and <=3.0.1. Upgrading to 2.34 or 
3.0.2 should resolve the issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12751) ISRs remain in in-flight state if proposed state is same as actual state

2021-05-05 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-12751:
--

 Summary: ISRs remain in in-flight state if proposed state is same 
as actual state
 Key: KAFKA-12751
 URL: https://issues.apache.org/jira/browse/KAFKA-12751
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.8.0, 2.7.0, 2.7.1
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.7.2, 2.8.1


If the proposed ISR state in an AlterIsr request is the same as the actual state, 
the Controller returns a successful response without performing any updates. But 
the broker code that processes the response leaves the ISR in the in-flight 
state without committing it. This prevents further ISR updates until the next 
leader election.
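
A rough sketch of the broker-side handling the fix implies; the names 
(PartitionIsrState, clearInFlight, commit) are illustrative stand-ins, not the 
actual Partition/AlterIsrManager code:

{code:java}
import java.util.List;

public final class AlterIsrResponseSketch {

    // Illustrative stand-in for the broker's per-partition ISR bookkeeping.
    interface PartitionIsrState {
        List<Integer> currentIsr();
        void commit(List<Integer> newIsr);   // apply the committed ISR and clear in-flight
        void clearInFlight();                // drop the pending proposal without changing the ISR
    }

    // On a successful AlterIsr response, the in-flight state must be cleared even
    // when the proposed ISR equals the current ISR (the controller's no-op case);
    // otherwise no further ISR updates are sent until the next leader election.
    static void onAlterIsrSuccess(final PartitionIsrState state, final List<Integer> proposedIsr) {
        if (proposedIsr.equals(state.currentIsr())) {
            state.clearInFlight();
        } else {
            state.commit(proposedIsr);
        }
    }
}
{code}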



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] feyman2016 edited a comment on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-05-05 Thread GitBox


feyman2016 edited a comment on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-832764031


   @jsancio Hi, could you help to take a look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-05-05 Thread GitBox


feyman2016 commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-832764031


   @jsancio Hi, could you help to take a look? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12429) Serdes for all message types in internal topic which is used in default implementation for RLMM.

2021-05-05 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12429.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

merged the PR to trunk

> Serdes for all message types in internal topic which is used in default 
> implementation for RLMM.
> 
>
> Key: KAFKA-12429
> URL: https://issues.apache.org/jira/browse/KAFKA-12429
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>
> RLMM default implementation is based on storing all the metadata in an 
> internal topic.
> We need serdes and format of the message types that need to be stored in the 
> topic.
> You can see more details in the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-05-05 Thread GitBox


junrao merged pull request #10271:
URL: https://github.com/apache/kafka/pull/10271


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram opened a new pull request #10633: MINOR: Reset AlterIsr in-flight state for duplicate update requests

2021-05-05 Thread GitBox


rajinisivaram opened a new pull request #10633:
URL: https://github.com/apache/kafka/pull/10633


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

2021-05-05 Thread GitBox


spena commented on pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#issuecomment-832730649


   Thanks @guozhangwang, that seems to be the culprit of the performance problem. 
In fact, when the performance tests finish, the script takes some time to complete 
cleaning - I assume it is taking time to clean all the consumed memory.
   
   I will work on this fix in a follow-up PR, is that ok? I addressed all the 
comments in this one, so it is ready to merge if you're ok with it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #10631: MINOR: Stop using hamcrest in system tests

2021-05-05 Thread GitBox


lct45 commented on a change in pull request #10631:
URL: https://github.com/apache/kafka/pull/10631#discussion_r626608131



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
##
@@ -806,20 +799,14 @@ public static boolean verifySync(final String broker, 
final Instant deadline) th
 }
 }
 
-public static  void assertThat(final AtomicBoolean pass,
-  final StringBuilder failures,
-  final String message,
-  final T actual,
-  final Matcher matcher) {
-if (!matcher.matches(actual)) {
+public static void assertThat(final AtomicBoolean pass,
+  final StringBuilder failures,
+  final String message,
+  final boolean passed) {
+if (!passed) {
 if (failures != null) {
-final Description description = new 
StringDescription(failures);
-description.appendText("\n" + message)
-   .appendText("\nExpected: ")
-   .appendDescriptionOf(matcher)
-   .appendText("\n but: ");
-matcher.describeMismatch(actual, description);
-description.appendText("\n");
+final StringBuffer description = new 
StringBuffer(failures);
+description.append("\n" + message);

Review comment:
   I was following the earlier pattern of `matcher` here, but it doesn't 
look like we do anything with this description. I think it makes more sense 
to append this to the `failures` that we pass in. I'm not sure if `matcher` did 
that automatically or if this was just a gap?
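
   A small sketch of what appending directly to the passed-in `failures` could 
look like; the `pass.set(false)` call is assumed from the surrounding test 
utility, which is not visible in the diff:

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public final class AssertSketch {
    public static void assertThat(final AtomicBoolean pass,
                                  final StringBuilder failures,
                                  final String message,
                                  final boolean passed) {
        if (!passed) {
            if (failures != null) {
                // append the failure message to the caller-supplied builder
                // instead of building a throwaway description
                failures.append('\n').append(message);
            }
            pass.set(false); // assumed: mark the overall run as failed
        }
    }
}
{code}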




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-05-05 Thread GitBox


cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832644717


   I increased the RocksDB version since I found a more recent version on Maven 
Central.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-05 Thread GitBox


cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832643004


   OK, the other builds failed due to test failures, not due to SIGABRT. That 
is a good sign.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-05 Thread GitBox


cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626511978



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws 
Exception {
 assertFalse(globalStore.isOpen());
 }
 
-@Test
-public void shouldTransitionToDeadOnClose() throws Exception {

Review comment:
   Because test `shouldStopRunningWhenClosedByUser()` above is exactly the 
same test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12750) kafka.common.serialization.Serializer.serialize() method is ignoring Headers argument

2021-05-05 Thread Naresh (Jira)
Naresh created KAFKA-12750:
--

 Summary: kafka.common.serialization.Serializer.serialize() method 
is ignoring Headers argument
 Key: KAFKA-12750
 URL: https://issues.apache.org/jira/browse/KAFKA-12750
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.7.0
Reporter: Naresh


I am going through Kafka's built-in serializers and have found that there is a 
variant of the serialize() method which accepts Headers, but the default 
implementation in the interface doesn't use them. Is this intended?

[https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java#L61]

I do see that the other data-type-based implementations also don't use this 
method variant. Is this method not supported by the Kafka protocol?
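
For context, the three-argument serialize() has a default implementation that 
simply delegates to the two-argument variant, so Headers are used only if a 
custom serializer overrides it. A sketch of such an override (the header key and 
serializer name are made up for illustration):

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

public class HeaderAwareStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(final String topic, final String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serialize(final String topic, final Headers headers, final String data) {
        // Unlike the default implementation, inspect or add headers before encoding.
        headers.add("encoding", "utf-8".getBytes(StandardCharsets.UTF_8));
        return serialize(topic, data);
    }
}
{code}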



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-05 Thread GitBox


cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626510307



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##
@@ -383,8 +386,7 @@ public void 
shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest(
 }
 
 private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final 
String builtInMetricsVersion) {
-final InternalMockProcessorContext context = 
createInternalMockProcessorContext(builtInMetricsVersion);
-processor.init(context);
+setup(builtInMetricsVersion, true);

Review comment:
   I do not know. Will set it to `false`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-05 Thread GitBox


cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626508070



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
 return dbOptions.writeBufferManager();
 }
 
+@Override
+public Options setMaxWriteBatchGroupSizeBytes(final long 
maxWriteBatchGroupSizeBytes) {
+dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+return this;
+}
+
+@Override
+public long maxWriteBatchGroupSizeBytes() {
+return dbOptions.maxWriteBatchGroupSizeBytes();
+}
+
+@Override
+public Options oldDefaults(final int majorVersion, final int minorVersion) 
{
+columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+return this;
+}
+
+@Override
+public Options optimizeForSmallDb(final Cache cache) {
+return super.optimizeForSmallDb(cache);
+}
+
+@Override
+public AbstractCompactionFilter> 
compactionFilter() {
+return columnFamilyOptions.compactionFilter();
+}
+
+@Override
+public AbstractCompactionFilterFactory> compactionFilterFactory() {
+return columnFamilyOptions.compactionFilterFactory();
+}
+
+@Override
+public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+return this;
+}
+
+@Override
+public int statsPersistPeriodSec() {
+return dbOptions.statsPersistPeriodSec();
+}
+
+@Override
+public Options setStatsHistoryBufferSize(final long 
statsHistoryBufferSize) {
+dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+return this;
+}
+
+@Override
+public long statsHistoryBufferSize() {
+return dbOptions.statsHistoryBufferSize();
+}
+
+@Override
+public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+return this;
+}
+
+@Override
+public boolean strictBytesPerSync() {
+return dbOptions.strictBytesPerSync();
+}
+
+@Override
+public Options setListeners(final List listeners) {
+dbOptions.setListeners(listeners);
+return this;
+}
+
+@Override
+public List listeners() {
+return dbOptions.listeners();
+}
+
+@Override
+public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) 
{
+dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+return this;
+}
+
+@Override
+public boolean enablePipelinedWrite() {
+return dbOptions.enablePipelinedWrite();
+}
+
+@Override
+public Options setUnorderedWrite(final boolean unorderedWrite) {
+dbOptions.setUnorderedWrite(unorderedWrite);
+return this;
+}
+
+@Override
+public boolean unorderedWrite() {
+return dbOptions.unorderedWrite();
+}
+
+@Override
+public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean 
skipCheckingSstFileSizesOnDbOpen) {
+
dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+return this;
+}
+
+@Override
+public boolean skipCheckingSstFileSizesOnDbOpen() {
+return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+}
+
+@Override
+public Options setWalFilter(final AbstractWalFilter walFilter) {
+dbOptions.setWalFilter(walFilter);
+return this;
+}
+
+@Override
+public WalFilter walFilter() {
+return dbOptions.walFilter();
+}
+
+@Override
+public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+dbOptions.setAllowIngestBehind(allowIngestBehind);
+return this;
+}
+
+@Override
+public boolean allowIngestBehind() {
+return dbOptions.allowIngestBehind();
+}
+
+@Override
+public Options setPreserveDeletes(final boolean preserveDeletes) {
+dbOptions.setPreserveDeletes(preserveDeletes);
+return this;
+}
+
+@Override
+public boolean preserveDeletes() {
+return dbOptions.preserveDeletes();
+}
+
+@Override
+public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+dbOptions.setTwoWriteQueues(twoWriteQueues);
+return this;
+}
+
+@Override
+public boolean twoWriteQueues() {
+return dbOptions.twoWriteQueues();
+}
+
+@Override
+public Options setManualWalFlush(final boolean manualWalFlush) {
+dbOptions.setManualWalFlush(manualWalFlush);
+return this;
+}
+
+@Override
+public boolean manualWalFlush() {
+return dbOptions.manualWalFlush();
+}
+
+@Override
+public Options setCfPaths(final 

[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-05 Thread GitBox


cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626506480



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -99,35 +104,7 @@ public Env getEnv() {
 
 @Override
 public Options prepareForBulkLoad() {
-/* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
- *
- * Q: What's the fastest way to load data into RocksDB?
- *
- * A: A fast way to direct insert data to the DB:
- *
- *  1. using single writer thread and insert in sorted order
- *  2. batch hundreds of keys into one write batch
- *  3. use vector memtable
- *  4. make sure options.max_background_flushes is at least 4
- *  5. before inserting the data,
- *   disable automatic compaction,
- *   set options.level0_file_num_compaction_trigger,
- *   options.level0_slowdown_writes_trigger
- *   and options.level0_stop_writes_trigger to very large.
- * After inserting all the data, issue a manual compaction.
- *
- * 3-5 will be automatically done if you call 
Options::PrepareForBulkLoad() to your option
- */
-// (1) not in our control
-// (2) is done via bulk-loading API
-// (3) skipping because, not done in actual PrepareForBulkLoad() code 
in https://github.com/facebook/rocksdb/blob/master/options/options.cc
-//columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
-// (4-5) below:
-dbOptions.setMaxBackgroundFlushes(4);
-columnFamilyOptions.setDisableAutoCompactions(true);
-columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
-columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
-columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);

Review comment:
   TBH, I do not understand why we set the options here instead of simply 
calling super.prepareForBulkLoad(). Also, in the version that we currently use 
(5.18.4) the options are set the same way as here.
   
   What value would it have to leave a comment with the RocksDB version? I 
guess we will not downgrade RocksDB below the version we currently use, 
right?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-05 Thread GitBox


cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626501582



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
 return dbOptions.writeBufferManager();
 }
 
+@Override
+public Options setMaxWriteBatchGroupSizeBytes(final long 
maxWriteBatchGroupSizeBytes) {
+dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+return this;
+}
+
+@Override
+public long maxWriteBatchGroupSizeBytes() {
+return dbOptions.maxWriteBatchGroupSizeBytes();
+}
+
+@Override
+public Options oldDefaults(final int majorVersion, final int minorVersion) 
{
+columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+return this;
+}
+
+@Override
+public Options optimizeForSmallDb(final Cache cache) {
+return super.optimizeForSmallDb(cache);
+}
+
+@Override
+public AbstractCompactionFilter> 
compactionFilter() {
+return columnFamilyOptions.compactionFilter();
+}
+
+@Override
+public AbstractCompactionFilterFactory> compactionFilterFactory() {
+return columnFamilyOptions.compactionFilterFactory();
+}
+
+@Override
+public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+return this;
+}
+
+@Override
+public int statsPersistPeriodSec() {
+return dbOptions.statsPersistPeriodSec();
+}
+
+@Override
+public Options setStatsHistoryBufferSize(final long 
statsHistoryBufferSize) {
+dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+return this;
+}
+
+@Override
+public long statsHistoryBufferSize() {
+return dbOptions.statsHistoryBufferSize();
+}
+
+@Override
+public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+return this;
+}
+
+@Override
+public boolean strictBytesPerSync() {
+return dbOptions.strictBytesPerSync();
+}
+
+@Override
+public Options setListeners(final List listeners) {
+dbOptions.setListeners(listeners);
+return this;
+}
+
+@Override
+public List listeners() {
+return dbOptions.listeners();
+}
+
+@Override
+public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) 
{
+dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+return this;
+}
+
+@Override
+public boolean enablePipelinedWrite() {
+return dbOptions.enablePipelinedWrite();
+}
+
+@Override
+public Options setUnorderedWrite(final boolean unorderedWrite) {
+dbOptions.setUnorderedWrite(unorderedWrite);
+return this;
+}
+
+@Override
+public boolean unorderedWrite() {
+return dbOptions.unorderedWrite();
+}
+
+@Override
+public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean 
skipCheckingSstFileSizesOnDbOpen) {
+
dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+return this;
+}
+
+@Override
+public boolean skipCheckingSstFileSizesOnDbOpen() {
+return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+}
+
+@Override
+public Options setWalFilter(final AbstractWalFilter walFilter) {
+dbOptions.setWalFilter(walFilter);

Review comment:
   I am not sure since users could theoretically activate the WAL in the 
config setter. We do not overwrite user settings as far as I can see. That is 
consistent with the rest of Streams where the users can overwrite parameters 
set by the DSL.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



