[jira] [Comment Edited] (KAFKA-10313) Out of range offset errors leading to offset reset
[ https://issues.apache.org/jira/browse/KAFKA-10313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403089#comment-17403089 ] qiang Liu edited comment on KAFKA-10313 at 8/26/21, 6:35 AM: - I am using 1.1.1, got offset out of range when partition leader switch. it may be caused by KAFKA-9835 was (Author: iamgd67): I am using 1.1.1, got offset out of range when partition leader switch. it may be caused by [KAFKA-9835|https://issues.apache.org/jira/browse/KAFKA-9835] > Out of range offset errors leading to offset reset > -- > > Key: KAFKA-10313 > URL: https://issues.apache.org/jira/browse/KAFKA-10313 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.2.2 >Reporter: Varsha Abhinandan >Priority: Major > > Hi, > > We have been occasionally noticing offset resets happening on the Kafka > consumer because of offset out of range error. However, I don't see any > errors in the broker logs. No logs related to leader-election, replica lag, > Kafka broker pod restarts or anything. (just info logs were enabled in the > prod environment). > > It appeared from the logs that the out of range error was because of the > fetch offset being larger than the offset range on the broker. Noticed this > happening multiple times on different consumers, stream apps in the prod > environment. So, it doesn't seem like an application bug and more like a bug > in the KafkaConsumer. Would like to understand the cause for such errors. > > Also, none of the offset reset options are desirable. Choosing "earliest" > creates a sudden huge lag (we have a retention of 24hours) and choosing > "latest" leads to data loss (the records produced between the out of range > error and when offset reset happens on the consumer). So, wondering if it is > better for the Kafka client to separate out 'auto.offset.reset' config for > just offset not found. 
For, out of range error maybe the Kafka client can > automatically reset the offset to latest if the fetch offset is higher to > prevent data loss. Also, automatically reset it to earliest if the fetch > offset is lesser than the start offset. > > Following are the logs on the consumer side : > {noformat} > [2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 > ([prd453-19-event-upsert]-bo-pipeline-12)] > [o.a.k.c.consumer.internals.Fetcher] [Consumer > clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, > groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range > for partition prd453-19-event-upsert-32, resetting offset > [2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 > ([prd453-19-event-upsert]-bo-pipeline-12)] > [o.a.k.c.consumer.internals.Fetcher] [Consumer > clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, > groupId=bo-indexer-group-prd453-19] Resetting offset for partition > prd453-19-event-upsert-32 to offset 453223789. > {noformat} > Broker logs for the partition : > {noformat} > [2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable > segments with base offsets [452091893] due to retention time 8640ms breach > [2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log > segment [baseOffset 452091893, size 1073741693] for deletion. 
> [2020-07-17T07:40:12,083Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log > start offset to 453223789 > [2020-07-17T07:41:12,083Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment > 452091893 > [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] > [kafka.log.LogSegment] Deleted log > /data/kafka/prd453-19-event-upsert-32/000452091893.log.deleted. > [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] > [kafka.log.LogSegment] Deleted offset index > /data/kafka/prd453-19-event-upsert-32/000452091893.index.deleted. > [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] > [kafka.log.LogSegment] Deleted time index > /data/kafka/prd453-19-event-upsert-32/000452091893.timeindex.deleted. > [2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] > [kafka.log.ProducerStateManager] [ProducerStateManager > partition=prd453-19-event-upsert-32] Writing producer snapshot at offset > 475609786 > [2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] > [k
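The reset policy the reporter proposes (go to earliest when the fetch offset is below the log start offset, go to latest when it is beyond the log end) can be sketched as plain logic. This is an illustration of the proposal only, not Kafka client API; the class and method names are hypothetical:

```java
// Sketch of the reset policy proposed above: clamp the fetch offset to the
// valid [logStart, logEnd] range instead of applying a single
// auto.offset.reset policy in both directions. All names are illustrative.
public class OffsetResetSketch {

    /**
     * @param fetchOffset    offset the consumer tried to fetch
     * @param logStartOffset first offset still present on the broker
     * @param logEndOffset   offset of the next record to be written
     * @return the offset to reset to, or fetchOffset if it is still valid
     */
    public static long chooseResetOffset(long fetchOffset, long logStartOffset, long logEndOffset) {
        if (fetchOffset < logStartOffset) {
            // records were deleted by retention: resume at the oldest surviving data
            return logStartOffset;
        }
        if (fetchOffset > logEndOffset) {
            // consumer is ahead of the log: resuming at the end loses the
            // least data going forward (this is the case in the report)
            return logEndOffset;
        }
        return fetchOffset; // still in range, no reset needed
    }

    public static void main(String[] args) {
        // Offsets from the logs above: fetch 476383711, log start 453223789,
        // last known broker offset 475609786
        System.out.println(chooseResetOffset(476383711L, 453223789L, 475609786L)); // prints 475609786
    }
}
```

With this split, the incident above would have reset forward to the log end rather than back to 453223789, avoiding the sudden 24-hour lag at the cost of skipping only the records between the two offsets.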
[jira] [Created] (KAFKA-13229) KIP-761: implement a total blocked time metric in Kafka Streams
Rohan Desai created KAFKA-13229: --- Summary: KIP-761: implement a total blocked time metric in Kafka Streams Key: KAFKA-13229 URL: https://issues.apache.org/jira/browse/KAFKA-13229 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.1.0 Reporter: Rohan Desai Fix For: 3.1.0 KIP-761 proposes a total blocked time metric in streams that measures the total time (since the thread was started) that a given thread is blocked on Kafka. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams -- This message was sent by Atlassian Jira (v8.3.4#803005)
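Since the KIP-761 metric is a monotonically increasing total since thread start, a typical use is to sample it twice and derive the fraction of wall-clock time the thread spent blocked on Kafka. A minimal sketch of that calculation, with hypothetical names (the real metric is exposed through the Kafka Streams metrics registry):

```java
// Given two samples of a monotonically increasing "total blocked time"
// metric, compute the fraction of the wall-clock window spent blocked.
// Names are hypothetical; only the metric's semantics come from KIP-761.
public class BlockedTimeRatio {

    public static double blockedRatio(long blockedNsStart, long blockedNsEnd,
                                      long wallNsStart, long wallNsEnd) {
        long wall = wallNsEnd - wallNsStart;
        if (wall <= 0) {
            throw new IllegalArgumentException("sampling window must be positive");
        }
        return (double) (blockedNsEnd - blockedNsStart) / wall;
    }

    public static void main(String[] args) {
        // 500ms blocked over a 1s window -> ratio 0.5
        System.out.println(blockedRatio(1_000_000_000L, 1_500_000_000L, 0L, 1_000_000_000L));
    }
}
```

A ratio near 1.0 over a window means the thread was almost entirely blocked on Kafka (fetching, committing, producing) rather than doing application work, which is the utilization signal the KIP is after.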
[GitHub] [kafka] ableegoldman opened a new pull request #11263: MINOR: remove unused Properties from GraphNode#writeToTopology
ableegoldman opened a new pull request #11263: URL: https://github.com/apache/kafka/pull/11263 While going through the StreamsBuilder#build method to see if any of the steps actually modify internal state, I noticed this GraphNode method accepts a Properties input parameter but never uses it in any of its implementations. We can remove this parameter to clean things up, and make it clear that writing nodes to the topology doesn't involve the app properties. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r696290766 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() { assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0)); } +@Test +public void testMeasureCommitSyncDuration() { +// use a consumer that will throw to ensure we return quickly Review comment: Done
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r696290580 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() { assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0)); } +@Test +public void testMeasureCommitSyncDuration() { +// use a consumer that will throw to ensure we return quickly +Time time = new MockTime(Duration.ofSeconds(1).toMillis()); +SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); +ConsumerMetadata metadata = createMetadata(subscription); +MockClient client = new MockClient(time, metadata); +initMetadata(client, singletonMap(topic, 1)); +Node node = metadata.fetch().nodes().get(0); +ConsumerPartitionAssignor assignor = new RangeAssignor(); +client.createPendingAuthenticationError(node, 0); +final KafkaConsumer consumer += newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); Review comment: refactored
[GitHub] [kafka] ableegoldman merged pull request #11259: HOTFIX: decrease session timeout in flaky NamedTopologyIntegrationTest
ableegoldman merged pull request #11259: URL: https://github.com/apache/kafka/pull/11259
[GitHub] [kafka] mjsax commented on pull request #11233: HOTFIX: Disable spurious left/outer stream-stream join fix
mjsax commented on pull request #11233: URL: https://github.com/apache/kafka/pull/11233#issuecomment-906091101 @showuon -- thanks for taking care of the docs -- I did actually not forget about it and was just about to do a follow up PR -- great that it is already done!
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r696282952 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() { assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0)); } +@Test +public void testMeasureCommitSyncDuration() { +// use a consumer that will throw to ensure we return quickly Review comment: Yeah. There are no tests for that path, and I lost steam trying to pay that debt just to implement this metric.
[jira] [Updated] (KAFKA-13160) Fix the code that calls the broker's config handler to pass the expected default resource name when using KRaft.
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-13160: --- Summary: Fix the code that calls the broker's config handler to pass the expected default resource name when using KRaft. (was: Fix the code that calls the broker’s config handler to pass the expected default resource name when using KRaft.) > Fix the code that calls the broker's config handler to pass the expected > default resource name when using KRaft. > > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. The code that > calls the handler method for config changes should be fixed to pass > "<default>" instead of empty string to the handler method if using KRaft.
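The dispatch the ticket describes can be sketched as follows; this is an illustration of the failure mode, not the actual BrokerConfigHandler code. An empty resource name falls through to `Integer.parseInt`, which is exactly where the NumberFormatException comes from:

```java
// Sketch of the resource-name dispatch described above: "<default>" selects a
// cluster-wide default update, anything else must parse as a broker id.
// An empty string reaches Integer.parseInt and throws NumberFormatException,
// which is the bug the ticket fixes by passing "<default>" instead.
public class ConfigResourceDispatch {

    public static String classify(String resourceName) {
        if (resourceName.equals("<default>")) {
            return "default-update";
        }
        // per-broker update: resource name must be a broker id
        return "broker-" + Integer.parseInt(resourceName);
    }

    public static void main(String[] args) {
        System.out.println(classify("<default>")); // prints default-update
        System.out.println(classify("5"));         // prints broker-5
    }
}
```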
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r696280437 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() { assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0)); } +@Test +public void testMeasureCommitSyncDuration() { +// use a consumer that will throw to ensure we return quickly +Time time = new MockTime(Duration.ofSeconds(1).toMillis()); +SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); +ConsumerMetadata metadata = createMetadata(subscription); +MockClient client = new MockClient(time, metadata); +initMetadata(client, singletonMap(topic, 1)); +Node node = metadata.fetch().nodes().get(0); +ConsumerPartitionAssignor assignor = new RangeAssignor(); +client.createPendingAuthenticationError(node, 0); +final KafkaConsumer consumer += newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); Review comment: it doesn't set a tick on the mock time.
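The auto-tick argument discussed in this review (`new MockTime(Duration.ofSeconds(1).toMillis())`) makes the mock clock advance on every read, so duration measurements come out nonzero without any real waiting. A minimal sketch of that behavior, with a hypothetical class name standing in for Kafka's internal MockTime:

```java
// Minimal sketch of an auto-ticking mock clock like the MockTime used in the
// test above: every read advances time by a fixed step, so code that measures
// "end - start" around a blocking call sees a nonzero duration even though
// nothing actually sleeps. Hypothetical class; not Kafka's MockTime itself.
public class AutoTickClock {
    private long nowMs;
    private final long autoTickMs;

    public AutoTickClock(long startMs, long autoTickMs) {
        this.nowMs = startMs;
        this.autoTickMs = autoTickMs;
    }

    public long milliseconds() {
        nowMs += autoTickMs; // advance on every read
        return nowMs;
    }

    public static void main(String[] args) {
        AutoTickClock clock = new AutoTickClock(0L, 1000L);
        long start = clock.milliseconds();
        long end = clock.milliseconds();
        System.out.println(end - start); // prints 1000
    }
}
```

Without a tick (step of zero), consecutive reads return the same timestamp and a commit-duration metric would always record zero, which is what the review comment "it doesn't set a tick on the mock time" is pointing at.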
[jira] [Commented] (KAFKA-13032) Impossible stacktrace
[ https://issues.apache.org/jira/browse/KAFKA-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404902#comment-17404902 ] Matthias J. Sax commented on KAFKA-13032: - It seems [~cadonna] reviewed the PR – so I'll leave it to him to merge it. > Impossible stacktrace > - > > Key: KAFKA-13032 > URL: https://issues.apache.org/jira/browse/KAFKA-13032 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Niclas Hedhman >Assignee: Yanwen Lin >Priority: Minor > Labels: beginner, easy-fix > > I am presented with a stacktrace that has not a single touch point in my > code, so it is incredibly difficult to figure out where the problem could be. > I think more RuntimeExceptions need to be caught and pull out information at > each level that is providing any additional hint of where we are. > For instance, each node could prepend its reference/name and one would have a > chance to see where we are... > ``` > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. 
taskId=0_140, processor=KSTREAM-SOURCE-00, topic=_poll, > partition=140, offset=0, stacktrace=java.lang.NullPointerException > at > org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268) > at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214) > at > org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafk
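The reporter's suggestion, each node prepending its name as the exception propagates, can be sketched as a wrapper around processor invocation. The names here are hypothetical and this is not the actual Kafka Streams internals, just the shape of the idea:

```java
// Sketch of the suggestion above: wrap exceptions thrown inside a processor
// so the failing node's name reaches the top-level error message, giving the
// stacktrace a touch point even when user code appears nowhere in it.
// Hypothetical API, not Kafka Streams internals.
public class NamedProcessing {

    public static void processWithContext(String nodeName, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            // prepend the node reference, keep the original as the cause
            throw new RuntimeException("error in node [" + nodeName + "]", e);
        }
    }

    public static void main(String[] args) {
        try {
            processWithContext("KSTREAM-MAP-01", () -> {
                throw new NullPointerException();
            });
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // prints error in node [KSTREAM-MAP-01]
        }
    }
}
```

If every node in the chain did this, a nested NullPointerException would arrive at the uncaught-exception handler wrapped in a trail of node names, which is the breadcrumb the reporter is asking for.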
[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404900#comment-17404900 ] Matthias J. Sax commented on KAFKA-13195: - [~cadonna] – I think it's still open for discussion if such a new feature is useful or not. Thus, it might be ok to leave it open to collect more feedback? [~tchiotludo] – as pointed out before: changing the serialization format is considered a "non-compatible" change, and thus it's questionable if using a deserialization handler would be the right solution for this problem. I would not jump to a conclusion too eagerly. It's a difficult problem that might need a more generic solution. – Atm, if you change the format, it's required to reset your application to wipe out old state and start with empty state. > StateSerde don't honor DeserializationExceptionHandler > -- > > Key: KAFKA-13195 > URL: https://issues.apache.org/jira/browse/KAFKA-13195 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Ludo >Priority: Major > > Kafka streams allow to configure an > [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling] > > When you are using a StateStore most of message will be a copy of original > message in internal topic and mostly will use the same serializer if the > message is another type. > You can see > [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161] > that StateSerde is using the raw Deserializer and not honor the > {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}. > Leading to crash the application (reaching the > {{setUncaughtExceptionHandler}} method). > I think the state store must have the same behavior than the > {{RecordDeserializer}} and honor the DeserializationExceptionHandler. 
> > Stacktrace (coming from kafka stream 2.6.1) : > > {code:java} > Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=1_14, processor=workertaskjoined-repartition-source, > topic=kestra_executor-workertaskjoined-repartition, partition=14, > offset=167500, > stacktrace=org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize > value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String > "txt": not one of the values accepted for Enum class: > [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: > (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through > reference chain: > io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"]) > at > com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67) > at > com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851) > at > com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188) > at > com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225) > at > 
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197) > at > com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137) > at > com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107) > at > com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244) > at > com.fasterxml.jackson.databind.deser.std.CollectionDese
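The ticket asks StateSerdes to route deserialization failures through the configured handler instead of letting them kill the stream thread. A minimal, self-contained sketch of that delegation pattern (the class and interface names here are illustrative, not Kafka's actual API):

```java
import java.util.function.Function;

// Illustrative sketch only: mirrors how Kafka Streams' source-topic
// DeserializationExceptionHandler behaves (skip or fail), applied to a
// generic deserializer. RecoveringDeserializer/Handler are made-up names.
public class RecoveringDeserializer {
    public enum Response { CONTINUE, FAIL }

    public interface Handler {
        Response handle(byte[] rawValue, RuntimeException cause);
    }

    private final Function<byte[], Object> inner;
    private final Handler handler;

    public RecoveringDeserializer(Function<byte[], Object> inner, Handler handler) {
        this.inner = inner;
        this.handler = handler;
    }

    // On failure, consult the handler: CONTINUE skips the record (null),
    // FAIL rethrows, which is today's behavior for state-store serdes.
    public Object deserialize(byte[] raw) {
        try {
            return inner.apply(raw);
        } catch (RuntimeException e) {
            if (handler.handle(raw, e) == Response.CONTINUE) {
                return null;
            }
            throw e;
        }
    }
}
```

This is only a sketch of the requested behavior; whether a handler is the right mechanism for state stores is exactly what the comment above questions.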
[jira] [Updated] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
[ https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-7271: -- Fix Version/s: (was: 3.0.0) > Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers > --- > > Key: KAFKA-7271 > URL: https://issues.apache.org/jira/browse/KAFKA-7271 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: John Roesler >Priority: Blocker > > Fix in the oldest branch that ignores the test and cherry-pick forward. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-5905) Remove PrincipalBuilder and DefaultPrincipalBuilder
[ https://issues.apache.org/jira/browse/KAFKA-5905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-5905: -- Fix Version/s: (was: 3.0.0) > Remove PrincipalBuilder and DefaultPrincipalBuilder > --- > > Key: KAFKA-5905 > URL: https://issues.apache.org/jira/browse/KAFKA-5905 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Manikumar >Priority: Blocker > > These classes were deprecated after KIP-189: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL, > which is part of 1.0.0.
[jira] [Updated] (KAFKA-12582) Remove deprecated `ConfigEntry` constructor
[ https://issues.apache.org/jira/browse/KAFKA-12582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12582: --- Fix Version/s: (was: 3.0.0) > Remove deprecated `ConfigEntry` constructor > --- > > Key: KAFKA-12582 > URL: https://issues.apache.org/jira/browse/KAFKA-12582 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > ConfigEntry's constructor was deprecated in 1.1.0.
[jira] [Updated] (KAFKA-10329) Enable connector context in logs by default
[ https://issues.apache.org/jira/browse/KAFKA-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-10329: --- Fix Version/s: (was: 3.0.0) > Enable connector context in logs by default > --- > > Key: KAFKA-10329 > URL: https://issues.apache.org/jira/browse/KAFKA-10329 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Randall Hauch >Priority: Blocker > Labels: needs-kip > > When > [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] > was implemented and released as part of AK 2.3, we chose to not enable these > extra logging context information by default because it was not backward > compatible, and anyone relying upon the `connect-log4j.properties` file > provided by the AK distribution would after an upgrade to AK 2.3 (or later) > see different formats for their logs, which could break any log processing > functionality they were relying upon. > However, we should enable this in AK 3.0, whenever that comes. Doing so will > require a fairly minor KIP to change the `connect-log4j.properties` file > slightly. > Marked this as BLOCKER since it's a backward incompatible change that we > definitely want to do in the 3.0.0 release. -- This message was sent by Atlassian Jira (v8.3.4#803005)
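The connector context from KIP-449 is exposed through the log4j MDC, so enabling it by default amounts to adding the `%X{connector.context}` token to the conversion pattern shipped in `connect-log4j.properties` — roughly as below (pattern shown for illustration; the exact default format should be taken from the KIP and the shipped file):

```properties
# connect-log4j.properties: include the per-connector MDC context in each log line
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
```

Workers that never populate the MDC simply render `%X{connector.context}` as an empty string, which is why the change is format-breaking only for downstream log parsers.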
[jira] [Resolved] (KAFKA-12929) KIP-750: Deprecate support for Java 8 in Kafka 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-12929. Resolution: Fixed Resolving to unblock the RC for 3.0.0 and will keep a note to add to the downloads page > KIP-750: Deprecate support for Java 8 in Kafka 3.0 > -- > > Key: KAFKA-12929 > URL: https://issues.apache.org/jira/browse/KAFKA-12929 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > >
[jira] [Updated] (KAFKA-12599) Remove deprecated --zookeeper in preferredReplicaLeaderElectionCommand
[ https://issues.apache.org/jira/browse/KAFKA-12599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12599: --- Fix Version/s: (was: 3.0.0) > Remove deprecated --zookeeper in preferredReplicaLeaderElectionCommand > -- > > Key: KAFKA-12599 > URL: https://issues.apache.org/jira/browse/KAFKA-12599 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major >
[jira] [Resolved] (KAFKA-12930) KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-12930. Resolution: Fixed Resolving to unblock the RC for 3.0.0 and will keep a note to add to the downloads page > KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0 > -- > > Key: KAFKA-12930 > URL: https://issues.apache.org/jira/browse/KAFKA-12930 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > >
[jira] [Updated] (KAFKA-13095) TransactionsTest is failing in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-13095: --- Fix Version/s: (was: 3.0.0) > TransactionsTest is failing in kraft mode > - > > Key: KAFKA-13095 > URL: https://issues.apache.org/jira/browse/KAFKA-13095 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Colin McCabe >Assignee: Jason Gustafson >Priority: Blocker > > TransactionsTest#testSendOffsetsToTransactionTimeout keeps flaking on Jenkins. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13159) Enable system tests for transactions in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404886#comment-17404886 ] Konstantine Karantasis commented on KAFKA-13159: For the remainder of this ticket I'm moving to 3.0.1 because this open ticket is blocking the 3.0.0 RC > Enable system tests for transactions in KRaft mode > -- > > Key: KAFKA-13159 > URL: https://issues.apache.org/jira/browse/KAFKA-13159 > Project: Kafka > Issue Type: Test >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.1.0, 3.0.1 > > > Previously, we disabled several system tests involving system tests in KRaft > mode. Now that KIP-730 is complete and transactions work in KRaft, we need to > re-enable these tests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13159) Enable system tests for transactions in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-13159: --- Fix Version/s: (was: 3.0.0) 3.0.1 > Enable system tests for transactions in KRaft mode > -- > > Key: KAFKA-13159 > URL: https://issues.apache.org/jira/browse/KAFKA-13159 > Project: Kafka > Issue Type: Test >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.1.0, 3.0.1 > > > Previously, we disabled several system tests involving system tests in KRaft > mode. Now that KIP-730 is complete and transactions work in KRaft, we need to > re-enable these tests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13223) Idempotent producer error with Kraft
[ https://issues.apache.org/jira/browse/KAFKA-13223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-13223: --- Fix Version/s: (was: 3.0.0) > Idempotent producer error with Kraft > - > > Key: KAFKA-13223 > URL: https://issues.apache.org/jira/browse/KAFKA-13223 > Project: Kafka > Issue Type: Bug > Components: kraft, producer >Reporter: Laurynas Butkus >Priority: Major > > I get an error *"The broker does not support INIT_PRODUCER_ID"* if I try to > produce a message with idempotence enabled. > Result: > {code:java} > ➜ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic > test --request-required-acks -1 --producer-property enable.idempotence=true > >test > >[2021-08-23 19:40:33,356] ERROR [Producer clientId=console-producer] > >Aborting producer batches due to fatal error > >(org.apache.kafka.clients.producer.internals.Sender) > org.apache.kafka.common.errors.UnsupportedVersionException: The broker does > not support INIT_PRODUCER_ID > [2021-08-23 19:40:33,358] ERROR Error when sending message to topic test with > key: null, value: 4 bytes with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) > org.apache.kafka.common.errors.UnsupportedVersionException: The broker does > not support INIT_PRODUCER_ID > {code} > > It works fine with idempotence disabled. It also works fine when using > zookeeper. 
> Tested with altered docker image: > {code:java} > FROM confluentinc/cp-kafka:6.2.0 > RUN sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure && \ > # Docker workaround: Ignore cub zk-ready > sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure && > \ > # KRaft required step: Format the storage directory with a new cluster ID > echo "kafka-storage format --ignore-formatted -t $(kafka-storage random-uuid) > -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure > {code} > docker-compose.yml > {code:java} > version: '3.4' > services: > kafka: > build: kafka > restart: unless-stopped > environment: > ALLOW_PLAINTEXT_LISTENER: "yes" > KAFKA_HEAP_OPTS: -Xms256m -Xmx256m > LOG4J_LOGGER_KAFKA: "WARN" > KAFKA_BROKER_ID: 1 > KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: > 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' > KAFKA_ADVERTISED_LISTENERS: > 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092' > KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 > KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 > KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 > KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 > KAFKA_PROCESS_ROLES: 'broker,controller' > KAFKA_NODE_ID: 1 > KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' > KAFKA_LISTENERS: > 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://:9092' > KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' > KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' > KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' > ports: > - "127.0.0.1:9092:9092/tcp" > command: "/etc/confluent/docker/run" > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (KAFKA-13223) Idempotent producer error with Kraft
[ https://issues.apache.org/jira/browse/KAFKA-13223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis closed KAFKA-13223. -- > Idempotent producer error with Kraft > - > > Key: KAFKA-13223 > URL: https://issues.apache.org/jira/browse/KAFKA-13223 > Project: Kafka > Issue Type: Bug > Components: kraft, producer >Reporter: Laurynas Butkus >Priority: Major > Fix For: 3.0.0 > > > I get an error *"The broker does not support INIT_PRODUCER_ID"* if I try to > produce a message idempotence enabled. > Result: > {code:java} > ➜ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic > test --request-required-acks -1 --producer-property enable.idempotence=true > >test > >[2021-08-23 19:40:33,356] ERROR [Producer clientId=console-producer] > >Aborting producer batches due to fatal error > >(org.apache.kafka.clients.producer.internals.Sender) > org.apache.kafka.common.errors.UnsupportedVersionException: The broker does > not support INIT_PRODUCER_ID > [2021-08-23 19:40:33,358] ERROR Error when sending message to topic test with > key: null, value: 4 bytes with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) > org.apache.kafka.common.errors.UnsupportedVersionException: The broker does > not support INIT_PRODUCER_ID > {code} > > It works fine with idempotence disabled. Also it works fine if using > zookeeper. 
> Tested with altered docker image: > {code:java} > FROM confluentinc/cp-kafka:6.2.0 > RUN sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure && \ > # Docker workaround: Ignore cub zk-ready > sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure && > \ > # KRaft required step: Format the storage directory with a new cluster ID > echo "kafka-storage format --ignore-formatted -t $(kafka-storage random-uuid) > -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure > {code} > docker-compose.yml > {code:java} > version: '3.4' > services: > kafka: > build: kafka > restart: unless-stopped > environment: > ALLOW_PLAINTEXT_LISTENER: "yes" > KAFKA_HEAP_OPTS: -Xms256m -Xmx256m > LOG4J_LOGGER_KAFKA: "WARN" > KAFKA_BROKER_ID: 1 > KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: > 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' > KAFKA_ADVERTISED_LISTENERS: > 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092' > KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 > KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 > KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 > KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 > KAFKA_PROCESS_ROLES: 'broker,controller' > KAFKA_NODE_ID: 1 > KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' > KAFKA_LISTENERS: > 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://:9092' > KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' > KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' > KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' > ports: > - "127.0.0.1:9092:9092/tcp" > command: "/etc/confluent/docker/run" > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] alapidas opened a new pull request #11262: KAFKA-12963: Add processor name to error
alapidas opened a new pull request #11262: URL: https://github.com/apache/kafka/pull/11262 This PR adds the processor name to the `ClassCastException` exception text in `process()` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog
mjsax commented on a change in pull request #11235: URL: https://github.com/apache/kafka/pull/11235#discussion_r696258817 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RawKeyAccessor.java ## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals.metrics; + +import java.util.Collection; +import org.apache.kafka.common.utils.Bytes; + +public interface RawKeyAccessor { Review comment: Sure, I can add some JavaDoc. Won't be brief though... It's complicated... :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12963) Improve error message for Class cast exception
[ https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Lapidas reassigned KAFKA-12963: -- Assignee: Andrew Lapidas (was: A. Sophie Blee-Goldman) > Improve error message for Class cast exception > -- > > Key: KAFKA-12963 > URL: https://issues.apache.org/jira/browse/KAFKA-12963 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Rasmus Helbig Hansen >Assignee: Andrew Lapidas >Priority: Minor > > After a topology change and starting the application again, we got this type > of error message: > [g9z-StreamThread-1] ERROR > org.apache.kafka.streams.processor.internals.TaskManager - stream-thread > [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following > error: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > invoking Processor. Do the Processor's input types match the deserialized > types? Check the Serde setup and change the default Serdes in StreamConfig or > provide correct Serdes via method parameters. Make sure the Processor can > accept the deserialized input of type key: org.acme.SomeKey, and value: > org.acme.SomeValue. > Note that although incorrect Serdes are a common cause of error, the cast > exception might have another cause (in user code, for example). For example, > if a processor wires in a store, but casts the generics incorrectly, a class > cast exception could be raised during processing, but the cause would not be > wrong Serdes. 
> at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be > cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue > are in unnamed module of loader 'app') > at > org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ... 20 more > [g9z-StreamThread-1] ERROR > org.apache.ka
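The fix proposed for this ticket (PR #11262) is simply to include the processor's name when wrapping the ClassCastException. A self-contained sketch of that wrapping pattern (class names here are illustrative, not Kafka's actual ProcessorNode):

```java
// Illustrative sketch: rethrow a ClassCastException raised inside a
// processor with the node's name included in the message, so the failing
// node can be identified from the log. Not Kafka's real implementation.
public class NamedProcessorNode {
    private final String name;
    private final Runnable body;

    public NamedProcessorNode(String name, Runnable body) {
        this.name = name;
        this.body = body;
    }

    public void process() {
        try {
            body.run();
        } catch (ClassCastException e) {
            // Wrap with context instead of letting the bare CCE propagate.
            throw new RuntimeException(
                "ClassCastException invoking processor '" + name + "'. "
                + "Do the processor's input types match the deserialized types?", e);
        }
    }
}
```

As the comment below notes, the source topic is deliberately not included: the offending record may have come from a state store or a punctuator, so only the node name has a well-defined answer.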
[jira] [Commented] (KAFKA-12963) Improve error message for Class cast exception
[ https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404853#comment-17404853 ] A. Sophie Blee-Goldman commented on KAFKA-12963: Hey [~RasmusAtNordea], just to clarify the ask here is to add the processor name to this exception message? I don't think that was ever in there so I wouldn't say it's a regression, but I definitely agree that should have been in there. Is there anything else you can think of that would have been helpful in debugging this? The one thing I'm not sure makes sense is the topic – what topic would that be? The processors themselves aren't attached to any specific topic, except of course a source or a sink. But I think maybe you meant, which topic did the record come from? The problem is, even that doesn't exactly have an answer. For one thing, the ClassCastException may have come from anywhere – could just be a bug in the processing logic itself, unattached to any specific record. Of course there's the specific record that #process is being called on, but maybe you have two different topics feeding into this processor and the ClassCastException was actually due to a record from the other topic that was just fetched from an attached state store. Or maybe the record being processed was created by a punctuator and never came from any topic at all. And so on...sorry for the long-winded way of saying that there's actually a reason for the topic info being left out, unlike the processor name which definitely should be included. Hope that makes sense? > Improve error message for Class cast exception > -- > > Key: KAFKA-12963 > URL: https://issues.apache.org/jira/browse/KAFKA-12963 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Rasmus Helbig Hansen >Assignee: A. 
Sophie Blee-Goldman >Priority: Minor > > After a topology change and starting the application again, we got this type > of error message: > [g9z-StreamThread-1] ERROR > org.apache.kafka.streams.processor.internals.TaskManager - stream-thread > [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following > error: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > invoking Processor. Do the Processor's input types match the deserialized > types? Check the Serde setup and change the default Serdes in StreamConfig or > provide correct Serdes via method parameters. Make sure the Processor can > accept the deserialized input of type key: org.acme.SomeKey, and value: > org.acme.SomeValue. > Note that although incorrect Serdes are a common cause of error, the cast > exception might have another cause (in user code, for example). For example, > if a processor wires in a store, but casts the generics incorrectly, a class > cast exception could be raised during processing, but the cause would not be > wrong Serdes. 
> at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency
[jira] [Assigned] (KAFKA-12963) Improve error message for Class cast exception
[ https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12963: -- Assignee: A. Sophie Blee-Goldman > Improve error message for Class cast exception > -- > > Key: KAFKA-12963 > URL: https://issues.apache.org/jira/browse/KAFKA-12963 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Rasmus Helbig Hansen >Assignee: A. Sophie Blee-Goldman >Priority: Minor > > After a topology change and starting the application again, we got this type > of error message: > [g9z-StreamThread-1] ERROR > org.apache.kafka.streams.processor.internals.TaskManager - stream-thread > [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following > error: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > invoking Processor. Do the Processor's input types match the deserialized > types? Check the Serde setup and change the default Serdes in StreamConfig or > provide correct Serdes via method parameters. Make sure the Processor can > accept the deserialized input of type key: org.acme.SomeKey, and value: > org.acme.SomeValue. > Note that although incorrect Serdes are a common cause of error, the cast > exception might have another cause (in user code, for example). For example, > if a processor wires in a store, but casts the generics incorrectly, a class > cast exception could be raised during processing, but the cause would not be > wrong Serdes. 
> at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be > cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue > are in unnamed module of loader 'app') > at > org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ... 20 more > [g9z-StreamThread-1] ERROR > org.apache.k
[GitHub] [kafka] showuon commented on pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
showuon commented on pull request #11218: URL: https://github.com/apache/kafka/pull/11218#issuecomment-906022496 @ableegoldman , do you think this optimization makes sense? And will `StreamsPartitionAssignor` ever have a chance to distribute partitions that are not in the subscription list? Thank you.
[GitHub] [kafka] kkonstantine merged pull request #11260: MINOR: Add missing license entry for jline in LICENSE-binary
kkonstantine merged pull request #11260: URL: https://github.com/apache/kafka/pull/11260
[GitHub] [kafka] kkonstantine commented on pull request #11260: MINOR: Add missing license entry for jline in LICENSE-binary
kkonstantine commented on pull request #11260: URL: https://github.com/apache/kafka/pull/11260#issuecomment-906020193 Merging to include in 3.0 without waiting for tests to run because this is just documentation.
[GitHub] [kafka] dengziming opened a new pull request #11261: KAFKA-13228: Ensure ApiVersionRequest is properly handled in KRaft
dengziming opened a new pull request #11261: URL: https://github.com/apache/kafka/pull/11261 *More detailed description of your change* When I described the quorum in KRaft mode I got `org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_QUORUM`. This happens because we only consider `ApiKeys.zkBrokerApis()` when we call `NodeApiVersions.create()`; we should use `ApiKeys.controllerApiVersions` when in KRaft mode. *Summary of testing strategy (including rationale)* After this change, the DESCRIBE_QUORUM request was properly handled and got a correct response: TopicData(topicName='__cluster_metadata', partitions=[PartitionData(partitionIndex=0, errorCode=0, leaderId=1, leaderEpoch=30, highWatermark=141, currentVoters=[ReplicaState(replicaId=1, logEndOffset=141)], observers=[])])
[GitHub] [kafka] kkonstantine opened a new pull request #11260: MINOR: Add missing license entry for jline in LICENSE-binary
kkonstantine opened a new pull request #11260: URL: https://github.com/apache/kafka/pull/11260 Adding the license for https://github.com/jline/jline3 This is a commit that was missed when https://github.com/apache/kafka/pull/11232 was merged. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-13228) ApiVersionRequest are not correctly handled in kraft mode
dengziming created KAFKA-13228: -- Summary: ApiVersionRequest are not correctly handled in kraft mode Key: KAFKA-13228 URL: https://issues.apache.org/jira/browse/KAFKA-13228 Project: Kafka Issue Type: Bug Reporter: dengziming Assignee: dengziming Fix For: 3.0.1 I'm trying to describe the quorum in KRaft mode but got `org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_QUORUM`. This happens because we only consider `ApiKeys.zkBrokerApis()` when we call `NodeApiVersions.create()` -- This message was sent by Atlassian Jira (v8.3.4#803005)
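The failure described in the ticket boils down to building the supported-versions map from the ZK broker's API set even when talking to a KRaft controller, so controller-only APIs such as DESCRIBE_QUORUM look unsupported. A toy model of that lookup (the enum and the two sets below are illustrative stand-ins, not Kafka's actual `ApiKeys` definitions):

```java
// Toy model: an api-versions lookup built from a role-specific key set.
// ApiKey and the two sets are illustrative stand-ins, not Kafka's real ApiKeys.
import java.util.EnumSet;
import java.util.Set;

public class ApiVersionsSketch {
    enum ApiKey { PRODUCE, FETCH, METADATA, DESCRIBE_QUORUM }

    static final Set<ApiKey> ZK_BROKER_APIS =
        EnumSet.of(ApiKey.PRODUCE, ApiKey.FETCH, ApiKey.METADATA);
    static final Set<ApiKey> CONTROLLER_APIS =
        EnumSet.of(ApiKey.METADATA, ApiKey.DESCRIBE_QUORUM);

    // A request for an API outside the advertised set reads as "unsupported",
    // which is what surfaces as UnsupportedVersionException in the report.
    static boolean supports(Set<ApiKey> advertised, ApiKey key) {
        return advertised.contains(key);
    }

    public static void main(String[] args) {
        // Built from the ZK broker set (the bug): DESCRIBE_QUORUM appears unsupported.
        System.out.println(supports(ZK_BROKER_APIS, ApiKey.DESCRIBE_QUORUM));  // false
        // Built from the controller set (the fix): it is supported.
        System.out.println(supports(CONTROLLER_APIS, ApiKey.DESCRIBE_QUORUM)); // true
    }
}
```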
[jira] [Assigned] (KAFKA-9792) Improve sticky task assignment for previous active tasks
[ https://issues.apache.org/jira/browse/KAFKA-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-9792: - Assignee: (was: A. Sophie Blee-Goldman) > Improve sticky task assignment for previous active tasks > > > Key: KAFKA-9792 > URL: https://issues.apache.org/jira/browse/KAFKA-9792 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Attachments: rebalance.txt > > > Due to the way the client quota is computed, we may end up passing over a > client that is not yet full. This can cause us to “miss” the client while > attempting to assign active tasks to their previous owners, and ultimately > forcing an active task _away_ from its previous client. > It’s not quite a bug, but it’s definitely sub-optimal behavior and can cause > unnecessary task shuffling. It also makes the assignment harder to > predict/debug
[jira] [Commented] (KAFKA-8767) Optimize StickyAssignor for Cooperative mode
[ https://issues.apache.org/jira/browse/KAFKA-8767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404833#comment-17404833 ] A. Sophie Blee-Goldman commented on KAFKA-8767: --- Note that since https://issues.apache.org/jira/browse/KAFKA-9987, this problem only affects the case in which the consumers in the group have different subscriptions. Most consumer groups don't have varying subscriptions, rather each member subscribes to the same set of topics/patterns and lets the assignor work out the distribution. For those groups with equal subscriptions across all members, the assignment is always maximally sticky (while still balanced) and therefore it's not possible for it to misplace the revoked partitions and result in an extra cooperative rebalance being required in order to assign all partitions. I'll leave this ticket open since it does still affect a small minority of consumer groups, but it's probably not worth fixing at this point. > Optimize StickyAssignor for Cooperative mode > > > Key: KAFKA-8767 > URL: https://issues.apache.org/jira/browse/KAFKA-8767 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 2.4.0 >Reporter: A. Sophie Blee-Goldman >Priority: Major > > In some rare cases, the StickyAssignor will fail to balance an assignment > without violating stickiness despite a balanced and sticky assignment being > possible. The implications of this for cooperative rebalancing are that an > unnecessary additional rebalance will be triggered. > This was seen to happen for example when each consumer is subscribed to some > random subset of all topics and all their subscriptions change to a different > random subset, as occurs in > AbstractStickyAssignorTest#testReassignmentWithRandomSubscriptionsAndChanges. 
> The initial assignment after the random subscription change obviously > involved migrating partitions, so following the cooperative protocol those > partitions are removed from the balanced first assignment, and a second > rebalance is triggered. In some cases, during the second rebalance the > assignor was unable to reach a balanced assignment without migrating a few > partitions, even though one must have been possible (since the first > assignment was balanced). A third rebalance was needed to reach a stable > balanced state. > Under the conditions in the previously mentioned test (between 20-40 > consumers, 10-20 topics with 0-20 partitions each) this third rebalance was > required roughly 30% of the time. Some initial improvements to the sticky > assignment logic reduced this to under 15%, but we should consider closing > this gap and optimizing the cooperative sticky assignment.
[jira] [Resolved] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-8734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-8734. --- Fix Version/s: 3.0.0 Resolution: Fixed > Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface > -- > > Key: KAFKA-8734 > URL: https://issues.apache.org/jira/browse/KAFKA-8734 > Project: Kafka > Issue Type: Task > Components: clients >Affects Versions: 3.0.0 >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0 > > > In 2.4 we deprecated the consumer.internal.PartitionAssignor interface and > migrated all assignors to the [new public consumer.ConsumerPartitionAssignor > interface|https://github.com/apache/kafka/pull/7108]. > Although internal, we provided an > [adapter|https://github.com/apache/kafka/pull/7110] for those who may have > implemented a custom PartitionAssignor to avoid breaking changes. These > should be removed in the next major release.
[jira] [Updated] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-8734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-8734: -- Affects Version/s: (was: 3.0.0) > Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface > -- > > Key: KAFKA-8734 > URL: https://issues.apache.org/jira/browse/KAFKA-8734 > Project: Kafka > Issue Type: Task > Components: clients >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0 > > > In 2.4 we deprecated the consumer.internal.PartitionAssignor interface and > migrated all assignors to the [new public consumer.ConsumerPartitionAssignor > interface|https://github.com/apache/kafka/pull/7108]. > Although internal, we provided an > [adapter|https://github.com/apache/kafka/pull/7110] for those who may have > implemented a custom PartitionAssignor to avoid breaking changes. These > should be removed in the next major release.
[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jasonyanwenl commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r696210814 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ## @@ -86,4 +88,12 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null); +final Record record = new Record<>("K", 0, 0L); +final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record)); +assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: 1. Just curious, why is `assertThat` from `hamcrest` preferred? I saw that in our code base different places use `junit` / `hamcrest`. Is there any rule for guiding when to use which framework? 2. The other tests in this file use `junit`. So if I only use `hamcrest` in this single place, this class will import both `junit` and `hamcrest` and mix the two. I think this is a bit ugly LOL. Do you suggest updating all of them to use `hamcrest`? Thanks!
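The test in the diff hinges on the new null check throwing an NPE with a descriptive message. A dependency-free sketch of that check (the `Mapper` interface, the `applyChecked` helper, and the message text are illustrative stand-ins, not the PR's exact code):

```java
// Dependency-free sketch of the null check the PR adds to the mapper path.
// Mapper, applyChecked, and the message text are illustrative stand-ins.
public class NullMapperCheck {
    interface Mapper<K, V, R> { R apply(K key, V value); }

    static <K, V, R> R applyChecked(Mapper<K, V, R> mapper, K key, V value) {
        R result = mapper.apply(key, value);
        if (result == null) {
            // Fail fast with a message naming the offending input, instead of a
            // bare NPE surfacing deep inside the processor topology.
            throw new NullPointerException(
                "mapper returned null for key=" + key + " value=" + value);
        }
        return result;
    }

    public static void main(String[] args) {
        Mapper<String, Integer, String> bad = (k, v) -> null;
        try {
            applyChecked(bad, "K", 0);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

The test in the diff then only needs `assertThrows` plus a message assertion, which is exactly the shape shown above.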
[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
junrao commented on a change in pull request #11255: URL: https://github.com/apache/kafka/pull/11255#discussion_r696210968 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig, // If replica failure did not require leader re-election, inform brokers of the offline brokers // Note that during leader re-election, brokers update their metadata -if (partitionsWithOfflineLeader.isEmpty) { +if (newOfflineReplicas.isEmpty) { Review comment: Good point. Basically, if there is at least one partition that has gone through either `partitionStateMachine.triggerOnlinePartitionStateChange()` or `replicaStateMachine.handleStateChanges()`, there is no need to send UpdateMetadataRequest since we broadcast metadata to every broker in each call. If partitionsWithOfflineLeader is empty but newOfflineReplicasNotForDeletion is not empty, there is no need to send UpdateMetadataRequest. So, the check probably should be sth like ` if (newOfflineReplicas.isEmpty || (partitionsWithOfflineLeader.isEmpty && newOfflineReplicasNotForDeletion.isEmpty))`
[GitHub] [kafka] jasonyanwenl edited a comment on pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jasonyanwenl edited a comment on pull request #11241: URL: https://github.com/apache/kafka/pull/11241#issuecomment-905975169 > Thanks for the PR @jasonyanwenl ! > > Here my feedback! > > I think we should consider to introduce a specific exception for exceptions that originate from user code. Such exceptions could be distinguished from other exceptions when caught in the [Streams uncaught exception handler](https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.html) and better reacted upon. > > Adding such an exception would require a KIP. Are you interested? Hi @cadonna, thanks for your feedback. Those are great insights! I'm definitely interested in doing this, but before that I have several questions: 1. Will this be the first case where we have a distinguished exception type for user code errors? 2. Shall we finish this as a simple beginner issue and create a new ticket tracking the design & implementation of that? The concern is that this NPE ticket might not be the only place where the new exception could bring benefit, and we probably should analyze the code to expand the scope and update the exception type all at once in a dedicated PR/ticket. Hence I think separating those two is suitable.
[GitHub] [kafka] showuon commented on a change in pull request #9858: KAFKA-12173 Migrate streams:streams-scala module to JUnit 5
showuon commented on a change in pull request #9858: URL: https://github.com/apache/kafka/pull/9858#discussion_r696201298 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -75,9 +76,19 @@ @Category(IntegrationTest.class) public class AdjustStreamThreadCountTest { -@ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); Review comment: I'll check it! :)
[GitHub] [kafka] ableegoldman commented on pull request #11259: HOTFIX: decrease session timeout in flaky NamedTopologyIntegrationTest
ableegoldman commented on pull request #11259: URL: https://github.com/apache/kafka/pull/11259#issuecomment-905946923 @wcarlson5 @vvcephei @guozhangwang
[GitHub] [kafka] ableegoldman opened a new pull request #11259: HOTFIX: decrease session timeout in flaky NamedTopologyIntegrationTest
ableegoldman opened a new pull request #11259: URL: https://github.com/apache/kafka/pull/11259 Since the default session timeout was bumped to 45s a number of our integration tests have begun failing. Let's reset it back to 10s for these NamedTopologyIntegrationTests which have been a bit flaky to help parse out whether it's just environmental, or possibly something more...sinister 😈
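For reference, the timeout in question is the standard consumer-level `session.timeout.ms` config, whose default moved from 10s to 45s. Pinning it back down in a test config would look roughly like this (the helper method and surrounding code are a hedged sketch, not the PR's actual change):

```java
// Hedged sketch: pinning the consumer session timeout back to 10s in a test
// config after the default moved to 45s. The property name session.timeout.ms
// is Kafka's standard consumer config; the surrounding code is illustrative.
import java.util.Properties;

public class SessionTimeoutSketch {
    static Properties testStreamsConfig() {
        Properties props = new Properties();
        props.put("session.timeout.ms", 10000); // default is now 45000 (45s)
        return props;
    }

    public static void main(String[] args) {
        System.out.println(testStreamsConfig().get("session.timeout.ms")); // 10000
    }
}
```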
[GitHub] [kafka] ableegoldman commented on a change in pull request #9858: KAFKA-12173 Migrate streams:streams-scala module to JUnit 5
ableegoldman commented on a change in pull request #9858: URL: https://github.com/apache/kafka/pull/9858#discussion_r696181741 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -75,9 +76,19 @@ @Category(IntegrationTest.class) public class AdjustStreamThreadCountTest { -@ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); Review comment: cc also @showuon -- would you be interested in taking a shot at this?
[GitHub] [kafka] wcarlson5 commented on pull request #11258: MINOR: add to empty, remove then add different test
wcarlson5 commented on pull request #11258: URL: https://github.com/apache/kafka/pull/11258#issuecomment-905910457 @ableegoldman whatever works best for you
[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`
hachikuji commented on a change in pull request #11225: URL: https://github.com/apache/kafka/pull/11225#discussion_r696142107 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -2207,15 +2191,27 @@ class ReplicaManager(val config: KafkaConfig, InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset)) } else { stateChangeLogger.info( -s"Skipped the become-follower state change after marking its partition as " + +"Skipped the become-follower state change after marking its partition as " + s"follower for partition $tp with id ${info.topicId} and partition state $state." ) } } } changedPartitions.add(partition) } catch { - case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " + + case e: KafkaStorageException => +stateChangeLogger.error(s"Unable to start fetching $tp " + + s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e) +replicaFetcherManager.addFailedPartition(tp) +// If there is an offline log directory, a Partition object may have been created by +// `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due +// to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition +// to an empty Partition object. We need to map this topic-partition to OfflinePartition instead. +markPartitionOffline(tp) + + Review comment: nit: extra newline
[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
splett2 commented on a change in pull request #11255: URL: https://github.com/apache/kafka/pull/11255#discussion_r696141192 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig, // If replica failure did not require leader re-election, inform brokers of the offline brokers // Note that during leader re-election, brokers update their metadata -if (partitionsWithOfflineLeader.isEmpty) { +if (newOfflineReplicas.isEmpty) { Review comment: I don't follow why the check for `partitionsWithOfflineLeader` is necessary. If the broker doesn't host any replicas, then we need to send out `UpdateMetadataRequest` because the partition state machines will not trigger any control requests. This is handled by the check for `newOfflineReplicas.isEmpty`. If the broker does host replicas, then shouldn't `replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)` handle sending out UpdateMetadataRequest to propagate the offline broker? Maybe we would also need a check for whether `newOfflineReplicasNotForDeletion` is empty as well, but `partitionsWithOfflineLeader.isEmpty` seems redundant and fairly coarse.
[GitHub] [kafka] ableegoldman commented on pull request #11258: MINOR: add to empty, remove then add different test
ableegoldman commented on pull request #11258: URL: https://github.com/apache/kafka/pull/11258#issuecomment-905900986 Sounds like this is waiting on the blocking functionality coming in Pt. 4 -- thanks for the test, do you want to just leave this PR until Pt. 4 is merged and then make sure it passes? Alternatively I can copy this over to the Pt. 4 PR and merge it with that
[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
hachikuji commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r696118985 ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1622,19 +1623,37 @@ object TestUtils extends Logging { waitForLeaderToBecome(client, topicPartition, None) } - def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = { + def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = { +waitUntilTrue(() => { + val nodes = client.describeCluster().nodes().get() + nodes.asScala.exists(_.id == brokerId) +}, s"Timed out waiting for brokerId $brokerId to come online") + } + + def waitForLeaderToBecome( +client: Admin, +topicPartition: TopicPartition, +expectedLeaderOpt: Option[Int] + ): Unit = { val topic = topicPartition.topic -val partition = topicPartition.partition +val partitionId = topicPartition.partition + +def currentLeader: Try[Option[Int]] = Try { + val topicDescription = client.describeTopics(List(topic).asJava).all.get.get(topic) + topicDescription.partitions.asScala +.find(_.partition == partitionId) +.flatMap(partitionState => Option(partitionState.leader)) Review comment: Yes: ``` /** * Return the leader of the partition or null if there is none. */ public Node leader() { return leader; } ```
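The `Option(partitionState.leader)` wrapper in the diff matters precisely because `leader()` may return null when the partition has no leader. The same null-to-empty pattern in Java, with a hypothetical `Node` stand-in for the admin client's node type:

```java
// Null-to-empty pattern from the diff: wrap a possibly-null leader in an
// Optional. Node here is a hypothetical stand-in for the admin client's Node.
import java.util.Optional;

public class NullableLeader {
    static class Node {
        final int id;
        Node(int id) { this.id = id; }
    }

    // Mirrors Option(partitionState.leader): empty when there is no leader,
    // so callers never dereference null.
    static Optional<Integer> currentLeaderId(Node leaderOrNull) {
        return Optional.ofNullable(leaderOrNull).map(n -> n.id);
    }

    public static void main(String[] args) {
        System.out.println(currentLeaderId(new Node(1))); // Optional[1]
        System.out.println(currentLeaderId(null));        // Optional.empty
    }
}
```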
[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696117735

## File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java

@@ -189,17 +197,39 @@ public void start() {
     @Override
     public void stop() {
         if (stopped.compareAndSet(false, true)) {
-            try {
-                clusterReference.get().close();
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to stop Raft server", e);
-            }
+
+            Utils.closeQuietly(clusterReference.get(), "cluster");
+            admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));

Review comment:

Makes sense
[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696116190

## File path: core/src/main/scala/kafka/server/BrokerServer.scala

@@ -90,8 +90,7 @@ class BrokerServer(
   this.logIdent = logContext.logPrefix

-  val lifecycleManager: BrokerLifecycleManager =
-    new BrokerLifecycleManager(config, time, threadNamePrefix)
+  private var lifecycleManager: BrokerLifecycleManager = null

Review comment:

This was a workaround to allow `BrokerServer` to be restartable in the same way that `KafkaServer` is. It would be better to let the test kit construct a new instance, but I decided to save that for a follow-up.
[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696115948

## File path: clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java

@@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
     ApiError apiError = ApiError.fromThrowable(e);
     List<ReplicaElectionResult> electionResults = new ArrayList<>();

-    for (TopicPartitions topic : data.topicPartitions()) {
-        ReplicaElectionResult electionResult = new ReplicaElectionResult();
+    if (data.topicPartitions() != null) {

Review comment:

We use null to indicate election for all partitions.
[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
junrao commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r696035745

## File path: core/src/main/scala/kafka/controller/KafkaController.scala

@@ -630,7 +630,7 @@ class KafkaController(val config: KafkaConfig,
     // If replica failure did not require leader re-election, inform brokers of the offline brokers
     // Note that during leader re-election, brokers update their metadata

-    if (partitionsWithOfflineLeader.isEmpty) {
+    if (newOfflineReplicas.isEmpty) {

Review comment:

Good catch. There are two cases where we need to send UpdateMetadataRequest:

1. If the broker doesn't host any replicas. This is the new case that you identified, since we won't send any UpdateMetadataRequest in PartitionStateMachine and we want to propagate the offline broker to other brokers.
2. If partitionsWithOfflineLeader is not empty, partitionStateMachine.triggerOnlinePartitionStateChange() would send UpdateMetadataRequest as part of transitioning those partitions to online, so we don't need to send UpdateMetadataRequest again. If partitionsWithOfflineLeader is empty, we do want to send UpdateMetadataRequest to propagate the offline broker.

So, it seems the check should be `if (newOfflineReplicas.isEmpty || partitionsWithOfflineLeader.isEmpty)`. It would be useful to adjust the comment above accordingly.
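The two cases above reduce to a single boolean rule. The sketch below models that rule as a standalone function; the names `mustSendUpdateMetadata` and the list parameters are hypothetical stand-ins for illustration, not the actual KafkaController code.

```java
import java.util.List;

public class OfflineReplicaMetadataRule {
    /**
     * Returns true when onBrokerFailure must send an UpdateMetadataRequest itself:
     * either the failed broker hosted no replicas (case 1), or no partition lost its
     * leader, so triggerOnlinePartitionStateChange would send nothing (case 2).
     */
    public static boolean mustSendUpdateMetadata(List<?> newOfflineReplicas,
                                                 List<?> partitionsWithOfflineLeader) {
        return newOfflineReplicas.isEmpty() || partitionsWithOfflineLeader.isEmpty();
    }

    public static void main(String[] args) {
        // Broker hosted no replicas: must propagate the offline broker directly.
        System.out.println(mustSendUpdateMetadata(List.of(), List.of()));        // true
        // Replicas went offline but no leader moved: still must send.
        System.out.println(mustSendUpdateMetadata(List.of(1), List.of()));       // true
        // Leader re-election already triggers UpdateMetadataRequest: skip.
        System.out.println(mustSendUpdateMetadata(List.of(1), List.of(5)));      // false
    }
}
```

This makes explicit why checking only `partitionsWithOfflineLeader.isEmpty` (or only `newOfflineReplicas.isEmpty`) misses one of the two cases.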
[jira] [Created] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr
Jason Gustafson created KAFKA-13227:
---------------------------------------

             Summary: Cancel pending AlterIsr requests after receiving LeaderAndIsr
                 Key: KAFKA-13227
                 URL: https://issues.apache.org/jira/browse/KAFKA-13227
             Project: Kafka
          Issue Type: Improvement
            Reporter: Jason Gustafson

Currently we do not cancel pending AlterIsr requests after the state has been updated through a LeaderAndIsr request received from the controller. This leads to log messages such as this:

{code}
[2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition __transaction_state-32 (kafka.cluster.Partition)
{code}

I think the only complication here is protecting against the AlterIsr callback, which is executed asynchronously. To address this, we can move the `zkVersion` field into `IsrState`. When the callback is invoked, we can compare the existing state against the response state to decide whether to apply the change.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
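The fix sketched in the ticket is a version-guarded compare-before-apply: keep the version inside the state object so a late asynchronous callback can be detected as stale. The following is a minimal standalone model of that idea; `VersionedIsrState`, `IsrState`, and all method names are hypothetical, not the actual `kafka.cluster.Partition` code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class VersionedIsrState {
    public static final class IsrState {
        private final int zkVersion;
        private final List<Integer> isr;
        public IsrState(int zkVersion, List<Integer> isr) {
            this.zkVersion = zkVersion;
            this.isr = isr;
        }
        public int zkVersion() { return zkVersion; }
        public List<Integer> isr() { return isr; }
    }

    private final AtomicReference<IsrState> state =
        new AtomicReference<>(new IsrState(0, List.of(1, 2, 3)));

    /** LeaderAndIsr from the controller overwrites the state (version included). */
    public void applyLeaderAndIsr(IsrState fromController) {
        state.set(fromController);
    }

    /** AlterIsr callback: apply the response only if the version is unchanged. */
    public boolean tryApplyAlterIsrResponse(int requestedAtVersion, IsrState response) {
        IsrState current = state.get();
        if (current.zkVersion() != requestedAtVersion) {
            return false; // stale: a LeaderAndIsr arrived in between, drop the response
        }
        return state.compareAndSet(current, response);
    }

    public IsrState current() { return state.get(); }

    public static void main(String[] args) {
        VersionedIsrState partition = new VersionedIsrState();
        // An AlterIsr request is sent while the state is at version 0...
        int requestedAt = partition.current().zkVersion();
        // ...but a LeaderAndIsr from the controller lands first and bumps the version.
        partition.applyLeaderAndIsr(new IsrState(3, List.of(3, 1)));
        // The late AlterIsr callback is now detected as stale and dropped.
        System.out.println(partition.tryApplyAlterIsrResponse(requestedAt, new IsrState(1, List.of(1, 2)))); // false
        System.out.println(partition.current().zkVersion()); // 3
    }
}
```

With this guard, the WARN in the ticket becomes a silent drop of the stale response instead of a failed enqueue against already-updated state.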
[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160

A general thought on the implementation. As of now, we choose the concrete `StandbyTaskAssignor` implementation based on the passed `AssignmentConfigs` value. Instead, an alternative approach would be a chained standby task assignment based on multiple implementations. For instance, when the required client tag configuration is present, we can try to assign the standby tasks based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new LeastLoadedClientStandbyTaskAssignor())`. This way, `ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks over the tag dimensions. If there are no more client tag dimensions available, and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation in the chain (in this case `LeastLoadedClientStandbyTaskAssignor`) will be called. With this, `LeastLoadedClientStandbyTaskAssignor` can default to assigning the remaining standbys to the least loaded clients.

Right now, `ClientTagAwareStandbyTaskAssignor` does both the assignment based on available dimensions and the fallback to the least loaded clients when there are not enough tag dimensions. The current implementation leads to more complex code, while with the approach above we can clearly separate the implementations without duplication (there's no code duplication, rather "logic" duplication).

For now, I've chosen to go with the simplest approach - having two independent implementations and selecting an appropriate one based on the passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in case.
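The chained-assignor idea in the comment above can be sketched as follows. This is a toy model under stated assumptions: `StandbyTaskAssignor`, `assignChained`, and the lambda assignors are hypothetical stand-ins, not the Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChainedStandbyAssignment {
    public interface StandbyTaskAssignor {
        /** Places what it can; decrements tasksToRemainingStandbys as counts are satisfied. */
        void assign(Map<String, Integer> tasksToRemainingStandbys, List<String> log);
    }

    /** Runs each assignor in order, stopping once no standbys remain to place. */
    public static void assignChained(List<StandbyTaskAssignor> chain,
                                     Map<String, Integer> tasksToRemainingStandbys,
                                     List<String> log) {
        for (StandbyTaskAssignor assignor : chain) {
            if (tasksToRemainingStandbys.values().stream().noneMatch(n -> n > 0)) {
                return; // everything placed; later assignors in the chain never run
            }
            assignor.assign(tasksToRemainingStandbys, log);
        }
    }

    public static void main(String[] args) {
        // Tag-aware assignor can only place one of the two requested standbys.
        StandbyTaskAssignor tagAware = (remaining, log) -> {
            remaining.computeIfPresent("0_0", (t, n) -> n - 1);
            log.add("tag-aware placed a standby for 0_0");
        };
        // Least-loaded assignor mops up whatever is left over.
        StandbyTaskAssignor leastLoaded = (remaining, log) -> {
            remaining.replaceAll((t, n) -> 0);
            log.add("least-loaded placed the rest");
        };

        Map<String, Integer> remaining = new HashMap<>(Map.of("0_0", 2));
        List<String> log = new ArrayList<>();
        assignChained(List.of(tagAware, leastLoaded), remaining, log);
        System.out.println(log);
        System.out.println(remaining); // {0_0=0}
    }
}
```

The design choice being weighed: the chain separates "tag-aware placement" from "least-loaded fallback" into independent units, at the cost of a shared mutable `tasksToRemainingStandbys` contract between links.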
[GitHub] [kafka] wcarlson5 opened a new pull request #11258: MINOR: add to empty, remove then add different test
wcarlson5 opened a new pull request #11258:
URL: https://github.com/apache/kafka/pull/11258

Add a test for named topology.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
jolshan commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905758403

Hmmm. Not sure what to make of this exit code 1:
```
* What went wrong:
Execution failed for task ':storage:unitTest'.
> Process 'Gradle Test Executor 133' finished with non-zero exit value 1
```
The other two failures look unrelated.
[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join
[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404607#comment-17404607 ]

Matthias J. Sax commented on KAFKA-10847:
-----------------------------------------

Updated fix version to 3.1.0 because of https://issues.apache.org/jira/browse/KAFKA-13216

> Avoid spurious left/outer join results in stream-stream join
> ------------------------------------------------------------
>
>                 Key: KAFKA-10847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Sergio Peña
>            Priority: Major
>             Fix For: 3.1.0
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input records but processes them right away. For left/outer stream-stream join, this implies that a left/outer join result might be emitted before the window end (or window close) time is reached. Thus, a record that will be an inner-join result might produce an eager (and spurious) left/outer join result.
> We should change the implementation of the join to not emit eager left/outer join results, but instead delay the emission of such results until after the window grace period has passed.
[jira] [Updated] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join
[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-10847:
------------------------------------
    Fix Version/s:     (was: 3.0.0)
                   3.1.0

> Avoid spurious left/outer join results in stream-stream join
> ------------------------------------------------------------
>
>                 Key: KAFKA-10847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Sergio Peña
>            Priority: Major
>             Fix For: 3.1.0
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input records but processes them right away. For left/outer stream-stream join, this implies that a left/outer join result might be emitted before the window end (or window close) time is reached. Thus, a record that will be an inner-join result might produce an eager (and spurious) left/outer join result.
> We should change the implementation of the join to not emit eager left/outer join results, but instead delay the emission of such results until after the window grace period has passed.
[jira] [Updated] (KAFKA-13115) doSend can be blocking
[ https://issues.apache.org/jira/browse/KAFKA-13115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Vaskevych updated KAFKA-13115:
-----------------------------------
    Labels: documentation pull-request-available  (was: pull-request-available)

> doSend can be blocking
> ----------------------
>
>                 Key: KAFKA-13115
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13115
>             Project: Kafka
>          Issue Type: Bug
>          Components: docs
>            Reporter: Ivan Vaskevych
>            Priority: Major
>              Labels: documentation, pull-request-available
>
> https://github.com/apache/kafka/pull/11023
[jira] [Updated] (KAFKA-13115) doSend can be blocking
[ https://issues.apache.org/jira/browse/KAFKA-13115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Vaskevych updated KAFKA-13115:
-----------------------------------
    Component/s: docs

> doSend can be blocking
> ----------------------
>
>                 Key: KAFKA-13115
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13115
>             Project: Kafka
>          Issue Type: Bug
>          Components: docs
>            Reporter: Ivan Vaskevych
>            Priority: Major
>
> https://github.com/apache/kafka/pull/11023
[jira] [Updated] (KAFKA-13115) doSend can be blocking
[ https://issues.apache.org/jira/browse/KAFKA-13115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Vaskevych updated KAFKA-13115:
-----------------------------------
    Labels: pull-request-available  (was: )

> doSend can be blocking
> ----------------------
>
>                 Key: KAFKA-13115
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13115
>             Project: Kafka
>          Issue Type: Bug
>          Components: docs
>            Reporter: Ivan Vaskevych
>            Priority: Major
>              Labels: pull-request-available
>
> https://github.com/apache/kafka/pull/11023
[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905684062

We can merge once we get clean builds.
[GitHub] [kafka] jolshan edited a comment on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
jolshan edited a comment on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905656665

Looks like it was a really subtle bug. Consider the case where we have a partition in the session without a topic ID. We then add a new partition from that topic with an ID, but remove the partition that didn't have an ID. Here, we want to use IDs. When the receiving broker sees the topic IDs and currently has a session without them open, it will close that session so we can switch over to fully using topic IDs. Otherwise, we may get stuck not using topic IDs a little longer than we want to. I've added a check to see if the topic ID can be found in the current builder. Let me know if this makes sense.
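The scenario in the comment above boils down to one invariant: after applying the adds and removes, the next fetch can use topic IDs only if every partition remaining in the session has one. The following toy model illustrates that invariant; `FetchSessionIdCheck` and its methods are hypothetical names for illustration, not the actual fetch-session builder.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class FetchSessionIdCheck {
    /** partition name -> topic ID, if one is known for that partition */
    private final Map<String, Optional<String>> sessionPartitions = new HashMap<>();

    public void add(String partition, Optional<String> topicId) {
        sessionPartitions.put(partition, topicId);
    }

    public void remove(String partition) {
        sessionPartitions.remove(partition);
    }

    /** The next fetch can use topic IDs only if every remaining partition has one. */
    public boolean canUseTopicIds() {
        return sessionPartitions.values().stream().allMatch(Optional::isPresent);
    }

    public static void main(String[] args) {
        FetchSessionIdCheck session = new FetchSessionIdCheck();
        session.add("topicA-0", Optional.empty());          // old partition, no ID yet
        System.out.println(session.canUseTopicIds());       // false
        session.add("topicA-1", Optional.of("uuid-1234"));  // new partition with an ID
        session.remove("topicA-0");                         // ID-less partition leaves
        System.out.println(session.canUseTopicIds());       // true: switch to IDs
    }
}
```

The subtlety is that the decision depends on the session's state after the removal, not on whether any previously-tracked partition ever lacked an ID.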
[GitHub] [kafka] jolshan commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
jolshan commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905656665

Looks like it was a really subtle bug. Consider the case where we have a partition in the session without a topic ID. We then add a new partition from that topic with an ID, but remove the partition that didn't have an ID. Here, we want to use IDs. When the receiving broker sees the topic IDs and currently has a session without them open, it will close that session so we can switch over to fully using topic IDs. Let me know if this makes sense.
[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160

The general thought on the implementation. As of now, we choose the concrete `StandbyTaskAssignor` implementation based on the passed `AssignmentConfigs` value. Instead, an alternative approach would be to have a chained standby task assignment based on multiple implementations. For instance, when the required client tag configuration is present, we can try to assign the standby tasks based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new LeastLoadedClientStandbyTaskAssignor())`. This way, `ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks based on tag dimensions. If there are no more client tag dimensions available, and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation in the chain (in this case `LeastLoadedClientStandbyTaskAssignor`) will be called. With this, `LeastLoadedClientStandbyTaskAssignor` can default to assigning the remaining standbys to the least loaded clients. The benefit of this approach is that, right now, `ClientTagAwareStandbyTaskAssignor` does both assignments based on available dimensions and the fallback to the least loaded if there are not enough tag dimensions. The current implementation leads to more complex code, while with the approach above we can clearly separate the implementations without duplication (there's no code duplication, rather "logic" duplication). For now, I've chosen to go with the simplest approach - having two independent implementations and selecting an appropriate one based on the passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in case.
[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160

The general thought on the implementation. As of now, we choose the concrete `StandbyTaskAssignor` implementation based on the passed `AssignmentConfigs` value. Instead, an alternative approach would be to have a chained standby task assignment based on multiple implementations. For instance, when the required client tag configuration is present, we can try to assign the standby tasks based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new LeastLoadedClientStandbyTaskAssignor())`. This way, `ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks based on tag dimensions. If there are no more client tag dimensions available, and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation in the chain (in this case `LeastLoadedClientStandbyTaskAssignor`) will be called. With this, `LeastLoadedClientStandbyTaskAssignor` can default to assigning the remaining standbys to the least loaded clients. The benefit of this approach is that, right now, `ClientTagAwareStandbyTaskAssignor` does both assignments based on available dimensions and the fallback to the least loaded if there are not enough tag dimensions. The current implementation leads to more complex code, while with the approach above we can clearly separate the implementations without duplication (there's no code duplication, rather "logic" duplication). For now, I've chosen to go with the simplest approach - having two independent implementations and selecting an appropriate one based on the passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in case.
[GitHub] [kafka] rondagostino commented on pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
rondagostino commented on pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#issuecomment-905514929

This PR is likely going to be replaced by https://github.com/apache/kafka/pull/11256. Will probably eventually close this, assuming that other PR gets merged.
[GitHub] [kafka] lkokhreidze edited a comment on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze edited a comment on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160

The general thought on the implementation. As of now, we choose the concrete `StandbyTaskAssignor` implementation based on the passed `AssignmentConfigs` value. Instead, an alternative approach would be to have a chained standby task assignment based on multiple implementations. For instance, when the required client tag configuration is present, we can try to assign the standby tasks based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new LeastLoadedClientStandbyTaskAssignor())`. This way, `ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks based on tag dimensions. If there are no more client tag dimensions available, and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation in the chain (in this case `DefaultStandbyTaskAssignor`) will be called. With this, `DefaultStandbyTaskAssignor` can default to assigning the remaining standbys to the least loaded clients. The benefit of this approach is that, right now, `ClientTagAwareStandbyTaskAssignor` does both assignments based on available dimensions and the fallback to the least loaded if there are not enough tag dimensions. The current implementation leads to more complex code, while with the approach above we can clearly separate the implementations without duplication (there's no code duplication, rather "logic" duplication). For now, I've chosen to go with the simplest approach - having two independent implementations and selecting an appropriate one based on the passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in case.
[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905514160

The general thought on the implementation. As of now, we choose the concrete `StandbyTaskAssignor` implementation based on the passed `AssignmentConfigs` value. Instead, an alternative approach would be to have a chained standby task assignment based on multiple implementations. For instance, when the required client tag configuration is present, we can try to assign the standby tasks based on `List.of(new ClientTagAwareStandbyTaskAssignor(), new DefaultStandbyTaskAssignor())`. This way, `ClientTagAwareStandbyTaskAssignor` can try to distribute the standby tasks based on tag dimensions. If there are no more client tag dimensions available, and we still have `any(tasksToRemainingStandbys) > 0`, the next implementation in the chain (in this case `DefaultStandbyTaskAssignor`) will be called. With this, `DefaultStandbyTaskAssignor` can default to assigning the remaining standbys to the least loaded clients. The benefit of this approach is that, right now, `ClientTagAwareStandbyTaskAssignor` does both assignments based on available dimensions and the fallback to the least loaded if there are not enough tag dimensions. The current implementation leads to more complex code, while with the approach above we can clearly separate the implementations without duplication (there's no code duplication, rather "logic" duplication). For now, I've chosen to go with the simplest approach - having two independent implementations and selecting an appropriate one based on the passed `AssignmentConfigs`. Still, I wanted to share this idea here, just in case.
[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905509721

@jolshan Some tests failed and it seems related to the changes done in this PR. Could you check them?
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695758175

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java

@@ -297,6 +297,186 @@ public void shouldPutFetchRangeFromCache() {
     }
 }

+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
+    cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+    cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+    cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+    cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+    cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+    try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+             cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "a");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "b");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+            "c");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+            "d");
+        assertFalse(iterator.hasNext());
+    }
+}
+
+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyTo() {
+    cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+    cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+    cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+    cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+    cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+    try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+             cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "b");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+            "c");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+            "d");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+            "e");
+        assertFalse(iterator.hasNext());
+    }
+}
+
+@Test
+public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() {
+    cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+    cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+    cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+    cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+    cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+    try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+             cachingStore.fetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "a");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "b");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+            "c");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695757591

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java

@@ -74,7 +74,7 @@ public boolean hasNext() {
     close();
     currentSegment = segments.next();
     try {
-        if (from == null || to == null) {
+        if (from == null && to == null) {
             if (forward) {
                 currentIterator = currentSegment.all();

Review comment:

Good suggestion. Updated.
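The one-character diff above (`||` to `&&`) matters: a full scan (`all()`) is only correct when both bounds are absent; a single absent bound still restricts the other side of the range. The toy model below illustrates that, using a sorted string list in place of a segment; all names are illustrative only, not the Streams code.

```java
import java.util.List;
import java.util.stream.Collectors;

public class NullBoundRange {
    /** Null from/to mean "unbounded on that side"; both null means a full scan. */
    public static List<String> range(List<String> sorted, String from, String to) {
        if (from == null && to == null) {
            return sorted; // analogue of currentSegment.all()
        }
        return sorted.stream()
            .filter(k -> (from == null || k.compareTo(from) >= 0)
                      && (to == null || k.compareTo(to) <= 0))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");
        System.out.println(range(keys, null, null)); // [a, b, c, d]
        System.out.println(range(keys, "b", null));  // [b, c, d] -- an `||` check would wrongly scan everything
        System.out.println(range(keys, null, "b"));  // [a, b]
    }
}
```

With the original `||`, a query bounded on only one side would have fallen into the full-scan branch and returned keys outside the requested range.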
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695757204

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java

@@ -124,8 +124,8 @@
     final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);

-    final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
-    final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+    final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
+    final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);

Review comment:

That's a good question. I'll check the test coverage later. Thanks.
[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-905490438

Thanks for the feedback @cadonna, I've pushed the new changes.
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695735854

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java

@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby task,
+ * in that case, the algorithm will fall back to distributing tasks on least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
+                                                                                              statefulTasksWithClients);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(numStandbyReplicas,
+                                                                                               taskId,
+                                                                                               clientId,
+                                                                                               rackAwareAssignmentTags,
+                                                                                               clients,
+                                                                                               tasksToRemainingStandbys));
+
+        return true;
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,
+                                                                        final Map<TaskId, UUID> statefulTasksWithClients) {
+        return statefulTasksWithClients.keySet().stream().collect(toMap(Function.identity(), t -> numStandbyReplicas));
+    }
+
+    private static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                                 final Map> clientsPerTagValue,
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695736026

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695735723

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
[GitHub] [kafka] patrickstuedi commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
patrickstuedi commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695731353

File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java

@@ -931,16 +979,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() {
         assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(1L), ofEpochMilli(2L)));
     }
-    @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 2, ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-
-    @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-

Review comment:
Ok, no, if you have them covered up there, that's fine.
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695717497

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695715007

File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java

@@ -931,16 +979,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() {
         assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(1L), ofEpochMilli(2L)));
     }
-    @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 2, ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-
-    @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-

Review comment:
Yes, but I've already tested these 2 test cases above (i.e. `testFetchRange` and `testBackwardFetchRange`). I don't think we should test them again here. What do you think?
[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695669488

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r695633134

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java

Review comment:
```suggestion
        final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(
            numStandbyReplicas,
            statefulTasksWithClients
        );
```
[GitHub] [kafka] patrickstuedi commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
patrickstuedi commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r695631238

File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java

@@ -74,7 +74,7 @@ public boolean hasNext() {
             close();
             currentSegment = segments.next();
             try {
-                if (from == null || to == null) {
+                if (from == null && to == null) {
                     if (forward) {
                         currentIterator = currentSegment.all();

Review comment:
Isn't it sufficient to distinguish the forward and reverse cases and just call range(from, to) or reverseRange(from, to)?

File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java

@@ -124,8 +124,8 @@
         final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
-        final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
-        final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+        final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
+        final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);

Review comment:
Do you have tests for this store? Generally, can you make sure that your tests cover all the stores to which you added infinite range support?
[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695567992

File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
+
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly

Review comment:
I think we also need to test the case without failure. Otherwise, we assume in the test that the measurement is in the `finally` clause, which we should not assume but rather ensure with unit tests. The same applies to the test below.

File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java

@@ -0,0 +1,91 @@
+package org.apache.kafka.clients.producer.internals;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.junit.jupiter.api.Test;

Review comment:
```suggestion
import org.apache.kafka.common.metrics.Metrics;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
```

File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
         }
     }
+
+    private double getAndAssertDuration(KafkaProducer producer, String name, double floor) {
+        double value = getMetricValue(producer, name);
+        assertTrue(value > floor);
+        return value;
+    }
+
+    @Test
+    public void testMeasureTransactionDurations() {
+        Map configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            assertTrue(getMetricValue(producer, "txn-init-time-total") > 99);

Review comment:
I now saw that in the consumer tests you use `Duration.ofSeconds(1).toMillis()` and `Duration.ofMillis(999).toNanos()`. This makes it already clearer. I think a variable with a meaningful name for the lower bound would make it even clearer.
## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() { assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0)); } +@Test +public void testMeasureCommitSyncDuration() { +// use a consumer that will throw to ensure we return quickly +Time time = new MockTime(Duration.ofSeconds(1).toMillis()); +SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); +ConsumerMetadata metadata = createMetadata(subscription); +MockClient client = new MockClient(time, metadata); +initMetadata(client, singletonMap(topic, 1)); +Node node = metadata.fetch().nodes().get(0); +ConsumerPartitionAssignor assignor = new RangeAssignor(); +client.createP
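cadonna's point about also testing the success path can be illustrated generically (a hypothetical timer, not the actual Kafka metrics classes): if the duration recording lives in a `finally` block, then both a throwing and a non-throwing call must record a measurement, and only a success-path test pins that down.

```java
// Generic illustration: a duration recorded in `finally` is captured
// whether or not the measured action throws. A failure-only test cannot
// distinguish "recorded in finally" from "recorded in a catch block".
class FinallyTimingDemo {
    long recordedNanos = -1;

    void timed(Runnable action) {
        long start = System.nanoTime();
        try {
            action.run();
        } finally {
            recordedNanos = System.nanoTime() - start;
        }
    }

    public static void main(String[] args) {
        FinallyTimingDemo demo = new FinallyTimingDemo();
        demo.timed(() -> { });                       // success path
        System.out.println(demo.recordedNanos >= 0); // true

        demo.recordedNanos = -1;
        try {
            demo.timed(() -> { throw new RuntimeException("boom"); });
        } catch (RuntimeException expected) { }
        System.out.println(demo.recordedNanos >= 0); // true: finally still ran
    }
}
```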
[GitHub] [kafka] patrickstuedi commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
patrickstuedi commented on a change in pull request #11227: URL: https://github.com/apache/kafka/pull/11227#discussion_r695535522 ## File path: checkstyle/suppressions.xml ## @@ -160,6 +160,9 @@ + Review comment: I guess this is because of InMemoryWindowStore::isKeyWithinRange? Can we make that method more readable and at the same time avoid having to do this? ## File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java ## @@ -128,12 +128,13 @@ * This iterator must be closed after use. * * @param keyFrom the first key in the range + * A null value indicates a starting position from the first element in the store. Review comment: Can you make those extra lines have the same indentation as the previous line so it can easily be seen that they belong together? ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java ## @@ -931,16 +979,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() { assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(1L), ofEpochMilli(2L))); } -@Test -public void shouldThrowNullPointerExceptionOnRangeNullFromKey() { -assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 2, ofEpochMilli(1L), ofEpochMilli(2L))); -} - -@Test -public void shouldThrowNullPointerExceptionOnRangeNullToKey() { -assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L))); -} - Review comment: Instead of deleting maybe you want to change the name and check that the store returns the right values.
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java ## @@ -462,14 +460,21 @@ public boolean hasNext() { } final Bytes key = getKey(next.key); -if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) { +if (isKeyWithinRange(key)) { return true; } else { next = null; return hasNext(); } } +private boolean isKeyWithinRange(final Bytes key) { Review comment: Per comment above, can you make this method more readable by splitting the statements? ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java ## @@ -297,6 +297,186 @@ public void shouldPutFetchRangeFromCache() { } } +@Test +public void shouldPutFetchRangeFromCacheForNullKeyFrom() { +cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); +cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); +cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); +cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); +cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + +try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { +verifyWindowedKeyValue( +iterator.next(), +new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), +"a"); +verifyWindowedKeyValue( +iterator.next(), +new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), +"b"); +verifyWindowedKeyValue( +iterator.next(), +new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), +"c"); +verifyWindowedKeyValue( +iterator.next(), +new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), +"d"); +assertFalse(iterator.hasNext()); +} +} + +@Test +public void 
shouldPutFetchRangeFromCacheForNullKeyTo() { +cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); +cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); +cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); +cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); +cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + +try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
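The readability suggestion for `isKeyWithinRange` could look roughly like this (a sketch using `String` keys instead of the store's `Bytes`, with `null` meaning an open bound — not the actual `InMemoryWindowStore` code):

```java
// Sketch of a more readable isKeyWithinRange: each bound check gets a
// named local, and a null bound means that side of the range is open.
class KeyRangeDemo {
    static boolean isKeyWithinRange(String key, String keyFrom, String keyTo) {
        final boolean aboveLowerBound = keyFrom == null || key.compareTo(keyFrom) >= 0;
        final boolean belowUpperBound = keyTo == null || key.compareTo(keyTo) <= 0;
        return aboveLowerBound && belowUpperBound;
    }

    public static void main(String[] args) {
        System.out.println(isKeyWithinRange("b", "a", "c"));  // true
        System.out.println(isKeyWithinRange("b", null, "c")); // true: open lower bound
        System.out.println(isKeyWithinRange("d", null, "c")); // false: above upper bound
    }
}
```

Splitting the compound condition this way also tends to keep checkstyle's boolean-complexity checks happy, which is what the suppressions.xml comment above is getting at.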
[jira] [Created] (KAFKA-13226) Partition expansion may cause uneven distribution
shizhenzhen created KAFKA-13226: --- Summary: Partition expansion may cause uneven distribution Key: KAFKA-13226 URL: https://issues.apache.org/jira/browse/KAFKA-13226 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 2.6.2, 2.7.1, 2.8.0, 2.5.0 Environment: mac kafka-2.5.0 Reporter: shizhenzhen {color:#ff}*Partition expansion may cause uneven distribution*{color} 1. Create a topic with 3 partitions and 1 replica. !https://img-blog.csdnimg.cn/561112064b114acfb03882aa09100e0e.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_55,color_FF,t_70,g_se,x_16! 2. Expand the topic to 5 partitions. !https://img-blog.csdnimg.cn/f7c3c33b6662457080d9bb5bb190c0c2.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_49,color_FF,t_70,g_se,x_16! 3. Does the resulting assignment meet expectations? !https://img-blog.csdnimg.cn/20cc1007c4214c4ebfcb1b2c2eeb98e4.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_18,color_FF,t_70,g_se,x_16! {color:#ff}*So is this a bug?*{color} The problem may arise here: when we create a new topic, the broker list we get is a Map, and *its iteration order is not sorted*. You can read the code: it first sorts by brokerId, but finally it converts the result into an *unordered Map*. !https://img-blog.csdnimg.cn/131b9bf0c19e4753a73512af4c9c5854.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_Q1NETiBA55-z6Ie76Ie755qE5p2C6LSn6ZO6,size_66,color_FF,t_70,g_se,x_16! The important thing is that the broker list *is* sorted when expanding partitions and during partition reassignment. {color:#ff}*So why not sort it when creating topics as well?*{color} If the broker list were sorted when creating a new topic, this problem would not occur, so it may be a small bug. If you can read Chinese, you can look at this article, where I describe the issue in detail. We look forward to receiving your reply.
[This may be a Kafka bug?|https://shirenchuang.blog.csdn.net/article/details/119912418] -- This message was sent by Atlassian Jira (v8.3.4#803005)
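The reporter's point can be illustrated with a toy round-robin assignment (a deliberate simplification of the real assignment code, which also randomizes the starting index): the layout depends entirely on the iteration order of the broker collection, so an unsorted map at creation time versus a sorted list at expansion time can produce inconsistent distributions.

```java
import java.util.ArrayList;
import java.util.List;

// Toy round-robin leader assignment with replication factor 1. The
// output depends on broker ordering, which is the crux of the issue:
// an unsorted broker map yields a different spread than a sorted list.
class AssignmentOrderDemo {
    static List<Integer> assignLeaders(List<Integer> brokers, int partitions) {
        List<Integer> leaders = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            leaders.add(brokers.get(p % brokers.size()));
        }
        return leaders;
    }

    public static void main(String[] args) {
        List<Integer> sorted = List.of(0, 1, 2);
        List<Integer> unsorted = List.of(2, 0, 1); // e.g. iteration order of an unsorted map
        System.out.println(assignLeaders(sorted, 5));   // [0, 1, 2, 0, 1]
        System.out.println(assignLeaders(unsorted, 5)); // [2, 0, 1, 2, 0]
    }
}
```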
[GitHub] [kafka] cadonna commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
cadonna commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r695498527 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java ## @@ -41,6 +43,7 @@ public void process(final Record record) { final Iterable> newKeyValues = mapper.apply(record.key(), record.value()); +Objects.requireNonNull(newKeyValues, String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: Just a suggestion: ```suggestion Objects.requireNonNull(newKeyValues, "The provided KeyValueMapper returned null which is not allowed."); ``` BTW, we should not output records since they might contain sensitive data. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java ## @@ -42,6 +44,7 @@ public KStreamMap(final KeyValueMapper record) { final KeyValue newPair = mapper.apply(record.key(), record.value()); +Objects.requireNonNull(newPair, String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: Just a suggestion: ```suggestion Objects.requireNonNull(newPair, "The provided KeyValueMapper returned null which is not allowed."); ``` BTW, we should not output records since they might contain sensitive data. ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java ## @@ -68,6 +70,14 @@ public void testMap() { } } +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamMap supplier = new KStreamMap<>((key, value) -> null); +final Record record = new Record<>("K", 0, 0L); +final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record)); +assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: See my comment above.
## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ## @@ -86,4 +88,12 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null); +final Record record = new Record<>("K", 0, 0L); +final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record)); +assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: We prefer to use `assertThat()`: ```suggestion assertThat(throwable.getMessage(), is(...)); ```
[jira] [Commented] (KAFKA-5666) Need feedback to user if consumption fails due to offsets.topic.replication.factor=3
[ https://issues.apache.org/jira/browse/KAFKA-5666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404267#comment-17404267 ] Rens Groothuijsen commented on KAFKA-5666: -- [~ll1124278064] No, I'm no longer working on this one, so go right ahead. > Need feedback to user if consumption fails due to > offsets.topic.replication.factor=3 > > > Key: KAFKA-5666 > URL: https://issues.apache.org/jira/browse/KAFKA-5666 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 0.11.0.0 >Reporter: Yeva Byzek >Priority: Major > Labels: newbie, usability > > Introduced in 0.11: The offsets.topic.replication.factor broker config is now > enforced upon auto topic creation. Internal auto topic creation will fail > with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets > this replication factor requirement. > Issue: Default is setting offsets.topic.replication.factor=3, but in > development and docker environments where there is only 1 broker, the offsets > topic will fail to be created when a consumer tries to consume and no records > will be returned. As a result, the user experience is bad. The user may > have no idea about this setting change and enforcement, and they just see > that `kafka-console-consumer` hangs with ZERO output. It is true that the > broker log file will provide a message (e.g. {{ERROR [KafkaApi-1] Number of > alive brokers '1' does not meet the required replication factor '3' for the > offsets topic (configured via 'offsets.topic.replication.factor'). This error > can be ignored if the cluster is starting up and not all brokers are up yet. > (kafka.server.KafkaApis)}}) but many users do not have access to the log > files or know how to get them. > Suggestion: give feedback to the user/app if offsets topic cannot be created. > For example, after some timeout. 
> Workaround: > Set offsets.topic.replication.factor=1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
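For a single-broker development or docker setup, the mitigation implied by the issue is to lower the replication factor for the internal offsets topic before the first consumer connects (the property name comes from the issue itself; the value 1 assumes exactly one broker):

```shell
# server.properties for a one-broker dev cluster: allow the internal
# __consumer_offsets topic to be auto-created with a single replica
# instead of the default of 3, so consumption does not hang.
echo "offsets.topic.replication.factor=1" >> config/server.properties
```

Note this only helps if applied before the offsets topic is first auto-created; it does not retroactively fix an already-failed creation.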
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r695463229 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; + +import static java.util.stream.Collectors.toMap; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. 
For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +@Override +public boolean assign(final Map<UUID, ClientState> clients, + final Set<TaskId> allTaskIds, + final Set<TaskId> statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); +final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>(); + +statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> { +if (clientState.activeTasks().contains(statefulTaskId)) { +statefulTasksWithClients.put(statefulTaskId, uuid); +} +})); + +final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, + statefulTasksWithClients); + +statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(numStandbyReplicas, + taskId, + clientId, + rackAwareAssignmentTags, + clients, + tasksToRemainingStandbys)); + +return true; +} + +@Override +public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { +final Map<String, String> sourceClientTags = source.clientTags(); +final Map<String, String> destinationClientTags = destination.clientTags(); + +for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) { +if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) { +return false; +} +} + +return true; +} + +private static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas, final Map<TaskId, UUID> statefulTasksWithClients) { Review comment: This method is needed in `ClientTagAwareStandbyTaskAssignor` and `DefaultStandbyTaskAssignor`.
Was thinking to create `StandbyTaskAssignmentUtils` and extract this logic in there. Wdyt @cadonna ?
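Such an extraction might look roughly like this (a sketch only: the method name comes from the diff above, while the generic signature and the `Set`-based parameter are assumptions, not the merged Kafka code). The helper simply seeds each stateful task with the configured number of standby replicas still to be placed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical StandbyTaskAssignmentUtils holding the helper shared by
// the default and the tag-aware standby assignors: map each stateful
// task to the number of standby replicas that still need placement.
final class StandbyTaskAssignmentUtils {
    private StandbyTaskAssignmentUtils() { }

    static <T> Map<T, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,
                                                               final Set<T> statefulTaskIds) {
        final Map<T, Integer> remaining = new HashMap<>();
        for (final T taskId : statefulTaskIds) {
            remaining.put(taskId, numStandbyReplicas);
        }
        return remaining;
    }
}
```

Both assignors could then decrement the counters as they place standbys, which keeps the "how many are still missing" bookkeeping in one place.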