[jira] [Comment Edited] (KAFKA-10313) Out of range offset errors leading to offset reset

2021-08-25 Thread qiang Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403089#comment-17403089
 ] 

qiang Liu edited comment on KAFKA-10313 at 8/26/21, 6:35 AM:
-

I am using 1.1.1 and got offset out of range when the partition leader 
switched. It may be caused by KAFKA-9835


was (Author: iamgd67):
I am using 1.1.1 and got offset out of range when the partition leader 
switched. It may be caused by [KAFKA-9835|https://issues.apache.org/jira/browse/KAFKA-9835]

> Out of range offset errors leading to offset reset
> --
>
> Key: KAFKA-10313
> URL: https://issues.apache.org/jira/browse/KAFKA-10313
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.2.2
>Reporter: Varsha Abhinandan
>Priority: Major
>
> Hi,
>   
>  We have been occasionally noticing offset resets happening on the Kafka 
> consumer because of an offset-out-of-range error. However, I don't see any 
> errors in the broker logs: no logs related to leader election, replica lag, 
> Kafka broker pod restarts, or anything else (only INFO logs were enabled in 
> the prod environment).
>   
>  It appeared from the logs that the out-of-range error occurred because the 
> fetch offset was larger than the offset range on the broker. We noticed this 
> happening multiple times on different consumers and stream apps in the prod 
> environment, so it doesn't seem like an application bug but more like a bug 
> in the KafkaConsumer. We would like to understand the cause of such errors.
>   
>  Also, none of the offset reset options is desirable. Choosing "earliest" 
> creates a sudden huge lag (we have a retention of 24 hours), and choosing 
> "latest" leads to data loss (the records produced between the out-of-range 
> error and the moment the offset reset happens on the consumer). So we wonder 
> whether it would be better for the Kafka client to apply the 
> 'auto.offset.reset' config only when the offset is not found. For an 
> out-of-range error, the client could instead automatically reset the offset 
> to latest when the fetch offset is beyond the log end, to prevent data loss, 
> and reset it to earliest when the fetch offset is lower than the log start 
> offset. 
>   
>  Following are the logs on the consumer side :
> {noformat}
> [2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 
> ([prd453-19-event-upsert]-bo-pipeline-12)] 
> [o.a.k.c.consumer.internals.Fetcher] [Consumer 
> clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
>  groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range 
> for partition prd453-19-event-upsert-32, resetting offset
> [2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 
> ([prd453-19-event-upsert]-bo-pipeline-12)] 
> [o.a.k.c.consumer.internals.Fetcher] [Consumer 
> clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
>  groupId=bo-indexer-group-prd453-19] Resetting offset for partition 
> prd453-19-event-upsert-32 to offset 453223789.
>   {noformat}
> Broker logs for the partition :
> {noformat}
> [2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable 
> segments with base offsets [452091893] due to retention time 86400000ms breach
>  [2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log 
> segment [baseOffset 452091893, size 1073741693] for deletion.
>  [2020-07-17T07:40:12,083Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log 
> start offset to 453223789
>  [2020-07-17T07:41:12,083Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 
> 452091893
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted log 
> /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted offset index 
> /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted time index 
> /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
>  [2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  
> [kafka.log.ProducerStateManager]  [ProducerStateManager 
> partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 
> 475609786
>  [2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  
> [k
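
One workaround for the reset dilemma described in the quoted issue is to run 
with auto.offset.reset=none and handle the out-of-range case in application 
code, seeking to whichever bound the fetch offset violated. A minimal sketch 
against the public Java consumer API (the class and method names here are 
illustrative, not part of Kafka):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public final class ManualOffsetReset {
    // Requires auto.offset.reset=none so poll() surfaces the error
    // instead of resetting silently.
    static ConsumerRecords<byte[], byte[]> pollSafely(KafkaConsumer<byte[], byte[]> consumer) {
        try {
            return consumer.poll(Duration.ofMillis(500));
        } catch (OffsetOutOfRangeException e) {
            for (Map.Entry<TopicPartition, Long> oor : e.offsetOutOfRangePartitions().entrySet()) {
                TopicPartition tp = oor.getKey();
                long fetchOffset = oor.getValue();
                long logStart = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
                if (fetchOffset < logStart) {
                    consumer.seek(tp, logStart);                   // behind retention: earliest
                } else {
                    consumer.seekToEnd(Collections.singleton(tp)); // past the end: latest
                }
            }
            return ConsumerRecords.empty();
        }
    }
}
{code}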


[jira] [Created] (KAFKA-13229) KIP-761: implement a total blocked time metric in Kafka Streams

2021-08-25 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-13229:
---

 Summary: KIP-761: implement a total blocked time metric in Kafka 
Streams
 Key: KAFKA-13229
 URL: https://issues.apache.org/jira/browse/KAFKA-13229
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.1.0
Reporter: Rohan Desai
 Fix For: 3.1.0


KIP-761 proposes a total blocked time metric in streams that measures the total 
time (since the thread was started) that a given thread is blocked on Kafka.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
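
A hedged sketch of how an application could read the new metric once the KIP 
lands; KafkaStreams#metrics() is existing public API, but the metric name 
"blocked-time-ns-total" is taken from the KIP text and is an assumption until 
the implementation is merged:

{code:java}
import org.apache.kafka.streams.KafkaStreams;

public final class BlockedTimeProbe {
    public static void printBlockedTime(KafkaStreams streams) {
        // Scan all registered metrics for the per-thread blocked-time total.
        streams.metrics().forEach((name, metric) -> {
            if ("blocked-time-ns-total".equals(name.name())) {
                System.out.println(name.tags() + " -> " + metric.metricValue());
            }
        });
    }
}
{code}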





[GitHub] [kafka] ableegoldman opened a new pull request #11263: MINOR: remove unused Properties from GraphNode#writeToTopology

2021-08-25 Thread GitBox


ableegoldman opened a new pull request #11263:
URL: https://github.com/apache/kafka/pull/11263


   While going through the StreamsBuilder#build method to see if any of the 
steps actually modify internal state, I noticed this GraphNode method accepts a 
Properties input parameter but never uses it in any of its implementations. We 
can remove this parameter to clean things up, and make it clear that writing 
nodes to the topology doesn't involve the app properties. 






[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696290766



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly

Review comment:
   Done








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696290580



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly
+Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+initMetadata(client, singletonMap(topic, 1));
+Node node = metadata.fetch().nodes().get(0);
+ConsumerPartitionAssignor assignor = new RangeAssignor();
+client.createPendingAuthenticationError(node, 0);
+final KafkaConsumer<String, String> consumer
+= newConsumer(time, client, subscription, metadata, assignor, 
false, groupInstanceId);

Review comment:
   refactored








[GitHub] [kafka] ableegoldman merged pull request #11259: HOTFIX: decrease session timeout in flaky NamedTopologyIntegrationTest

2021-08-25 Thread GitBox


ableegoldman merged pull request #11259:
URL: https://github.com/apache/kafka/pull/11259


   






[GitHub] [kafka] mjsax commented on pull request #11233: HOTFIX: Disable spurious left/outer stream-stream join fix

2021-08-25 Thread GitBox


mjsax commented on pull request #11233:
URL: https://github.com/apache/kafka/pull/11233#issuecomment-906091101


   @showuon -- thanks for taking care of the docs -- I actually did not forget 
about it and was just about to do a follow-up PR -- great that it is already 
done!






[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696282952



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly

Review comment:
   Yeah. There are no tests for that path, and I lost steam trying to pay 
that debt just to implement this metric.








[jira] [Updated] (KAFKA-13160) Fix the code that calls the broker's config handler to pass the expected default resource name when using KRaft.

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13160:
---
Summary: Fix the code that calls the broker's config handler to pass the 
expected default resource name when using KRaft.  (was: Fix the code that calls 
the broker’s config handler to pass the expected default resource name when 
using KRaft.)

> Fix the code that calls the broker's config handler to pass the expected 
> default resource name when using KRaft.
> 
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update, and otherwise converts 
> the resource name to an integer to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with an 
> empty string instead of "<default>". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs, since 
> the resource name for them is neither "<default>" nor a single integer. The 
> code that calls the handler method for config changes should be fixed to 
> pass "<default>" instead of the empty string to the handler method when 
> using KRaft.





[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696280437



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly
+Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+initMetadata(client, singletonMap(topic, 1));
+Node node = metadata.fetch().nodes().get(0);
+ConsumerPartitionAssignor assignor = new RangeAssignor();
+client.createPendingAuthenticationError(node, 0);
+final KafkaConsumer<String, String> consumer
+= newConsumer(time, client, subscription, metadata, assignor, 
false, groupInstanceId);

Review comment:
   it doesn't set a tick on the mock time.
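
For context on this exchange: org.apache.kafka.common.utils.MockTime accepts 
an auto-tick in its constructor, and every clock read advances the mock clock 
by that tick, which is what makes the measured commitSync duration 
deterministic in the test above. A minimal standalone illustration (the class 
name is ours):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

public final class MockTimeTickDemo {
    public static void main(String[] args) {
        // Each read advances MockTime by the auto-tick, so the duration
        // measured between two reads is deterministic and non-zero.
        Time time = new MockTime(Duration.ofSeconds(1).toMillis()); // tick = 1000 ms
        long t0 = time.milliseconds();
        long t1 = time.milliseconds();
        System.out.println(t1 - t0); // 1000: exactly one tick between reads
    }
}
{code}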








[jira] [Commented] (KAFKA-13032) Impossible stacktrace

2021-08-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404902#comment-17404902
 ] 

Matthias J. Sax commented on KAFKA-13032:
-

It seems [~cadonna] reviewed the PR – so I'll leave it to him to merge it.

> Impossible stacktrace
> -
>
> Key: KAFKA-13032
> URL: https://issues.apache.org/jira/browse/KAFKA-13032
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Niclas Hedhman
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: beginner, easy-fix
>
> I am presented with a stacktrace that has not a single touch point in my 
> code, so it is incredibly difficult to figure out where the problem could be. 
> I think more RuntimeExceptions need to be caught and enriched with 
> information at each level that provides any additional hint of where we are.
> For instance, each node could prepend its reference/name, and one would have 
> a chance to see where we are...
> ```
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_140, processor=KSTREAM-SOURCE-00, topic=_poll, 
> partition=140, offset=0, stacktrace=java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>   at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>   at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafk
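
A minimal sketch of the suggestion in the report above: wrap each processor 
invocation so the node's name travels with the failure. NodeLike and 
runWithContext are illustrative names, not Streams internals:

{code:java}
import org.apache.kafka.streams.errors.StreamsException;

public final class ProcessorErrorContext {
    interface NodeLike {
        String name();
        void process();
    }

    // Prepend the node's name so the stacktrace points into the topology.
    static void runWithContext(NodeLike node) {
        try {
            node.process();
        } catch (RuntimeException e) {
            throw new StreamsException(
                "Exception in processor node [" + node.name() + "]", e);
        }
    }
}
{code}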

[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404900#comment-17404900
 ] 

Matthias J. Sax commented on KAFKA-13195:
-

[~cadonna] – I think it's still open for discussion whether such a new feature 
is useful or not. Thus, it might be ok to leave it open to collect more 
feedback?

[~tchiotludo] – as pointed out before: changing the serialization format is 
considered a "non-compatible" change, and thus it's questionable whether using 
a deserialization handler would be the right solution for this problem. I 
would not jump to a conclusion too eagerly. It's a difficult problem that 
might need a more generic solution. – At the moment, if you change the format, 
you are required to reset your application to wipe out the old state and start 
with empty state.

> StateSerde don't honor DeserializationExceptionHandler
> --
>
> Key: KAFKA-13195
> URL: https://issues.apache.org/jira/browse/KAFKA-13195
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ludo
>Priority: Major
>
> Kafka Streams allows configuring a 
> [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling].
>  
> When you are using a StateStore, most messages will be a copy of the 
> original message in an internal topic and will mostly use the same 
> serializer, even if the message is of another type. 
> You can see 
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
>  that StateSerdes uses the raw Deserializer and does not honor 
> {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, 
> leading to a crash of the application (reaching the 
> {{setUncaughtExceptionHandler}} method).
> I think the state store must have the same behavior as the 
> {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>  
> Stacktrace (coming from kafka stream 2.6.1) :
>  
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_14, processor=workertaskjoined-repartition-source, 
> topic=kestra_executor-workertaskjoined-repartition, partition=14, 
> offset=167500, 
> stacktrace=org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
> value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
> "txt": not one of the values accepted for Enum class: 
> [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: 
> (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through 
> reference chain: 
> io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>  at 
> com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>  at 
> com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
>  at 
> com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDese
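
For reference, this is how the handler in question is normally configured 
(standard Streams API); the ticket's point is that the state-store 
deserialization path in StateSerdes bypasses it entirely:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public final class HandlerConfigExample {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Honored when deserializing records from source topics,
        // but not by StateSerdes.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
{code}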

[jira] [Updated] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-7271:
--
Fix Version/s: (was: 3.0.0)

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.





[jira] [Updated] (KAFKA-5905) Remove PrincipalBuilder and DefaultPrincipalBuilder

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-5905:
--
Fix Version/s: (was: 3.0.0)

> Remove PrincipalBuilder and DefaultPrincipalBuilder
> ---
>
> Key: KAFKA-5905
> URL: https://issues.apache.org/jira/browse/KAFKA-5905
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Manikumar
>Priority: Blocker
>
> These classes were deprecated after KIP-189: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL,
>  which is part of 1.0.0.





[jira] [Updated] (KAFKA-12582) Remove deprecated `ConfigEntry` constructor

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12582:
---
Fix Version/s: (was: 3.0.0)

> Remove deprecated `ConfigEntry` constructor
> ---
>
> Key: KAFKA-12582
> URL: https://issues.apache.org/jira/browse/KAFKA-12582
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> ConfigEntry's constructor was deprecated in 1.1.0.





[jira] [Updated] (KAFKA-10329) Enable connector context in logs by default

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10329:
---
Fix Version/s: (was: 3.0.0)

> Enable connector context in logs by default
> ---
>
> Key: KAFKA-10329
> URL: https://issues.apache.org/jira/browse/KAFKA-10329
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Randall Hauch
>Priority: Blocker
>  Labels: needs-kip
>
> When 
> [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
>  was implemented and released as part of AK 2.3, we chose not to enable this 
> extra logging context information by default, because it was not backward 
> compatible, and anyone relying upon the `connect-log4j.properties` file 
> provided by the AK distribution would, after an upgrade to AK 2.3 (or 
> later), see different formats for their logs, which could break any log 
> processing functionality they were relying upon.
> However, we should enable this in AK 3.0, whenever that comes. Doing so will 
> require a fairly minor KIP to change the `connect-log4j.properties` file 
> slightly.
> Marked this as BLOCKER since it's a backward incompatible change that we 
> definitely want to do in the 3.0.0 release.
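
A hedged sketch of the connect-log4j.properties change KIP-449 calls for; the 
%X{connector.context} MDC key follows the KIP, and the exact shipped pattern 
may differ:

{noformat}
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
{noformat}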





[jira] [Resolved] (KAFKA-12929) KIP-750: Deprecate support for Java 8 in Kafka 3.0

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-12929.

Resolution: Fixed

Resolving to unblock the RC for 3.0.0 and will keep a note to add to the 
downloads page



> KIP-750: Deprecate support for Java 8 in Kafka 3.0
> --
>
> Key: KAFKA-12929
> URL: https://issues.apache.org/jira/browse/KAFKA-12929
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>






[jira] [Updated] (KAFKA-12599) Remove deprecated --zookeeper in preferredReplicaLeaderElectionCommand

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12599:
---
Fix Version/s: (was: 3.0.0)

> Remove deprecated --zookeeper in preferredReplicaLeaderElectionCommand
> --
>
> Key: KAFKA-12599
> URL: https://issues.apache.org/jira/browse/KAFKA-12599
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>






[jira] [Resolved] (KAFKA-12930) KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-12930.

Resolution: Fixed

Resolving to unblock the RC for 3.0.0 and will keep a note to add to the 
downloads page

> KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0
> --
>
> Key: KAFKA-12930
> URL: https://issues.apache.org/jira/browse/KAFKA-12930
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>






[jira] [Updated] (KAFKA-13095) TransactionsTest is failing in kraft mode

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13095:
---
Fix Version/s: (was: 3.0.0)

> TransactionsTest is failing in kraft mode
> -
>
> Key: KAFKA-13095
> URL: https://issues.apache.org/jira/browse/KAFKA-13095
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Blocker
>
> TransactionsTest#testSendOffsetsToTransactionTimeout keeps flaking on Jenkins.





[jira] [Commented] (KAFKA-13159) Enable system tests for transactions in KRaft mode

2021-08-25 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404886#comment-17404886
 ] 

Konstantine Karantasis commented on KAFKA-13159:


I'm moving the remainder of this ticket to 3.0.1 because this open ticket 
is blocking the 3.0.0 RC.

> Enable system tests for transactions in KRaft mode
> --
>
> Key: KAFKA-13159
> URL: https://issues.apache.org/jira/browse/KAFKA-13159
> Project: Kafka
>  Issue Type: Test
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.1.0, 3.0.1
>
>
> Previously, we disabled several system tests involving transactions in KRaft 
> mode. Now that KIP-730 is complete and transactions work in KRaft, we need to 
> re-enable these tests.





[jira] [Updated] (KAFKA-13159) Enable system tests for transactions in KRaft mode

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13159:
---
Fix Version/s: (was: 3.0.0)
   3.0.1






[jira] [Updated] (KAFKA-13223) Idempotent producer error with Kraft

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13223:
---
Fix Version/s: (was: 3.0.0)

> Idempotent producer error with Kraft 
> -
>
> Key: KAFKA-13223
> URL: https://issues.apache.org/jira/browse/KAFKA-13223
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, producer 
>Reporter: Laurynas Butkus
>Priority: Major
>
> I get an error *"The broker does not support INIT_PRODUCER_ID"* if I try to 
> produce a message with idempotence enabled.
> Result:
> {code:java}
> ➜  ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test --request-required-acks -1 --producer-property enable.idempotence=true
> >test
> >[2021-08-23 19:40:33,356] ERROR [Producer clientId=console-producer] 
> >Aborting producer batches due to fatal error 
> >(org.apache.kafka.clients.producer.internals.Sender)
> org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
> not support INIT_PRODUCER_ID
> [2021-08-23 19:40:33,358] ERROR Error when sending message to topic test with 
> key: null, value: 4 bytes with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
> not support INIT_PRODUCER_ID
> {code}
>  
> It works fine with idempotence disabled. It also works fine when using 
> ZooKeeper.
> Tested with altered docker image: 
> {code:java}
> FROM confluentinc/cp-kafka:6.2.0
> RUN sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure && \
> # Docker workaround: Ignore cub zk-ready
> sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure && 
> \
> # KRaft required step: Format the storage directory with a new cluster ID
> echo "kafka-storage format --ignore-formatted -t $(kafka-storage random-uuid) 
> -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
> {code}
> docker-compose.yml
> {code:java}
> version: '3.4'
> services:
>   kafka:
> build: kafka
> restart: unless-stopped
> environment:
>   ALLOW_PLAINTEXT_LISTENER: "yes"
>   KAFKA_HEAP_OPTS: -Xms256m -Xmx256m
>   LOG4J_LOGGER_KAFKA: "WARN"
>   KAFKA_BROKER_ID: 1
>   KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
> 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>   KAFKA_ADVERTISED_LISTENERS: 
> 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092'
>   KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>   KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>   KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>   KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>   KAFKA_PROCESS_ROLES: 'broker,controller'
>   KAFKA_NODE_ID: 1
>   KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
>   KAFKA_LISTENERS: 
> 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://:9092'
>   KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>   KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>   KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
> ports:
>   - "127.0.0.1:9092:9092/tcp"
> command: "/etc/confluent/docker/run"
> {code}





[jira] [Closed] (KAFKA-13223) Idempotent producer error with Kraft

2021-08-25 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis closed KAFKA-13223.
--





[GitHub] [kafka] alapidas opened a new pull request #11262: KAFKA-12963: Add processor name to error

2021-08-25 Thread GitBox


alapidas opened a new pull request #11262:
URL: https://github.com/apache/kafka/pull/11262


   This PR adds the processor name to the `ClassCastException` exception text 
in `process()`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] mjsax commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog

2021-08-25 Thread GitBox


mjsax commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r696258817



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RawKeyAccessor.java
##
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals.metrics;
+
+import java.util.Collection;
+import org.apache.kafka.common.utils.Bytes;
+
+public interface RawKeyAccessor {

Review comment:
   Sure, I can add some JavaDoc. Won't be brief though... It's 
complicated... :)








[jira] [Assigned] (KAFKA-12963) Improve error message for Class cast exception

2021-08-25 Thread Andrew Lapidas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Lapidas reassigned KAFKA-12963:
--

Assignee: Andrew Lapidas  (was: A. Sophie Blee-Goldman)

> Improve error message for Class cast exception
> --
>
> Key: KAFKA-12963
> URL: https://issues.apache.org/jira/browse/KAFKA-12963
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Rasmus Helbig Hansen
>Assignee: Andrew Lapidas
>Priority: Minor
>
> After a topology change and starting the application again, we got this type 
> of error message:
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread 
> [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following 
> error:
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
>  Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be 
> cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue 
> are in unnamed module of loader 'app')
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  ... 20 more
>  [g9z-StreamThread-1] ERROR 
> org.apache.ka

[jira] [Commented] (KAFKA-12963) Improve error message for Class cast exception

2021-08-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404853#comment-17404853
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12963:


Hey [~RasmusAtNordea], just to clarify: the ask here is to add the processor 
name to this exception message? I don't think that was ever in there, so I 
wouldn't say it's a regression, but I definitely agree it should have been 
included. Is there anything else you can think of that would have been helpful 
in debugging this?

 

The one thing I'm not sure makes sense is the topic – what topic would that be? 
The processors themselves aren't attached to any specific topic, except of 
course a source or a sink. But I think maybe you meant: which topic did the 
record come from? The problem is, even that doesn't exactly have an answer. For 
one thing, the ClassCastException may have come from anywhere – it could just 
be a bug in the processing logic itself, unattached to any specific record. Of 
course there's the specific record that #process is being called on, but maybe 
you have two different topics feeding into this processor and the 
ClassCastException was actually due to a record from the other topic that was 
just fetched from an attached state store. Or maybe the record being processed 
was created by a punctuator and never came from any topic at all. And so 
on...sorry for the long-winded way of saying that there's actually a reason the 
topic info is left out, unlike the processor name, which definitely should be 
included. Hope that makes sense?
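
To make the wired-in-store case concrete, here is a minimal, hypothetical 
sketch (`SomeValue`/`OtherValue` stand in for the org.acme placeholders from 
the log; the store name and processor are invented): every Serde is correct, 
yet the unchecked cast on the store's value type fails inside #process:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

class SomeValue {}   // stand-ins for the org.acme types in the log
class OtherValue {}

// The store registered under "join-store" actually holds OtherValue, but the
// generic parameter below claims SomeValue; getStateStore() compiles via an
// unchecked cast, so nothing fails until get() returns an OtherValue at runtime.
class MistypedStoreProcessor implements Processor<String, SomeValue, String, SomeValue> {
    private KeyValueStore<String, SomeValue> store; // should be <String, OtherValue>

    @Override
    public void init(final ProcessorContext<String, SomeValue> context) {
        store = context.getStateStore("join-store");
    }

    @Override
    public void process(final Record<String, SomeValue> record) {
        // ClassCastException is raised here, far from any Serde:
        final SomeValue cached = store.get(record.key());
        // ... join/enrich logic would go here ...
    }
}
```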

> Improve error message for Class cast exception
> --
>
> Key: KAFKA-12963
> URL: https://issues.apache.org/jira/browse/KAFKA-12963
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Rasmus Helbig Hansen
>Assignee: A. Sophie Blee-Goldman
>Priority: Minor
>
> After a topology change and starting the application again, we got this type 
> of error message:
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread 
> [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following 
> error:
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency

[jira] [Assigned] (KAFKA-12963) Improve error message for Class cast exception

2021-08-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12963:
--

Assignee: A. Sophie Blee-Goldman

> Improve error message for Class cast exception
> --
>
> Key: KAFKA-12963
> URL: https://issues.apache.org/jira/browse/KAFKA-12963
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Rasmus Helbig Hansen
>Assignee: A. Sophie Blee-Goldman
>Priority: Minor
>
> After a topology change and starting the application again, we got this type 
> of error message:
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread 
> [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following 
> error:
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
>  Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be 
> cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue 
> are in unnamed module of loader 'app')
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  ... 20 more
>  [g9z-StreamThread-1] ERROR 
> org.apache.k

[GitHub] [kafka] showuon commented on pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-08-25 Thread GitBox


showuon commented on pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#issuecomment-906022496


   @ableegoldman , do you think this optimization makes sense? And would 
`StreamsPartitionAssignor` ever have a chance to distribute partitions that are 
not in the subscription list? Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine merged pull request #11260: MINOR: Add missing license entry for jline in LICENSE-binary

2021-08-25 Thread GitBox


kkonstantine merged pull request #11260:
URL: https://github.com/apache/kafka/pull/11260


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #11260: MINOR: Add missing license entry for jline in LICENSE-binary

2021-08-25 Thread GitBox


kkonstantine commented on pull request #11260:
URL: https://github.com/apache/kafka/pull/11260#issuecomment-906020193


   Merging to include in 3.0 without waiting for the tests to run, because this 
is just documentation. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #11261: KAFKA-13228: Ensure ApiVersionRequest is properly handled in KRaft

2021-08-25 Thread GitBox


dengziming opened a new pull request #11261:
URL: https://github.com/apache/kafka/pull/11261


   *More detailed description of your change*
   When I described the quorum in KRaft mode I got 
`org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support DESCRIBE_QUORUM`.
   This happens because we only consider `ApiKeys.zkBrokerApis()` when we call 
`NodeApiVersions.create()`; we should use `ApiKeys.controllerApiVersions` when 
in KRaft mode.
   
   *Summary of testing strategy (including rationale)*
   After this change, the DESCRIBE_QUORUM request was properly handled and 
returned a correct response:
   TopicData(topicName='__cluster_metadata', 
partitions=[PartitionData(partitionIndex=0, errorCode=0, leaderId=1, 
leaderEpoch=30, highWatermark=141, currentVoters=[ReplicaState(replicaId=1, 
logEndOffset=141)], observers=[])])
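   
   As a rough illustration of the mismatch (assuming the current `ApiKeys` 
listings): the ZK-broker listing that `NodeApiVersions.create()` enumerates 
never includes the controller-side API.
   
   ```java
   import org.apache.kafka.common.protocol.ApiKeys;
   
   public class DescribeQuorumSupportCheck {
       public static void main(String[] args) {
           // DESCRIBE_QUORUM is a KRaft/controller API, so a NodeApiVersions
           // built from the ZK-broker listing reports it as unsupported.
           System.out.println("zkBrokerApis contains DESCRIBE_QUORUM: "
                   + ApiKeys.zkBrokerApis().contains(ApiKeys.DESCRIBE_QUORUM));
       }
   }
   ```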
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine opened a new pull request #11260: MINOR: Add missing license entry for jline in LICENSE-binary

2021-08-25 Thread GitBox


kkonstantine opened a new pull request #11260:
URL: https://github.com/apache/kafka/pull/11260


   Adding the license for https://github.com/jline/jline3 
   This is a commit that was missed when 
https://github.com/apache/kafka/pull/11232 was merged. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13228) ApiVersionRequest are not correctly handled in kraft mode

2021-08-25 Thread dengziming (Jira)
dengziming created KAFKA-13228:
--

 Summary: ApiVersionRequest are not correctly handled in kraft mode
 Key: KAFKA-13228
 URL: https://issues.apache.org/jira/browse/KAFKA-13228
 Project: Kafka
  Issue Type: Bug
Reporter: dengziming
Assignee: dengziming
 Fix For: 3.0.1


I'm trying to describe the quorum in KRaft mode but got 
`org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support DESCRIBE_QUORUM`.

This happens because we only consider `ApiKeys.zkBrokerApis()` when we call 
`NodeApiVersions.create()`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9792) Improve sticky task assignment for previous active tasks

2021-08-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-9792:
-

Assignee: (was: A. Sophie Blee-Goldman)

> Improve sticky task assignment for previous active tasks
> 
>
> Key: KAFKA-9792
> URL: https://issues.apache.org/jira/browse/KAFKA-9792
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Attachments: rebalance.txt
>
>
> Due to the way the client quota is computed, we may end up passing over a 
> client that is not yet full. This can cause us to “miss” the client while 
> attempting to assign active tasks to their previous owners, and ultimately 
> forcing an active task _away_ from its previous client.
> It’s not quite a bug, but it’s definitely sub-optimal behavior and can cause 
> unnecessary task shuffling. It also makes the assignment harder to 
> predict and debug.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8767) Optimize StickyAssignor for Cooperative mode

2021-08-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404833#comment-17404833
 ] 

A. Sophie Blee-Goldman commented on KAFKA-8767:
---

Note that since https://issues.apache.org/jira/browse/KAFKA-9987, this problem 
only affects the case in which the consumers in the group have different 
subscriptions. Most consumer groups don't have varying subscriptions, rather 
each member subscribes to the same set of topics/patterns and lets the assignor 
work out the distribution. For those groups with equal subscriptions across all 
members, the assignment is always maximally sticky (while still balanced) and 
therefore it's not possible for it to misplace the revoked partitions and 
result in an extra cooperative rebalance being required in order to assign all 
partitions.

I'll leave this ticket open since it does still affect a small minority of 
consumer groups, but it's probably not worth fixing at this point.

> Optimize StickyAssignor for Cooperative mode
> 
>
> Key: KAFKA-8767
> URL: https://issues.apache.org/jira/browse/KAFKA-8767
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 2.4.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> In some rare cases, the StickyAssignor will fail to balance an assignment 
> without violating stickiness despite a balanced and sticky assignment being 
> possible. The implications of this for cooperative rebalancing are that an 
> unnecessary additional rebalance will be triggered.
> This was seen to happen for example when each consumer is subscribed to some 
> random subset of all topics and all their subscriptions change to a different 
> random subset, as occurs in 
> AbstractStickyAssignorTest#testReassignmentWithRandomSubscriptionsAndChanges.
> The initial assignment after the random subscription change obviously 
> involved migrating partitions, so following the cooperative protocol those 
> partitions are removed from the balanced first assignment, and a second 
> rebalance is triggered. In some cases, during the second rebalance the 
> assignor was unable to reach a balanced assignment without migrating a few 
> partitions, even though one must have been possible (since the first 
> assignment was balanced). A third rebalance was needed to reach a stable 
> balanced state.
> Under the conditions in the previously mentioned test (between 20-40 
> consumers and 10-20 topics with 0-20 partitions each), this third rebalance 
> was required roughly 30% of the time. Some initial improvements to the sticky 
> assignment logic reduced this to under 15%, but we should consider closing 
> this gap and optimizing the cooperative sticky assignment.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface

2021-08-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-8734.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
> --
>
> Key: KAFKA-8734
> URL: https://issues.apache.org/jira/browse/KAFKA-8734
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In 2.4 we deprecated the consumer.internal.PartitionAssignor interface and 
> migrated all assignors to the [new public consumer.ConsumerPartitionAssignor 
> interface|https://github.com/apache/kafka/pull/7108]. Although internal, we 
> provided an [adapter|https://github.com/apache/kafka/pull/7110] for those who 
> may have implemented a custom PartitionAssignor, to avoid breaking changes. 
> These should be removed in the next major release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface

2021-08-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-8734:
--
Affects Version/s: (was: 3.0.0)

> Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
> --
>
> Key: KAFKA-8734
> URL: https://issues.apache.org/jira/browse/KAFKA-8734
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In 2.4 we deprecated the consumer.internal.PartitionAssignor interface and 
> migrated all assignors to the [new public consumer.ConsumerPartitionAssignor 
> interface|https://github.com/apache/kafka/pull/7108]. Although internal, we 
> provided an [adapter|https://github.com/apache/kafka/pull/7110] for those who 
> may have implemented a custom PartitionAssignor, to avoid breaking changes. 
> These should be removed in the next major release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-25 Thread GitBox


jasonyanwenl commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r696210814



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,12 @@ public void testFlatMap() {
 assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null);
+final Record record = new Record<>("K", 0, 0L);
+final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record));
+assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record));

Review comment:
   1. Just curious, why prefer `assertThat` from `hamcrest`? I saw that 
different places in our code base use `junit` / `hamcrest`. Is there any rule 
guiding when to use which framework? 
   2. The other tests in this file use `junit`. So if I only use `hamcrest` in 
this single place, this class will import both `junit` and `hamcrest` and mix 
the two. I think this is a bit ugly LOL. Do you suggest updating everything to 
`hamcrest`?
   
   Thanks!
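   
   (For comparison, a hamcrest form of the quoted assertion, reusing the same 
`throwable` and `record` from the diff, could look like this:)
   
   ```java
   import static org.hamcrest.MatcherAssert.assertThat;
   import static org.hamcrest.Matchers.is;
   
   // Subject first, matcher second -- there is no expected/actual order to get wrong:
   assertThat(throwable.getMessage(),
       is(String.format("KeyValueMapper can't return null from mapping the record: %s", record)));
   ```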




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-25 Thread GitBox


junrao commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r696210968



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig,
 
 // If replica failure did not require leader re-election, inform brokers 
of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty) {

Review comment:
   Good point. Basically, if there is at least one partition that has gone 
through either `partitionStateMachine.triggerOnlinePartitionStateChange()` or 
`replicaStateMachine.handleStateChanges()`, there is no need to send 
UpdateMetadataRequest since we broadcast metadata to every broker in each call. 
If partitionsWithOfflineLeader is empty but partitionsWithOfflineLeader is not 
empty, there is no need to send UpdateMetadataRequest. So, the check probably 
should be something like
   
   `if (newOfflineReplicas.isEmpty || (partitionsWithOfflineLeader.isEmpty && 
partitionsWithOfflineLeader.isEmpty))`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-25 Thread GitBox


jasonyanwenl commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r696210814



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,12 @@ public void testFlatMap() {
 assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null);
+final Record record = new Record<>("K", 0, 0L);
+final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record));
+assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record));

Review comment:
   Just curious, why prefer `assertThat` from `hamcrest`? I saw that different 
places in our code base use `junit` / `hamcrest`. Is there any rule guiding 
when to use which framework? Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonyanwenl edited a comment on pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-25 Thread GitBox


jasonyanwenl edited a comment on pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#issuecomment-905975169


   > Thanks for the PR @jasonyanwenl !
   > 
   > Here my feedback!
   > 
   > I think we should consider to introduce a specific exception for 
exceptions that originate from user code. Such exceptions could be 
distinguished from other exceptions when caught in the [Streams uncaught 
exception 
handler](https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.html)
 and better reacted upon.
   > 
   > Adding such an exception would require a KIP. Are you interested?
   
   Hi @cadonna, thanks for your feedback. Those are great insights! I'm 
definitely interested in doing this. But before that, I have several questions:
   
   1. Will this be the first case where we have a distinguished exception type 
for user-code errors?
   2. Shall we finish this as a simple beginner issue and create a new ticket 
tracking the design & impl of that? The concern is that this NPE ticket might 
not be the only place where the new exception could bring benefit, and we 
probably should analyze the code to expand the scope and update the exception 
types all at once in a dedicated PR/ticket. Hence I think separating those two 
is suitable.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonyanwenl commented on pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-25 Thread GitBox


jasonyanwenl commented on pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#issuecomment-905975169


   > Thanks for the PR @jasonyanwenl !
   > 
   > Here my feedback!
   > 
   > I think we should consider to introduce a specific exception for 
exceptions that originate from user code. Such exceptions could be 
distinguished from other exceptions when caught in the [Streams uncaught 
exception 
handler](https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.html)
 and better reacted upon.
   > 
   > Adding such an exception would require a KIP. Are you interested?
   
   Hi @cadonna, thanks for your feedback. Those are great insights! I'm 
definitely interested in doing this. But before that, I have several questions:
   
   1. Will this be the first case where we have a distinguished exception type 
for user-code errors?
   2. Shall we finish this as a simple beginner issue and create a new ticket 
tracking the design & impl of that? The concern is that this NPE ticket might 
not be the only place where the new exception could bring benefit, and we 
probably should analyze the code to expand the scope and update the exception 
types all at once in another PR/ticket. Hence I think separating those two is 
suitable.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9858: KAFKA-12173 Migrate streams:streams-scala module to JUnit 5

2021-08-25 Thread GitBox


showuon commented on a change in pull request #9858:
URL: https://github.com/apache/kafka/pull/9858#discussion_r696201298



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -75,9 +76,19 @@
 @Category(IntegrationTest.class)
 public class AdjustStreamThreadCountTest {
 
-@ClassRule
 public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();

Review comment:
   I'll check it! :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11259: HOTFIX: decrease session timeout in flaky NamedTopologyIntegrationTest

2021-08-25 Thread GitBox


ableegoldman commented on pull request #11259:
URL: https://github.com/apache/kafka/pull/11259#issuecomment-905946923


   @wcarlson5 @vvcephei @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #11259: HOTFIX: decrease session timeout in flaky NamedTopologyIntegrationTest

2021-08-25 Thread GitBox


ableegoldman opened a new pull request #11259:
URL: https://github.com/apache/kafka/pull/11259


   Since the default session timeout was bumped to 45s, a number of our 
integration tests have begun failing. Let's reset it back to 10s for these 
NamedTopologyIntegrationTests, which have been a bit flaky, to help parse out 
whether it's just environmental or possibly something more...sinister 😈 
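   
   (A minimal sketch of the override, assuming the test simply sets the 
standard consumer config through the Streams consumer prefix:)
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.streams.StreamsConfig;
   
   final Properties props = new Properties();
   // Streams forwards consumer-prefixed keys to its internal consumers, so this
   // pins session.timeout.ms back to 10s for the test regardless of the default.
   props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 10_000);
   ```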


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9858: KAFKA-12173 Migrate streams:streams-scala module to JUnit 5

2021-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9858:
URL: https://github.com/apache/kafka/pull/9858#discussion_r696181741



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -75,9 +76,19 @@
 @Category(IntegrationTest.class)
 public class AdjustStreamThreadCountTest {
 
-@ClassRule
 public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();

Review comment:
   cc also @showuon -- would you be interested in taking a shot at this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #11258: MINOR: add to empty, remove then add different test

2021-08-25 Thread GitBox


wcarlson5 commented on pull request #11258:
URL: https://github.com/apache/kafka/pull/11258#issuecomment-905910457


   @ableegoldman whatever works best for you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

2021-08-25 Thread GitBox


hachikuji commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r696142107



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2207,15 +2191,27 @@ class ReplicaManager(val config: KafkaConfig,
 InitialFetchState(leaderEndPoint, 
partition.getLeaderEpoch, fetchOffset))
 } else {
   stateChangeLogger.info(
-s"Skipped the become-follower state change after marking 
its partition as " +
+"Skipped the become-follower state change after marking 
its partition as " +
 s"follower for partition $tp with id ${info.topicId} and 
partition state $state."
   )
 }
 }
   }
   changedPartitions.add(partition)
 } catch {
-  case e: Throwable => stateChangeLogger.error(s"Unable to start 
fetching ${tp} " +
+  case e: KafkaStorageException =>
+stateChangeLogger.error(s"Unable to start fetching $tp " +
+  s"with topic ID ${info.topicId} due to a storage error 
${e.getMessage}", e)
+replicaFetcherManager.addFailedPartition(tp)
+// If there is an offline log directory, a Partition object may 
have been created by
+// `getOrCreatePartition()` before `createLogIfNotExists()` failed 
to create local replica due
+// to KafkaStorageException. In this case 
`ReplicaManager.allPartitions` will map this topic-partition
+// to an empty Partition object. We need to map this 
topic-partition to OfflinePartition instead.
+markPartitionOffline(tp)
+
+

Review comment:
   nit: extra newline




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-25 Thread GitBox


splett2 commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r696141192



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig,
 
 // If replica failure did not require leader re-election, inform brokers 
of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty) {

Review comment:
   I don't follow why the check for `partitionsWithOfflineLeader` is 
necessary.
   
   If the broker doesn't host any replicas, then we need to send out 
`UpdateMetadataRequest` because the partition state machines will not trigger 
any control requests. This is handled by the check for 
`newOfflineReplicas.isEmpty`
   
   If the broker does host replicas, then shouldn't 
`replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, 
OfflineReplica)`
   handle sending out UpdateMetadataRequest to propagate the offline broker? 
Maybe we would also need a check for whether `newOfflineReplicasNotForDeletion` 
is empty as well, but `partitionsWithOfflineLeader.isEmpty` seems redundant and 
fairly coarse.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-25 Thread GitBox


splett2 commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r696141192



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig,
 
 // If replica failure did not require leader re-election, inform brokers 
of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty) {

Review comment:
   I don't follow why the check for `partitionsWithOfflineLeader` is 
necessary.
   
   If the broker doesn't host any replicas, then we need to send out 
`UpdateMetadataRequest` because the partition state machines will not trigger 
any control requests. This is handled by the check for 
`newOfflineReplicas.isEmpty`
   
   If the broker does host replicas, then shouldn't 
`replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, 
OfflineReplica)`
   handle sending out UpdateMetadataRequest to propagate the offline broker? 
`partitionsWithOfflineLeader.isEmpty` seems redundant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-25 Thread GitBox


splett2 commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r696141192



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig,
 
 // If replica failure did not require leader re-election, inform brokers 
of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty) {

Review comment:
   I still don't follow why the check for `partitionsWithOfflineLeader` 
is necessary.
   
   If the broker doesn't host any replicas, then we need to send out 
`UpdateMetadataRequest` because the partition state machines will not trigger 
any control requests.
   
   If the broker does host replicas, then shouldn't 
`replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, 
OfflineReplica)`
   handle sending out UpdateMetadataRequest to propagate the offline broker? 
`partitionsWithOfflineLeader.isEmpty` seems redundant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11258: MINOR: add to empty, remove then add different test

2021-08-25 Thread GitBox


ableegoldman commented on pull request #11258:
URL: https://github.com/apache/kafka/pull/11258#issuecomment-905900986


   Sounds like this is waiting on the blocking functionality coming in Pt. 4 -- 
thanks for the test, do you want to just leave this PR until Pt. 4 is merged 
and then make sure it passes? Alternatively I can copy this over to the Pt. 4 
PR and merge it with that


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-08-25 Thread GitBox


hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696118985



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1622,19 +1623,37 @@ object TestUtils extends Logging {
 waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, 
leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+waitUntilTrue(() => {
+  val nodes = client.describeCluster().nodes().get()
+  nodes.asScala.exists(_.id == brokerId)
+}, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+client: Admin,
+topicPartition: TopicPartition,
+expectedLeaderOpt: Option[Int]
+  ): Unit = {
 val topic = topicPartition.topic
-val partition = topicPartition.partition
+val partitionId = topicPartition.partition
+
+def currentLeader: Try[Option[Int]] = Try {
+  val topicDescription = 
client.describeTopics(List(topic).asJava).all.get.get(topic)
+  topicDescription.partitions.asScala
+.find(_.partition == partitionId)
+.flatMap(partitionState => Option(partitionState.leader))

Review comment:
   Yes:
   ```
   /**
* Return the leader of the partition or null if there is none.
*/
   public Node leader() {
   return leader;
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-08-25 Thread GitBox


hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696118985



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1622,19 +1623,37 @@ object TestUtils extends Logging {
 waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, 
leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+waitUntilTrue(() => {
+  val nodes = client.describeCluster().nodes().get()
+  nodes.asScala.exists(_.id == brokerId)
+}, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+client: Admin,
+topicPartition: TopicPartition,
+expectedLeaderOpt: Option[Int]
+  ): Unit = {
 val topic = topicPartition.topic
-val partition = topicPartition.partition
+val partitionId = topicPartition.partition
+
+def currentLeader: Try[Option[Int]] = Try {
+  val topicDescription = 
client.describeTopics(List(topic).asJava).all.get.get(topic)
+  topicDescription.partitions.asScala
+.find(_.partition == partitionId)
+.flatMap(partitionState => Option(partitionState.leader))

Review comment:
   Yes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-08-25 Thread GitBox


hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696117735



##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -189,17 +197,39 @@ public void start() {
 @Override
 public void stop() {
 if (stopped.compareAndSet(false, true)) {
-try {
-clusterReference.get().close();
-} catch (Exception e) {
-throw new RuntimeException("Failed to stop Raft server", 
e);
-}
+
+Utils.closeQuietly(clusterReference.get(), "cluster");
+admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));

Review comment:
   Makes sense




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-08-25 Thread GitBox


hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696116190



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -90,8 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  val lifecycleManager: BrokerLifecycleManager =
-new BrokerLifecycleManager(config, time, threadNamePrefix)
+  private var lifecycleManager: BrokerLifecycleManager = null

Review comment:
   This was a workaround to allow `BrokerServer` to be restartable in the 
same way that `KafkaServer` is. It would be better to let the test kit 
construct a new instance, but I decided to save that for a follow-up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-08-25 Thread GitBox


hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696115948



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
##
@@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 ApiError apiError = ApiError.fromThrowable(e);
 List electionResults = new ArrayList<>();
 
-for (TopicPartitions topic : data.topicPartitions()) {
-ReplicaElectionResult electionResult = new ReplicaElectionResult();
+if (data.topicPartitions() != null) {

Review comment:
   We use null to indicate election for all partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-25 Thread GitBox


junrao commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r696035745



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig,
 
 // If replica failure did not require leader re-election, inform brokers 
of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty) {

Review comment:
   Good catch. There are two cases where we need to send 
UpdateMetadataRequest.
   
   1. If the broker doesn't host any replicas. This is the new case that you 
identified since we won't send any UpdateMetadataRequest in 
PartitionStateMachine and we want to propagate the offline broker to other 
brokers.
   2. If partitionsWithOfflineLeader is not empty, 
partitionStateMachine.triggerOnlinePartitionStateChange() would send 
UpdateMetadataRequest as part of transitioning those partitions to online, so 
we don't need to send UpdateMetadataRequest again. If 
partitionsWithOfflineLeader is empty, we do want to send UpdateMetadataRequest 
to propagate the offline broker.
   
   So, it seems that the check should be if (newOfflineReplicas.isEmpty || 
partitionsWithOfflineLeader.isEmpty). It would be useful to adjust the comment 
above accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr

2021-08-25 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13227:
---

 Summary: Cancel pending AlterIsr requests after receiving 
LeaderAndIsr
 Key: KAFKA-13227
 URL: https://issues.apache.org/jira/browse/KAFKA-13227
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Currently we do not cancel pending AlterIsr requests after the state has been 
updated through a LeaderAndIsr request received from the controller. This leads 
to log messages such as this 
{code}
[2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] 
Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, 
isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition 
__transaction_state-32 (kafka.cluster.Partition)
{code}
I think the only complication here is protecting against the AlterIsr callback, 
which is executed asynchronously. To address this, we can move the `zkVersion` 
field into `IsrState`. When the callback is invoked, we can compare the 
existing state against the response state to decide whether to apply the 
change. 
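
A rough sketch of that guard (the names below are illustrative simplifications, 
not Kafka's actual fields):

```java
// Hypothetical simplification: IsrState carries the zkVersion it was built from.
final class IsrState {
    final int zkVersion;
    IsrState(final int zkVersion) { this.zkVersion = zkVersion; }
}

final class PartitionIsr {
    private volatile IsrState current = new IsrState(0);

    // Invoked asynchronously when the AlterIsr response arrives.
    void onAlterIsrResponse(final IsrState sent, final IsrState fromResponse) {
        // Only apply the response if no LeaderAndIsr update superseded the
        // state this request was sent from; otherwise drop it silently.
        if (current.zkVersion == sent.zkVersion) {
            current = fromResponse;
        }
    }
}
```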



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160


   The general thought on the implementation. 
   As of now, we choose the concrete `StandbyTaskAssignor` implementation based 
on the passed `AssignmentConfigs` value.
   Instead, an alternative approach would be to have a chained standby task 
assignment based on multiple implementations. For instance, when the required 
client tag configuration is present, we can try to assign the standby tasks 
based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new 
LeastLoadedClientStandbyTaskAssignor())`. This way, 
`ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks 
across the tag dimensions. If there are no more client tag dimensions 
available, and we still have `any(tasksToRemainingStandbys) > 0`, the next 
implementation in the chain (in this case 
`LeastLoadedClientStandbyTaskAssignor`) will be called. With this, 
`LeastLoadedClientStandbyTaskAssignor` can default to assigning the remaining 
standbys to the least loaded clients. Right now, 
`ClientTagAwareStandbyTaskAssignor` does both the assignment based on the 
available dimensions and the fallback to the least loaded clients when there 
are not enough tag dimensions. The current implementation leads to more complex 
code, while with the approach above we can clearly separate the implementations 
without duplication (there's no code duplication, rather "logic" duplication).
   For now, I've chosen to go with the simplest approach - having two 
independent implementations and selecting the appropriate one based on the 
passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in 
case. 
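   
   A bare-bones sketch of the chained idea (the interface and names below are 
invented for illustration, not the real Streams internals):
   
   ```java
   import java.util.List;
   
   interface StandbyTaskAssignor {
       /** Places what it can; returns true if unassigned standbys remain. */
       boolean assignRemaining(AssignmentState state);
   }
   
   final class AssignmentState {
       int remainingStandbys; // simplified stand-in for tasksToRemainingStandbys
       AssignmentState(final int remainingStandbys) { this.remainingStandbys = remainingStandbys; }
   }
   
   final class ChainedStandbyTaskAssignor implements StandbyTaskAssignor {
       private final List<StandbyTaskAssignor> chain;
   
       ChainedStandbyTaskAssignor(final List<StandbyTaskAssignor> chain) { this.chain = chain; }
   
       @Override
       public boolean assignRemaining(final AssignmentState state) {
           // e.g. List.of(tagAwareAssignor, leastLoadedAssignor): each assignor only
           // handles what the previous one left over, so the tag-aware logic never
           // needs its own least-loaded fallback.
           for (final StandbyTaskAssignor assignor : chain) {
               if (!assignor.assignRemaining(state)) {
                   return false; // everything placed, stop the chain early
               }
           }
           return state.remainingStandbys > 0;
       }
   }
   ```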


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #11258: MINOR: add to empty, remove then add different test

2021-08-25 Thread GitBox


wcarlson5 opened a new pull request #11258:
URL: https://github.com/apache/kafka/pull/11258


   add a test for named topology
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-08-25 Thread GitBox


jolshan commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905758403


   Hmmm. Not sure what to make of this exit code 1:
   ```
   * What went wrong:
   
   Execution failed for task ':storage:unitTest'.
   
   > Process 'Gradle Test Executor 133' finished with non-zero exit value 1
   ```
   
   The other two failures look unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-08-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404607#comment-17404607
 ] 

Matthias J. Sax commented on KAFKA-10847:
-

Updated fix version to 3.1.0 because of 
https://issues.apache.org/jira/browse/KAFKA-13216 

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
> Fix For: 3.1.0
>
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that a left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will eventually be 
> part of an inner-join result might first produce an eager (and spurious) 
> left/outer join result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay their emission until after the window grace 
> period has passed.
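
Conceptually, the proposed change amounts to something like the following 
sketch (names and structure here are illustrative only, not the actual 
KStreamKStreamJoin code):

{noformat}
// Illustrative sketch: buffer unmatched left records instead of emitting
// (leftValue, null) eagerly; flush them once the window can no longer match.
void processLeft(final K key, final V1 leftValue, final long timestamp) {
    if (rightWindowHasMatch(key, timestamp)) {
        emitInnerJoinResults(key, leftValue, timestamp);
    } else {
        pendingLeftRecords.put(key, leftValue, timestamp);  // hold back
    }
    // Records at or before this time can no longer find a right-side match.
    final long closeTime = observedStreamTime - windowSize - gracePeriod;
    // Emit the delayed (leftValue, null) results exactly once.
    emitAndEvictPendingOlderThan(closeTime);
}
{noformat}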



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-08-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10847:

Fix Version/s: (was: 3.0.0)
   3.1.0

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
> Fix For: 3.1.0
>
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that a left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will eventually be 
> part of an inner-join result might first produce an eager (and spurious) 
> left/outer join result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay their emission until after the window grace 
> period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13115) doSend can be blocking

2021-08-25 Thread Ivan Vaskevych (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Vaskevych updated KAFKA-13115:
---
Labels: documentation pull-request-available  (was: pull-request-available)

> doSend can be blocking
> --
>
> Key: KAFKA-13115
> URL: https://issues.apache.org/jira/browse/KAFKA-13115
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Reporter: Ivan Vaskevych
>Priority: Major
>  Labels: documentation, pull-request-available
>
> https://github.com/apache/kafka/pull/11023



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13115) doSend can be blocking

2021-08-25 Thread Ivan Vaskevych (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Vaskevych updated KAFKA-13115:
---
Component/s: docs

> doSend can be blocking
> --
>
> Key: KAFKA-13115
> URL: https://issues.apache.org/jira/browse/KAFKA-13115
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Reporter: Ivan Vaskevych
>Priority: Major
>
> https://github.com/apache/kafka/pull/11023



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13115) doSend can be blocking

2021-08-25 Thread Ivan Vaskevych (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Vaskevych updated KAFKA-13115:
---
Labels: pull-request-available  (was: )

> doSend can be blocking
> --
>
> Key: KAFKA-13115
> URL: https://issues.apache.org/jira/browse/KAFKA-13115
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Reporter: Ivan Vaskevych
>Priority: Major
>  Labels: pull-request-available
>
> https://github.com/apache/kafka/pull/11023



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-08-25 Thread GitBox


dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905684062


   We can merge once we get clean builds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-08-25 Thread GitBox


jolshan edited a comment on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905656665


   Looks like it was a really subtle bug. Consider the case where we have a 
partition in the session without a topic ID. We then add a new partition from 
that topic with an ID, but remove the partition that didn't have an ID.
   
   Here, we want to use IDs. When the receiving broker sees the topic IDs and 
currently has a session without them open, it will close that session so we can 
switch over to fully using topic IDs. Otherwise, we may get stuck not using 
topic IDs a little longer than we want to.
   
   I've added a check to see if the topic ID can be found in the current 
builder. Let me know if this makes sense.
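
   For illustration, the added check is along these lines (the method and 
field names here are assumptions, not the actual FetchSessionHandler code):
   ```java
   // Illustrative: the next fetch may use topic IDs only if every partition
   // kept in the session has a real ID (Uuid.ZERO_UUID marks "unknown").
   private boolean canUseTopicIds(final Map<TopicPartition, Uuid> sessionTopicIds) {
       return sessionTopicIds.values().stream()
           .noneMatch(id -> id == null || id.equals(Uuid.ZERO_UUID));
   }
   ```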


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-08-25 Thread GitBox


jolshan commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905656665


   Looks like it was a really subtle bug. Consider the case where we have a 
partition in the session without a topic ID. We then add a new partition from 
that topic with an ID, but remove the partition that didn't have an ID.
   
   Here, we want to use IDs. When the receiving broker sees the topic IDs and 
currently has a session without them open, it will close that session so we can 
switch over to fully using topic IDs.
   
   Let me know if this makes sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160


   A general thought on the implementation. 
   As of now, we choose a concrete `StandbyTaskAssignor` implementation based 
on the passed `AssignmentConfigs` value.
   Instead, an alternative approach would be to chain the standby task 
assignment across multiple implementations. For instance, when the required 
client tag configuration is present, we can try to assign the standby tasks 
based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new 
LeastLoadedClientStandbyTaskAssignor())`. This way, 
`ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks 
based on tag dimensions. If there are no more client tag dimensions available, 
and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation 
in the chain (in this case `LeastLoadedClientStandbyTaskAssignor`) will be 
called. With this, `LeastLoadedClientStandbyTaskAssignor` can default to 
assigning the remaining standbys to the least-loaded clients. The benefit of 
this approach is that, right now, `ClientTagAwareStandbyTaskAssignor` does 
both the assignment based on available dimensions and the fallback to the 
least-loaded clients if there are not enough tag dimensions, which leads to 
more complex code. With the approach above, we could cleanly separate the 
implementations without duplication (there's no code duplication, rather 
"logic" duplication).
   For now, I've chosen to go with the simplest approach - having two 
independent implementations and selecting the appropriate one based on the 
passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in 
case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160


   A general thought on the implementation. 
   As of now, we choose a concrete `StandbyTaskAssignor` implementation based 
on the passed `AssignmentConfigs` value.
   Instead, an alternative approach would be to chain the standby task 
assignment across multiple implementations. For instance, when the required 
client tag configuration is present, we can try to assign the standby tasks 
based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new 
LeastLoadedClientStandbyTaskAssignor())`. This way, 
`ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks 
based on tag dimensions. If there are no more client tag dimensions available, 
and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation 
in the chain (in this case `LeastLoadedClientStandbyTaskAssignor`) will be 
called. With this, `LeastLoadedClientStandbyTaskAssignor` can default to 
assigning the remaining standbys to the least-loaded clients. The benefit of 
this approach is that, right now, `ClientTagAwareStandbyTaskAssignor` does 
both the assignment based on available dimensions and the fallback to the 
least-loaded clients if there are not enough tag dimensions, which leads to 
more complex code. With the approach above, we could cleanly separate the 
implementations without duplication (there's no code duplication, rather 
"logic" duplication).
   For now, I've chosen to go with the simplest approach - having two 
independent implementations and selecting the appropriate one based on the 
passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in 
case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-25 Thread GitBox


rondagostino commented on pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#issuecomment-905514929


   This PR is likely going to be replaced by 
https://github.com/apache/kafka/pull/11256. I will probably eventually close 
this one, assuming the other PR gets merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160


   A general thought on the implementation. 
   As of now, we choose a concrete `StandbyTaskAssignor` implementation based 
on the passed `AssignmentConfigs` value.
   Instead, an alternative approach would be to chain the standby task 
assignment across multiple implementations. For instance, when the required 
client tag configuration is present, we can try to assign the standby tasks 
based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new 
LeastLoadedClientStandbyTaskAssignor())`. This way, 
`ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks 
based on tag dimensions. If there are no more client tag dimensions available, 
and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation 
in the chain (in this case `DefaultStandbyTaskAssignor`) will be called. With 
this, `DefaultStandbyTaskAssignor` can default to assigning the remaining 
standbys to the least-loaded clients. The benefit of this approach is that, 
right now, `ClientTagAwareStandbyTaskAssignor` does both the assignment based 
on available dimensions and the fallback to the least-loaded clients if there 
are not enough tag dimensions, which leads to more complex code. With the 
approach above, we could cleanly separate the implementations without 
duplication (there's no code duplication, rather "logic" duplication).
   For now, I've chosen to go with the simplest approach - having two 
independent implementations and selecting the appropriate one based on the 
passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in 
case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160


   A general thought on the implementation. 
   As of now, we choose a concrete `StandbyTaskAssignor` implementation based 
on the passed `AssignmentConfigs` value.
   Instead, an alternative approach would be to chain the standby task 
assignment across multiple implementations. For instance, when the required 
client tag configuration is present, we can try to assign the standby tasks 
based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new 
DefaultStandbyTaskAssignor())`. This way, `ClientTagAwareStandbyTaskAssignor` 
can try to distribute the standby tasks based on tag dimensions. If there are 
no more client tag dimensions available, and we still have 
`any(tasksToRemainingStandbys) > 0`, the next implementation in the chain (in 
this case `DefaultStandbyTaskAssignor`) will be called. With this, 
`DefaultStandbyTaskAssignor` can default to assigning the remaining standbys 
to the least-loaded clients. The benefit of this approach is that, right now, 
`ClientTagAwareStandbyTaskAssignor` does both the assignment based on 
available dimensions and the fallback to the least-loaded clients if there are 
not enough tag dimensions, which leads to more complex code. With the approach 
above, we could cleanly separate the implementations without duplication 
(there's no code duplication, rather "logic" duplication).
   For now, I've chosen to go with the simplest approach - having two 
independent implementations and selecting the appropriate one based on the 
passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in 
case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-08-25 Thread GitBox


dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905509721


   @jolshan Some tests failed, and they seem related to the changes in this 
PR. Could you check them?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-08-25 Thread GitBox


showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695758175



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
##
@@ -297,6 +297,186 @@ public void shouldPutFetchRangeFromCache() {
 }
 }
 
+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
+cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(null, bytesKey("d"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("a"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+"a");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+"b");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+"c");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+"d");
+assertFalse(iterator.hasNext());
+}
+}
+
+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyTo() {
+cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(bytesKey("b"), null, 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+"b");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+"c");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+"d");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+"e");
+assertFalse(iterator.hasNext());
+}
+}
+
+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() {
+cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(null, null, 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("a"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+"a");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+"b");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+"c");
+verifyWindowedKeyValue(
+iterator.next(),
+new 

[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-08-25 Thread GitBox


showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695757591



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##
@@ -74,7 +74,7 @@ public boolean hasNext() {
 close();
 currentSegment = segments.next();
 try {
-if (from == null || to == null) {
+if (from == null && to == null) {
 if (forward) {
 currentIterator = currentSegment.all();

Review comment:
   Good suggestion. Updated
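
   For reference, the resulting branching looks roughly like this (a sketch 
assuming the segment's range methods treat a null bound as open-ended):
   ```java
   if (from == null && to == null) {
       // No key bounds at all: scan the whole segment.
       currentIterator = forward ? currentSegment.all() : currentSegment.reverseAll();
   } else {
       // One- or two-sided bound: a null end stays open-ended.
       currentIterator = forward
           ? currentSegment.range(from, to)
           : currentSegment.reverseRange(from, to);
   }
   ```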




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-08-25 Thread GitBox


showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695757204



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -124,8 +124,8 @@
 
 final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, 
to, forward);
 
-final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
-final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+final Bytes binaryFrom = keyFrom == null ? null : 
keySchema.lowerRange(keyFrom, from);
+final Bytes binaryTo = keyTo == null ? null : 
keySchema.upperRange(keyTo, to);

Review comment:
   That's a good question. I'll check the test coverage later. Thanks.
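
   As a usage illustration, an open-ended query against a store of this type 
would look something like this (assuming a `bytesStore` fixture; illustrative 
only):
   ```java
   // Both key bounds null: fetch every key within the given time range.
   try (final KeyValueIterator<Bytes, byte[]> iterator =
            bytesStore.fetch(null, null, 0L, Long.MAX_VALUE)) {
       while (iterator.hasNext()) {
           final KeyValue<Bytes, byte[]> next = iterator.next();
           // ...assert on next.key / next.value
       }
   }
   ```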




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905490438


   Thanks for the feedback @cadonna , I've pushed the new changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695735854



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
+                                                                                              statefulTasksWithClients);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(numStandbyReplicas,
+                                                                                               taskId,
+                                                                                               clientId,
+                                                                                               rackAwareAssignmentTags,
+                                                                                               clients,
+                                                                                               tasksToRemainingStandbys));
+
+        return true;
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas, final Map<TaskId, UUID> statefulTasksWithClients) {
+        return statefulTasksWithClients.keySet().stream().collect(toMap(Function.identity(), t -> numStandbyReplicas));
+    }
+
+    private static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                                 final Map<String, Set<UUID>> clientsPerTagValue,
+

[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695736026



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
+                                                                                              statefulTasksWithClients);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(numStandbyReplicas,
+                                                                                               taskId,
+                                                                                               clientId,
+                                                                                               rackAwareAssignmentTags,
+                                                                                               clients,
+                                                                                               tasksToRemainingStandbys));
+
+        return true;
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas, final Map<TaskId, UUID> statefulTasksWithClients) {
+        return statefulTasksWithClients.keySet().stream().collect(toMap(Function.identity(), t -> numStandbyReplicas));
+    }
+
+    private static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                                 final Map<String, Set<UUID>> clientsPerTagValue,
+

[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695735723



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
+                                                                                              statefulTasksWithClients);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(numStandbyReplicas,
+                                                                                               taskId,
+                                                                                               clientId,
+                                                                                               rackAwareAssignmentTags,
+                                                                                               clients,
+                                                                                               tasksToRemainingStandbys));
+
+        return true;
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas, final Map<TaskId, UUID> statefulTasksWithClients) {
+        return statefulTasksWithClients.keySet().stream().collect(toMap(Function.identity(), t -> numStandbyReplicas));
+    }
+
+    private static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                                 final Map<String, Set<UUID>> clientsPerTagValue,
+

[GitHub] [kafka] patrickstuedi commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-08-25 Thread GitBox


patrickstuedi commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695731353



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##
@@ -931,16 +979,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() {
 assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 
ofEpochMilli(1L), ofEpochMilli(2L)));
 }
 
-@Test
-public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 
2, ofEpochMilli(1L), ofEpochMilli(2L)));
-}
-
-@Test
-public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-assertThrows(NullPointerException.class, () -> windowStore.fetch(1, 
null, ofEpochMilli(1L), ofEpochMilli(2L)));
-}
-

Review comment:
   Ok, no. If you have them covered up there, that's fine. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695717497



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
+                                                                                              statefulTasksWithClients);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(numStandbyReplicas,
+                                                                                               taskId,
+                                                                                               clientId,
+                                                                                               rackAwareAssignmentTags,
+                                                                                               clients,
+                                                                                               tasksToRemainingStandbys));
+
+        return true;
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas, final Map<TaskId, UUID> statefulTasksWithClients) {
+        return statefulTasksWithClients.keySet().stream().collect(toMap(Function.identity(), t -> numStandbyReplicas));
+    }
+
+    private static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                                 final Map<String, Set<UUID>> clientsPerTagValue,
+

[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-08-25 Thread GitBox


showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695715007



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##
@@ -931,16 +979,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() {
 assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 
ofEpochMilli(1L), ofEpochMilli(2L)));
 }
 
-@Test
-public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 
2, ofEpochMilli(1L), ofEpochMilli(2L)));
-}
-
-@Test
-public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-assertThrows(NullPointerException.class, () -> windowStore.fetch(1, 
null, ofEpochMilli(1L), ofEpochMilli(2L)));
-}
-

Review comment:
   Yes, but these two cases are already covered by the tests above (i.e., 
`testFetchRange` and `testBackwardFetchRange`). I don't think we should test 
them again here. What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695669488



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
+                                                                                              statefulTasksWithClients);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(numStandbyReplicas,
+                                                                                               taskId,
+                                                                                               clientId,
+                                                                                               rackAwareAssignmentTags,
+                                                                                               clients,
+                                                                                               tasksToRemainingStandbys));
+
+        return true;
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas, final Map<TaskId, UUID> statefulTasksWithClients) {
+        return statefulTasksWithClients.keySet().stream().collect(toMap(Function.identity(), t -> numStandbyReplicas));
+    }
+
+    private static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                                 final Map<String, Set<UUID>> clientsPerTagValue,
+

[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695633134



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
+                                                                                              statefulTasksWithClients);

Review comment:
   ```suggestion
        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
            numStandbyReplicas,
            statefulTasksWithClients
        );
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import 

[GitHub] [kafka] patrickstuedi commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-08-25 Thread GitBox


patrickstuedi commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695631238



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##
@@ -74,7 +74,7 @@ public boolean hasNext() {
 close();
 currentSegment = segments.next();
 try {
-if (from == null || to == null) {
+if (from == null && to == null) {
 if (forward) {
 currentIterator = currentSegment.all();

Review comment:
   Isn't it sufficient to distinguish the forward and reverse cases and 
just call range(from, to) or reverseRange(from, to)?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -124,8 +124,8 @@
 
 final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, 
to, forward);
 
-final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
-final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+final Bytes binaryFrom = keyFrom == null ? null : 
keySchema.lowerRange(keyFrom, from);
+final Bytes binaryTo = keyTo == null ? null : 
keySchema.upperRange(keyTo, to);

Review comment:
   Do you have tests for this store? Generally, can you make sure that your 
tests cover all the stores to which you added infinite range support?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-25 Thread GitBox


cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695567992



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly

Review comment:
   I think we also need to test the case without failure. Otherwise, the test 
assumes that the measurement happens in the `finally` clause, which we should 
not assume but rather ensure with unit tests. The same applies to the test 
below.
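
   Something like the following success-path variant, say (helper names mirror 
the quoted tests; the metric name is an assumption on my side):
   ```java
   // Sketch: MockTime advances 1s per time lookup, so a successful
   // commitSync() should account for roughly 1s of measured blocked time.
   final Time time = new MockTime(Duration.ofSeconds(1).toMillis());
   // ... wire up consumer + MockClient so that the commit succeeds ...
   consumer.commitSync(singletonMap(tp0, new OffsetAndMetadata(10L)));
   assertTrue(getMetricValue(consumer, "commit-sync-time-ns-total")
       >= Duration.ofMillis(999).toNanos());
   ```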

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.junit.jupiter.api.Test;

Review comment:
   ```suggestion
   import org.apache.kafka.common.metrics.Metrics;
   
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   ```

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
 }
 }
 
+private double getAndAssertDuration(KafkaProducer<?, ?> producer, String name, double floor) {
+double value = getMetricValue(producer, name);
+assertTrue(value > floor);
+return value;
+}
+
+@Test
+public void testMeasureTransactionDurations() {
+Map<String, Object> configs = new HashMap<>();
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"some.id", host1));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, 
Errors.NONE));
+
+try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time)) {
+producer.initTransactions();
+assertTrue(getMetricValue(producer, "txn-init-time-total") > 
99);

Review comment:
   I now saw that in the consumer tests you use 
`Duration.ofSeconds(1).toMillis()` and `Duration.ofMillis(999).toNanos()`, 
which already makes this clearer. A variable with a meaningful name for the 
lower bound would make it clearer still.
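   For example (the variable name is just an illustration):
   ```java
   // Hypothetical sketch: give the magic number a meaningful name.
   final double minExpectedBlockMs = 99;
   assertTrue(getMetricValue(producer, "txn-init-time-total") > minExpectedBlockMs);
   ```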

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
 assertThrows(AuthenticationException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
 }
 
+@Test
+public void testMeasureCommitSyncDuration() {
+// use a consumer that will throw to ensure we return quickly
+Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+initMetadata(client, singletonMap(topic, 1));
+Node node = metadata.fetch().nodes().get(0);
+ConsumerPartitionAssignor assignor = new RangeAssignor();
+client.createP

[GitHub] [kafka] patrickstuedi commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-08-25 Thread GitBox


patrickstuedi commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695535522



##
File path: checkstyle/suppressions.xml
##
@@ -160,6 +160,9 @@
 
 
+

Review comment:
   I guess this is because of InMemoryWindowStore::isKeyWithinRange? Can we 
make that method more readable and at the same time avoid having to do this?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##
@@ -128,12 +128,13 @@
  * This iterator must be closed after use.
  *
  * @param keyFrom the first key in the range
+ * A null value indicates a starting position from the first element in 
the store.

Review comment:
   Can you give those extra lines the same indentation as the previous line, 
so that it is easy to see that they belong together?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##
@@ -931,16 +979,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() {
 assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 
ofEpochMilli(1L), ofEpochMilli(2L)));
 }
 
-@Test
-public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 
2, ofEpochMilli(1L), ofEpochMilli(2L)));
-}
-
-@Test
-public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-assertThrows(NullPointerException.class, () -> windowStore.fetch(1, 
null, ofEpochMilli(1L), ofEpochMilli(2L)));
-}
-

Review comment:
   Instead of deleting these tests, maybe rename them and check that the store 
returns the right values; see the sketch below for the kind of test I mean.
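   For example, something like this (the fixture data, `startTime`, and the 
`put` overload are assumed from the surrounding test class):
   ```java
   @Test
   public void shouldFetchFromFirstKeyOnRangeWithNullFromKey() {
       // Hypothetical sketch: a null keyFrom should now mean "start at the first key",
       // so assert on the returned values instead of expecting an NPE.
       windowStore.put(1, "one", startTime);
       windowStore.put(2, "two", startTime);

       try (final KeyValueIterator<Windowed<Integer>, String> iterator =
                windowStore.fetch(null, 2, ofEpochMilli(startTime), ofEpochMilli(startTime))) {
           assertEquals("one", iterator.next().value);
           assertEquals("two", iterator.next().value);
           assertFalse(iterator.hasNext());
       }
   }
   ```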

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -462,14 +460,21 @@ public boolean hasNext() {
 }
 
 final Bytes key = getKey(next.key);
-if (key.compareTo(getKey(keyFrom)) >= 0 && 
key.compareTo(getKey(keyTo)) <= 0) {
+if (isKeyWithinRange(key)) {
 return true;
 } else {
 next = null;
 return hasNext();
 }
 }
 
+private boolean isKeyWithinRange(final Bytes key) {

Review comment:
   Per comment above, can you make this method more readable by splitting 
the statements?
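   One possible shape, assuming (as in this PR) that a null bound means the 
range is open on that side:
   ```java
   // Hypothetical sketch: spell out each null-bound case on its own.
   private boolean isKeyWithinRange(final Bytes key) {
       if (keyFrom == null && keyTo == null) {
           return true;                                  // fully open range
       }
       if (keyFrom == null) {
           return key.compareTo(getKey(keyTo)) <= 0;     // only an upper bound
       }
       if (keyTo == null) {
           return key.compareTo(getKey(keyFrom)) >= 0;   // only a lower bound
       }
       return key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0;
   }
   ```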

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
##
@@ -297,6 +297,186 @@ public void shouldPutFetchRangeFromCache() {
 }
 }
 
+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
+cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("a"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+"a");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+"b");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+"c");
+verifyWindowedKeyValue(
+iterator.next(),
+new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+"d");
+assertFalse(iterator.hasNext());
+}
+}
+
+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyTo() {
+cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {

[jira] [Created] (KAFKA-13226) Partition expansion may cause uneven distribution

2021-08-25 Thread shizhenzhen (Jira)
shizhenzhen created KAFKA-13226:
---

 Summary: Partition expansion may cause uneven distribution
 Key: KAFKA-13226
 URL: https://issues.apache.org/jira/browse/KAFKA-13226
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 2.6.2, 2.7.1, 2.8.0, 2.5.0
 Environment: mac  
kafka-2.5.0


Reporter: shizhenzhen


 

{color:#ff}*Partition expansion may cause uneven distribution*{color}

1. Create a topic with 3 partitions and 1 replica:

!https://img-blog.csdnimg.cn/561112064b114acfb03882aa09100e0e.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_55,color_FF,t_70,g_se,x_16!

2. Expand the topic to 5 partitions:

!https://img-blog.csdnimg.cn/f7c3c33b6662457080d9bb5bb190c0c2.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_49,color_FF,t_70,g_se,x_16!

3. Does this meet expectations?

!https://img-blog.csdnimg.cn/20cc1007c4214c4ebfcb1b2c2eeb98e4.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_18,color_FF,t_70,g_se,x_16!

{color:#ff}*So is this a bug?*{color}

 

The problem may arise here:

When we create a new topic, the broker list we get is an Object Map, and 
*this map is unordered*.

If you read the code, you can see that it first sorts by broker id, but the 
result is then converted into an *unordered Map*:

!https://img-blog.csdnimg.cn/131b9bf0c19e4753a73512af4c9c5854.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_66,color_FF,t_70,g_se,x_16!

The important point is that the broker list *is* sorted when expanding 
partitions and during partition reassignment.

{color:#ff}*So why not sort when creating topics?*{color}

If the broker list were sorted when creating a new topic, this problem would 
not occur, so it may be a small bug. A sketch of the idea is below.

If you can read Chinese, you can look at this article, where I describe the 
issue in detail:

[This may be a Kafka bug?|https://shirenchuang.blog.csdn.net/article/details/119912418]

We look forward to receiving your reply.
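A hedged sketch of the suggested fix, in Java for illustration (the real 
assignment code in Kafka is Scala, and names like aliveBrokerIds, startIndex 
and numPartitions here are placeholders, not the actual code):

{code:java}
// Hypothetical sketch: sort broker ids before round-robin replica assignment,
// so topic creation uses the same ordering that partition expansion and
// partition reassignment already use.
final List<Integer> sortedBrokerIds = new ArrayList<>(aliveBrokerIds);
Collections.sort(sortedBrokerIds);

final Map<Integer, List<Integer>> replicaAssignment = new HashMap<>();
for (int partition = 0; partition < numPartitions; partition++) {
    final int brokerIndex = (startIndex + partition) % sortedBrokerIds.size();
    // replication factor 1, as in the example above
    replicaAssignment.put(partition, Collections.singletonList(sortedBrokerIds.get(brokerIndex)));
}
{code}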

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-25 Thread GitBox


cadonna commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695498527



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
##
@@ -41,6 +43,7 @@
 public void process(final Record<KIn, VIn> record) {
 final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> newKeyValues =
 mapper.apply(record.key(), record.value());
+Objects.requireNonNull(newKeyValues, String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
   Just a suggestion:
   ```suggestion
   Objects.requireNonNull(newKeyValues, "The provided 
KeyValueMapper returned null which is not allowed.");
   ```
   BTW, we should not output records since they might contain sensitive data.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
##
@@ -42,6 +44,7 @@ public KStreamMap(final KeyValueMapper mapper) {
 public void process(final Record<KIn, VIn> record) {
 final KeyValue<? extends KOut, ? extends VOut> newPair =
 mapper.apply(record.key(), record.value());
+Objects.requireNonNull(newPair, String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
   Just a suggestion:
   ```suggestion
   Objects.requireNonNull(newPair, "The provided KeyValueMapper 
returned null which is not allowed.");
   ```
   
   BTW, we should not output records since they might contain sensitive data.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
##
@@ -68,6 +70,14 @@ public void testMap() {
 }
 }
 
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamMap supplier = new 
KStreamMap<>((key, value) -> null);
+final Record record = new Record<>("K", 0, 0L);
+final Throwable throwable = assertThrows(NullPointerException.class, 
() -> supplier.get().process(record));
+assertEquals(throwable.getMessage(), String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
   See my comment above.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,12 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+final Record record = new Record<>("K", 0, 0L);
+final Throwable throwable = assertThrows(NullPointerException.class, 
() -> supplier.get().process(record));
+assertEquals(throwable.getMessage(), String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
   We prefer to use `assertThat()`:
   
   ```suggestion
   assertThat(throwable.getMessage(), is(...));
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-5666) Need feedback to user if consumption fails due to offsets.topic.replication.factor=3

2021-08-25 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404267#comment-17404267
 ] 

Rens Groothuijsen commented on KAFKA-5666:
--

[~ll1124278064] No, I'm no longer working on this one, so go right ahead.

> Need feedback to user if consumption fails due to 
> offsets.topic.replication.factor=3
> 
>
> Key: KAFKA-5666
> URL: https://issues.apache.org/jira/browse/KAFKA-5666
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Priority: Major
>  Labels: newbie, usability
>
> Introduced in 0.11: The offsets.topic.replication.factor broker config is now 
> enforced upon auto topic creation. Internal auto topic creation will fail 
> with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets 
> this replication factor requirement.
> Issue: Default is setting offsets.topic.replication.factor=3, but in 
> development and docker environments where there is only 1 broker, the offsets 
> topic will fail to be created when a consumer tries to consume and no records 
> will be returned.  As a result, the user experience is bad.  The user may 
> have no idea about this setting change and enforcement, and they just see 
> that `kafka-console-consumer` hangs with ZERO output. It is true that the 
> broker log file will provide a message (e.g. {{ERROR [KafkaApi-1] Number of 
> alive brokers '1' does not meet the required replication factor '3' for the 
> offsets topic (configured via 'offsets.topic.replication.factor'). This error 
> can be ignored if the cluster is starting up and not all brokers are up yet. 
> (kafka.server.KafkaApis)}}) but many users do not have access to the log 
> files or know how to get them.
> Suggestion: give feedback to the user/app if offsets topic cannot be created. 
>  For example, after some timeout.
> Workaround:
> Set offsets.topic.replication.factor=1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-25 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695463229



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there are not enough clients
+ * available on different tag dimensions to host an active task and all of its corresponding standby
+ * tasks, the algorithm will fall back to distributing tasks on the least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map<UUID, ClientState> clients,
+  final Set<TaskId> allTaskIds,
+  final Set<TaskId> statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map<TaskId, Integer> tasksToRemainingStandbys =
+computeTasksToRemainingStandbys(numStandbyReplicas, statefulTasksWithClients);
+
+statefulTasksWithClients.forEach((taskId, clientId) ->
+assignStandbyTasksForActiveTask(numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys));
+
+return true;
+}
+
+@Override
+public boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
+final Map<String, String> sourceClientTags = source.clientTags();
+final Map<String, String> destinationClientTags = destination.clientTags();
+
+for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+return false;
+}
+}
+
+return true;
+}
+
+private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,
+final Map<TaskId, UUID> statefulTasksWithClients) {

Review comment:
   This method is needed in both `ClientTagAwareStandbyTaskAssignor` and 
`DefaultStandbyTaskAssignor`. I was thinking of creating a 
`StandbyTaskAssignmentUtils` class and extracting this logic there. Wdyt 
@cadonna?
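   A minimal sketch of that extraction (placement and exact signature are 
open; taking the task set rather than the task-to-client map would keep it 
usable from both assignors):
   ```java
   // Hypothetical sketch of the shared helper.
   final class StandbyTaskAssignmentUtils {
       private StandbyTaskAssignmentUtils() {}

       // Every stateful task starts with the configured number of standbys still to place.
       static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,
                                                                   final Set<TaskId> statefulTaskIds) {
           return statefulTaskIds.stream()
                                 .collect(toMap(Function.identity(), task -> numStandbyReplicas));
       }
   }
   ```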




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on t