[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
chia7712 commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518564543

## File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java

@@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
     public static EnvelopeRequest parse(ByteBuffer buffer, short version) {
         return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, buffer), version);
     }
+
+    public EnvelopeRequestData data() {
+        return data;
+    }
+
+    @Override
+    public Send toSend(String destination, RequestHeader header) {
+        return SendBuilder.buildRequestSend(destination, header, this.data);

Review comment: Not sure whether it is ok to ignore the version of EnvelopeRequest here.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10667) Add timeout for forwarding requests
[ https://issues.apache.org/jira/browse/KAFKA-10667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen reassigned KAFKA-10667:
    Assignee: Boyang Chen

> Add timeout for forwarding requests
>
> Key: KAFKA-10667
> URL: https://issues.apache.org/jira/browse/KAFKA-10667
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> It makes sense to handle a timeout for forwarded requests coming from the
> client, instead of retrying indefinitely. We could either use the api timeout
> or a customized timeout hook, which could be defined per request type.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10674) Brokers should know the active controller ApiVersion after enabling KIP-590 forwarding
[ https://issues.apache.org/jira/browse/KAFKA-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen updated KAFKA-10674:
    Parent: KAFKA-9705
    Issue Type: Sub-task (was: Improvement)

> Brokers should know the active controller ApiVersion after enabling KIP-590
> forwarding
>
> Key: KAFKA-10674
> URL: https://issues.apache.org/jira/browse/KAFKA-10674
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, core
> Reporter: Boyang Chen
> Priority: Major
>
> Admin clients send ApiVersions to the broker when the first connection is
> established. The tricky thing after forwarding is enabled is that, for
> forwardable APIs, the admin client needs to know a commonly-agreed range of
> ApiVersions among the handling broker, the active controller, and itself.
> Right now the inter-broker APIs are guaranteed by IBP constraints, but not
> the forwardable APIs. A compromise solution would be to put all forwardable
> APIs under IBP, which is brittle and makes consistency hard to maintain.
> Instead, any broker connecting to the active controller should send an
> ApiVersion request from the beginning, so it is easy to compute that
> information and send it back to the admin client upon an ApiVersion request.
> Any rolling of the active controller will trigger reconnection between broker
> and controller, which guarantees refreshed ApiVersions between the two. This
> approach avoids the tight bond with IBP, and the broker can just close the
> connection to the admin client to trigger retry logic and refreshing of the
> ApiVersions. Since this failure should be rare, two round-trips and timeout
> delays are well compensated by the reduced engineering work.
[jira] [Closed] (KAFKA-10691) AlterIsr Respond with wrong Error Id
[ https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dengziming closed KAFKA-10691.

> AlterIsr Respond with wrong Error Id
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
> Issue Type: Sub-task
> Components: controller
> Reporter: dengziming
> Assignee: dengziming
> Priority: Minor
>
> AlterIsr sent by an unknown broker will respond with a STALE_BROKER_EPOCH,
> which should be UNKNOWN_MEMBER_ID.
[jira] [Commented] (KAFKA-10691) AlterIsr Respond with wrong Error Id
[ https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227199#comment-17227199 ]

dengziming commented on KAFKA-10691:

Sorry, I misunderstood `STALE_BROKER_EPOCH`. I just inspected KIP-497 and there isn't an `UNKNOWN_MEMBER_ID`.

> AlterIsr Respond with wrong Error Id
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
> Issue Type: Sub-task
> Components: controller
> Reporter: dengziming
> Assignee: dengziming
> Priority: Minor
>
> AlterIsr sent by an unknown broker will respond with a STALE_BROKER_EPOCH,
> which should be UNKNOWN_MEMBER_ID.
[jira] [Resolved] (KAFKA-10691) AlterIsr Respond with wrong Error Id
[ https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dengziming resolved KAFKA-10691.
    Resolution: Abandoned

> AlterIsr Respond with wrong Error Id
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
> Issue Type: Sub-task
> Components: controller
> Reporter: dengziming
> Assignee: dengziming
> Priority: Minor
>
> AlterIsr sent by an unknown broker will respond with a STALE_BROKER_EPOCH,
> which should be UNKNOWN_MEMBER_ID.
[GitHub] [kafka] dengziming commented on a change in pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id
dengziming commented on a change in pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#discussion_r518555329

## File path: core/src/main/scala/kafka/controller/KafkaController.scala

@@ -2261,7 +2261,7 @@ class KafkaController(val config: KafkaConfig,
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterIsr due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback.apply(Right(Errors.UNKNOWN_MEMBER_ID))

Review comment: You are right, I just inspected KIP-497 and there isn't an `UNKNOWN_MEMBER_ID`.
[jira] [Updated] (KAFKA-10674) Brokers should know the active controller ApiVersion after enabling KIP-590 forwarding
[ https://issues.apache.org/jira/browse/KAFKA-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10674: Description: Admin clients send ApiVersions to the broker upon the first connection establishes. The tricky thing after forwarding is enabled is that for forwardable APIs, admin client needs to know a commonly-agreed range of ApiVersions among handling broker, active controller and itself. Right now the inter-broker APIs are guaranteed by IBP constraints, but not for forwardable APIs. A compromised solution would be to put all forwardable APIs under IBP, which is brittle and hard to maintain consistency. Instead, any broker connecting to the active controller should send an ApiVersion request from beginning, so it is easy to compute that information and send back to the admin clients upon ApiVersion request from admin. Any rolling of the active controller will trigger reconnection between broker and controller, which guarantees a refreshed ApiVersions between the two. This approach avoids the tight bond with IBP and broker could just close the connection between admin client to trigger retry logic and refreshing of the ApiVersions. Since this failure should be rare, two round-trips and timeout delays are well compensated by the less engineering work. was: Admin clients send ApiVersions to the broker upon the first connection establishes. The tricky thing after forwarding is enabled is that for forwardable APIs, admin client needs to know a commonly-agreed rang of ApiVersions among handling broker, active controller and itself. Right now the inter-broker APIs are guaranteed by IBP constraints, but not for forwardable APIs. A compromised solution would be to put all forwardable APIs under IBP, which is brittle and hard to maintain consistency. 
Instead, any broker connecting to the active controller should send an ApiVersion request from beginning, so it is easy to compute that information and send back to the admin clients upon ApiVersion request from admin. Any rolling of the active controller will trigger reconnection between broker and controller, which guarantees a refreshed ApiVersions between the two. This approach avoids the tight bond with IBP and broker could just close the connection between admin client to trigger retry logic and refreshing of the ApiVersions. Since this failure should be rare, two round-trips and timeout delays are well compensated by the less engineering work. > Brokers should know the active controller ApiVersion after enabling KIP-590 > forwarding > -- > > Key: KAFKA-10674 > URL: https://issues.apache.org/jira/browse/KAFKA-10674 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Boyang Chen >Priority: Major > > Admin clients send ApiVersions to the broker upon the first connection > establishes. The tricky thing after forwarding is enabled is that for > forwardable APIs, admin client needs to know a commonly-agreed range of > ApiVersions among handling broker, active controller and itself. > Right now the inter-broker APIs are guaranteed by IBP constraints, but not > for forwardable APIs. A compromised solution would be to put all forwardable > APIs under IBP, which is brittle and hard to maintain consistency. > Instead, any broker connecting to the active controller should send an > ApiVersion request from beginning, so it is easy to compute that information > and send back to the admin clients upon ApiVersion request from admin. Any > rolling of the active controller will trigger reconnection between broker and > controller, which guarantees a refreshed ApiVersions between the two. 
> This approach avoids the tight bond with IBP, and the broker can just close the
> connection to the admin client to trigger retry logic and refreshing of the
> ApiVersions. Since this failure should be rare, two round-trips and timeout
> delays are well compensated by the reduced engineering work.
[GitHub] [kafka] dengziming closed pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id
dengziming closed pull request #9571:
URL: https://github.com/apache/kafka/pull/9571
[GitHub] [kafka] dengziming commented on a change in pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id
dengziming commented on a change in pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#discussion_r518554842

## File path: core/src/main/scala/kafka/controller/KafkaController.scala

@@ -2261,7 +2261,7 @@ class KafkaController(val config: KafkaConfig,
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterIsr due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback.apply(Right(Errors.UNKNOWN_MEMBER_ID))

Review comment: Thank you for your reply. Here we have two checks: the first is `if (brokerEpochOpt.isEmpty)` and the second is `if (!brokerEpochOpt.contains(brokerEpoch))`; they both return `STALE_BROKER_EPOCH`. Maybe the first is meant to return `UNKNOWN_MEMBER_ID`.
[GitHub] [kafka] chia7712 commented on a change in pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id
chia7712 commented on a change in pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#discussion_r518550582

## File path: core/src/main/scala/kafka/controller/KafkaController.scala

@@ -2261,7 +2261,7 @@ class KafkaController(val config: KafkaConfig,
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterIsr due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback.apply(Right(Errors.UNKNOWN_MEMBER_ID))

Review comment: Not sure whether `UNKNOWN_MEMBER_ID` is more suitable than `STALE_BROKER_EPOCH`. The concept of a member id is not the same as the broker id here.
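The two checks under discussion can be sketched in isolation. This is a hypothetical Java mirror of the Scala controller logic (the class, the `validate` helper, and the pared-down `Error` enum are illustrative, not from the codebase); it shows how both the unknown-broker path and the stale-epoch path currently collapse into the same error code:

```java
import java.util.Map;
import java.util.Optional;

public class AlterIsrValidation {
    enum Error { NONE, STALE_BROKER_EPOCH }

    // Hypothetical mirror of the controller's two checks: an unknown broker id
    // (no live epoch recorded at all) and a mismatched epoch both map to
    // STALE_BROKER_EPOCH, which is the behavior being questioned in the review.
    static Error validate(Map<Integer, Long> liveBrokerEpochs, int brokerId, long brokerEpoch) {
        Optional<Long> epochOpt = Optional.ofNullable(liveBrokerEpochs.get(brokerId));
        if (epochOpt.isEmpty()) {
            return Error.STALE_BROKER_EPOCH; // unknown broker: arguably deserves a distinct error
        }
        if (epochOpt.get() != brokerEpoch) {
            return Error.STALE_BROKER_EPOCH; // stale epoch: the intended case for this error
        }
        return Error.NONE;
    }
}
```

Since KIP-497 defines no separate "unknown broker" error, the digest's resolution (keeping `STALE_BROKER_EPOCH` for both branches) matches this sketch.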
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518549809

## File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java

@@ -33,6 +35,23 @@
     void writeVarint(int i);
     void writeVarlong(long i);
+
+    default void writeApiMessage(

Review comment: Fair enough.
[GitHub] [kafka] dengziming commented on pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id
dengziming commented on pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#issuecomment-722889382

@hachikuji hi, PTAL.
[GitHub] [kafka] dengziming opened a new pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id
dengziming opened a new pull request #9571:
URL: https://github.com/apache/kafka/pull/9571

AlterIsr sent by an unknown broker will respond with a STALE_BROKER_EPOCH, which should be UNKNOWN_MEMBER_ID.
[jira] [Updated] (KAFKA-10691) AlterIsr Respond with wrong Error Id
[ https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dengziming updated KAFKA-10691:
    Summary: AlterIsr Respond with wrong Error Id (was: AlterIsrR Respond with wrong Error Id)

> AlterIsr Respond with wrong Error Id
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
> Issue Type: Sub-task
> Components: controller
> Reporter: dengziming
> Assignee: dengziming
> Priority: Minor
>
> AlterIsr sent by an unknown broker will respond with a STALE_BROKER_EPOCH,
> which should be UNKNOWN_MEMBER_ID.
[jira] [Created] (KAFKA-10691) AlterIsrR Respond with wrong Error Id
dengziming created KAFKA-10691:

    Summary: AlterIsrR Respond with wrong Error Id
    Key: KAFKA-10691
    URL: https://issues.apache.org/jira/browse/KAFKA-10691
    Project: Kafka
    Issue Type: Sub-task
    Components: controller
    Reporter: dengziming
    Assignee: dengziming

AlterIsr sent by an unknown broker will respond with a STALE_BROKER_EPOCH, which should be UNKNOWN_MEMBER_ID.
[jira] [Assigned] (KAFKA-9837) New RPC for notifying controller of failed replica
[ https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dengziming reassigned KAFKA-9837:
    Assignee: dengziming

> New RPC for notifying controller of failed replica
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
> Issue Type: Sub-task
> Components: controller, core
> Reporter: David Arthur
> Assignee: dengziming
> Priority: Major
> Fix For: 2.8.0
>
> This is the tracking ticket for
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
> For the bridge release, brokers should no longer use ZooKeeper to notify the
> controller that a log dir has failed. It should instead use an RPC mechanism.
[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica
[ https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227182#comment-17227182 ]

dengziming commented on KAFKA-9837:

I will do this.

> New RPC for notifying controller of failed replica
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
> Issue Type: Sub-task
> Components: controller, core
> Reporter: David Arthur
> Priority: Major
> Fix For: 2.8.0
>
> This is the tracking ticket for
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
> For the bridge release, brokers should no longer use ZooKeeper to notify the
> controller that a log dir has failed. It should instead use an RPC mechanism.
[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540480

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@@ -997,7 +1016,7 @@ void shutdown(final boolean clean) {
     // For testing only.
     int commitAll() {
-        return commit(tasks.values());
+        return commit(new HashSet<>(tasks.values()));

Review comment: Need to make a copy (as all production calls do, too).
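The reason for the copy can be shown in isolation: a `Map`'s `values()` is a live view, so mutating the map while iterating that view fails fast, while iterating a snapshot is safe. A minimal standalone sketch (the class and method names are illustrative, not from the Streams code):

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;

public class DefensiveCopy {
    // Iterating a live values() view while structurally modifying the map
    // fails fast (needs >= 2 entries for the iterator to observe the change).
    // Returns true if the expected ConcurrentModificationException occurred.
    public static boolean unsafeIterateThrows(Map<String, Integer> tasks) {
        try {
            for (Integer ignored : tasks.values()) {
                tasks.clear(); // structural modification of the map backing the view
            }
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    // Iterating a snapshot stays safe even if the map is mutated underneath.
    public static boolean safeIterate(Map<String, Integer> tasks) {
        List<Integer> snapshot = new ArrayList<>(tasks.values()); // defensive copy
        for (Integer ignored : snapshot) {
            tasks.clear(); // safe: we iterate the copy, not the live view
        }
        return true;
    }
}
```

This mirrors why `commitAll()` wraps `tasks.values()` in a `new HashSet<>(...)` before passing it to a method that may mutate the task registry.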
[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540292

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@@ -903,6 +913,15 @@ void shutdown(final boolean clean) {
                 tasksToCloseClean.remove(task);
             }
         }
+} catch (final TaskTimeoutExceptions taskTimeoutExceptions) {

Review comment: During shutdown, we don't need to trigger `task.timeout.ms` but can re-throw the `TimeoutException` to trigger a "dirty close" instead.
[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540199

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@@ -570,6 +575,11 @@ void handleRevocation(final Collection revokedPartitions) {
     // so we would capture any exception and throw
     try {
         commitOffsetsOrTransaction(consumedOffsetsPerTask);
+} catch (final TaskTimeoutExceptions taskTimeoutExceptions) {

Review comment: If a task is revoked, we don't need to trigger `task.timeout.ms` but can re-throw the `TimeoutException` to trigger a "dirty close" instead.
[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539800

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@@ -202,6 +204,10 @@ private void recordSendError(final String topic, final Exception exception, final
     "indicating the task may be migrated out";
     sendException.set(new TaskMigratedException(errorMessage, exception));
 } else {
+    // TODO: KIP-572 handle `TimeoutException extends RetriableException`

Review comment: As above: this is an open question. Input is welcome. I would like to tackle it in a follow-up PR.
[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539735

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@@ -111,6 +111,8 @@ public void initialize() {
     try {
         partitions = streamsProducer.partitionsFor(topic);
     } catch (final KafkaException e) {
+        // TODO: KIP-572 need to handle `TimeoutException`
+        // -> should we throw a `TaskCorruptedException` for this case to reset the task and retry (including triggering `task.timeout.ms`)?

Review comment: This is an open question. Input is welcome. I would like to tackle it in a follow-up PR.
[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539527

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

@@ -164,9 +164,9 @@ public InternalTopicManager(final Time time,
     "Error message was: {}", topicName, cause.toString());
     throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
 }
-} catch (final TimeoutException retryableException) {
+} catch (final TimeoutException retriableException) {

Review comment: Just fixing a naming issue.
[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539416

## File path: streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.streams.processor.internals.Task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskTimeoutExceptions extends StreamsException {

Review comment: If we commit multiple tasks individually (i.e., eos-alpha), we use this class as an "exception container" to track the TimeoutException for each failed task individually. To simplify the caller code, we also wrap a single `TimeoutException` if we commit all tasks at once (at-least-once, eos-beta).
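The "exception container" idea described in the review can be sketched roughly as follows. This is a simplified, hypothetical shape (the real `TaskTimeoutExceptions` in the PR differs in detail and uses Kafka's `TimeoutException` and `Task` types); it only illustrates the two modes: one exception per failed task for individual commits, or one wrapped exception for the all-at-once commit path:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical sketch of an "exception container": per-task
// timeouts for individual commits (eos-alpha), or one wrapped exception
// when all tasks are committed in a single call (at-least-once, eos-beta).
public class TimeoutContainerSketch extends RuntimeException {
    private final Map<String, RuntimeException> perTask = new HashMap<>();
    private RuntimeException single;

    // Record a timeout observed while committing one specific task.
    void recordFor(String taskId, RuntimeException timeout) {
        perTask.put(taskId, timeout);
    }

    // Record the single timeout observed when committing all tasks at once.
    void record(RuntimeException timeout) {
        this.single = timeout;
    }

    Map<String, RuntimeException> perTaskExceptions() {
        return Collections.unmodifiableMap(perTask);
    }

    RuntimeException singleException() {
        return single;
    }
}
```

The caller can then treat both commit strategies uniformly: catch one container, and either iterate the per-task map or re-handle the single wrapped exception.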
[GitHub] [kafka] mjsax opened a new pull request #9570: KAFKA-9274: Handle TimeoutException on commit
mjsax opened a new pull request #9570:
URL: https://github.com/apache/kafka/pull/9570

- part of KIP-572
- when Kafka Streams commits a task, a TimeoutException should not kill the thread; instead `task.timeout.ms` should be triggered and the commit retried in the next loop

Call for review @vvcephei
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518532228

## File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java

@@ -33,6 +35,23 @@
     void writeVarint(int i);
     void writeVarlong(long i);
+
+    default void writeApiMessage(
+        ApiMessage message,
+        ObjectSerializationCache serializationCache,
+        short version
+    ) {
+        message.write(this, serializationCache, version);
+    }
+
+    default void writeRecords(BaseRecords records) {

Review comment: I was concerned about this also, but the generated code adds its own null check.
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r518523253 ## File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java ## @@ -1581,56 +1570,56 @@ private void generateVariableLengthFieldSize(FieldSpec field, headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); buffer.printf("_cache.setArraySizeInBytes(%s, _arraySize);%n", field.camelCaseName()); -buffer.printf("_size += _arraySize + ByteUtils.sizeOfUnsignedVarint(_arraySize);%n"); + buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize.totalSize()));%n"); +buffer.printf("_size.add(_arraySize);%n"); } else { -buffer.printf("_size += _arraySize;%n"); +buffer.printf("_size.add(_arraySize);%n"); } } else if (field.type().isBytes()) { +buffer.printf("MessageSize _bytesSize = new MessageSize();%n"); if (field.zeroCopy()) { -buffer.printf("int _bytesSize = %s.remaining();%n", field.camelCaseName()); + buffer.printf("_bytesSize.addZeroCopyBytes(%s.remaining());%n", field.camelCaseName()); } else { -buffer.printf("int _bytesSize = %s.length;%n", field.camelCaseName()); +buffer.printf("_bytesSize.addBytes(%s.length);%n", field.camelCaseName()); } VersionConditional.forVersions(fieldFlexibleVersions(field), possibleVersions). ifMember(__ -> { headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); if (field.zeroCopy()) { -buffer.printf("_bytesSize += " + - "ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1);%n", field.camelCaseName()); +buffer.printf("_bytesSize.addBytes(" + + "ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1));%n", field.camelCaseName()); } else { -buffer.printf("_bytesSize += ByteUtils.sizeOfUnsignedVarint(%s.length + 1);%n", + buffer.printf("_bytesSize.addBytes(ByteUtils.sizeOfUnsignedVarint(%s.length + 1));%n", field.camelCaseName()); } }). ifNotMember(__ -> { -buffer.printf("_bytesSize += 4;%n"); +buffer.printf("_bytesSize.addBytes(4);%n"); }). 
generate(buffer); if (tagged) { headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); -buffer.printf("_size += _bytesSize + ByteUtils.sizeOfUnsignedVarint(_bytesSize);%n"); -} else { -buffer.printf("_size += _bytesSize;%n"); + buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_bytesSize.totalSize()));%n"); } +buffer.printf("_size.add(_bytesSize);%n"); } else if (field.type().isRecords()) { -buffer.printf("int _recordsSize = %s.sizeInBytes();%n", field.camelCaseName()); +buffer.printf("_size.addBytes(%s.sizeInBytes());%n", field.camelCaseName());

Review comment: Yeah, my bad. Probably messed this up after renaming.
[jira] [Commented] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
[ https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227164#comment-17227164 ] Matthias J. Sax commented on KAFKA-10604: - Thanks. Good to see that we are in agreement. Will review the PR soon. Thanks for the ticket and PR [~dongjin]! > The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM > parameter or OS-specific settings > - > > Key: KAFKA-10604 > URL: https://issues.apache.org/jira/browse/KAFKA-10604 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > > I found this problem working for > [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585]. > The JVM's temporary directory location is different per OS, and JVM allows to > change it with `java.io.tmpdir` system property. In Linux, it defaults to > `/tmp`. > The problem is the default value of StreamsConfig.STATE_DIR_CONFIG > (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not > change if the runs on OS other than Linux or the user specifies > `java.io.tmpdir` system property. > It should be `\{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
chia7712 commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r518513351 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java ## @@ -33,6 +35,23 @@ void writeVarint(int i); void writeVarlong(long i); +default void writeApiMessage( Review comment: It is used by only ```SendBuilder```. How about moving it to ```SendBuilder```? ## File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java ## @@ -33,6 +35,23 @@ void writeVarint(int i); void writeVarlong(long i); +default void writeApiMessage( +ApiMessage message, +ObjectSerializationCache serializationCache, +short version +) { +message.write(this, serializationCache, version); +} + +default void writeRecords(BaseRecords records) { Review comment: Does it need null check (maybe no-op)? ## File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java ## @@ -1581,56 +1570,56 @@ private void generateVariableLengthFieldSize(FieldSpec field, headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); buffer.printf("_cache.setArraySizeInBytes(%s, _arraySize);%n", field.camelCaseName()); -buffer.printf("_size += _arraySize + ByteUtils.sizeOfUnsignedVarint(_arraySize);%n"); + buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize.totalSize()));%n"); +buffer.printf("_size.add(_arraySize);%n"); } else { -buffer.printf("_size += _arraySize;%n"); +buffer.printf("_size.add(_arraySize);%n"); } } else if (field.type().isBytes()) { +buffer.printf("MessageSize _bytesSize = new MessageSize();%n"); if (field.zeroCopy()) { -buffer.printf("int _bytesSize = %s.remaining();%n", field.camelCaseName()); + buffer.printf("_bytesSize.addZeroCopyBytes(%s.remaining());%n", field.camelCaseName()); } else { -buffer.printf("int _bytesSize = %s.length;%n", field.camelCaseName()); +buffer.printf("_bytesSize.addBytes(%s.length);%n", field.camelCaseName()); } 
VersionConditional.forVersions(fieldFlexibleVersions(field), possibleVersions). ifMember(__ -> { headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); if (field.zeroCopy()) { -buffer.printf("_bytesSize += " + - "ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1);%n", field.camelCaseName()); +buffer.printf("_bytesSize.addBytes(" + + "ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1));%n", field.camelCaseName()); } else { -buffer.printf("_bytesSize += ByteUtils.sizeOfUnsignedVarint(%s.length + 1);%n", + buffer.printf("_bytesSize.addBytes(ByteUtils.sizeOfUnsignedVarint(%s.length + 1));%n", field.camelCaseName()); } }). ifNotMember(__ -> { -buffer.printf("_bytesSize += 4;%n"); +buffer.printf("_bytesSize.addBytes(4);%n"); }). generate(buffer); if (tagged) { headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); -buffer.printf("_size += _bytesSize + ByteUtils.sizeOfUnsignedVarint(_bytesSize);%n"); -} else { -buffer.printf("_size += _bytesSize;%n"); + buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_bytesSize.totalSize()));%n"); } +buffer.printf("_size.add(_bytesSize);%n"); } else if (field.type().isRecords()) { -buffer.printf("int _recordsSize = %s.sizeInBytes();%n", field.camelCaseName()); +buffer.printf("_size.addBytes(%s.sizeInBytes());%n", field.camelCaseName()); Review comment: Why it is not ```addZeroCopyBytes```? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage
vamossagar12 commented on a change in pull request #9539: URL: https://github.com/apache/kafka/pull/9539#discussion_r518513246 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, long currentTimeMs) { .map(follower -> new Voter().setVoterId(follower)) .collect(Collectors.toList()); +// Adding the leader to the voters as the protocol ensures that leader always votes for itself. +voters.add(new Voter().setVoterId(state.election().leaderId())); Review comment: @hachikuji i have made the changes you had suggested. cc @jsancio

[GitHub] [kafka] abbccdda commented on pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH
abbccdda commented on pull request #9569: URL: https://github.com/apache/kafka/pull/9569#issuecomment-722795850 @edenhill FYI
[GitHub] [kafka] abbccdda opened a new pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH
abbccdda opened a new pull request #9569: URL: https://github.com/apache/kafka/pull/9569 In KIP-588, we added the new `PRODUCER_FENCED` error code 90 to represent existing ProducerFencedException, which would be unknown error for producer client who sends ProduceRequest and doesn't know how to handle it. The fix is to revise the error code back to the known `INVALID_PRODUCER_EPOCH` in the ProduceResponse. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
[ https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227150#comment-17227150 ] John Roesler commented on KAFKA-10604: -- I agree; I do not think this needs a KIP. It seems clear that the intent was to use "the temporary directory", and it was simply a bug/oversight to hard-code "/tmp" instead of using the platform-indendent "java.io.tmpdir". In fact, I've just reclassified this as a bug. > The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM > parameter or OS-specific settings > - > > Key: KAFKA-10604 > URL: https://issues.apache.org/jira/browse/KAFKA-10604 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > > I found this problem working for > [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585]. > The JVM's temporary directory location is different per OS, and JVM allows to > change it with `java.io.tmpdir` system property. In Linux, it defaults to > `/tmp`. > The problem is the default value of StreamsConfig.STATE_DIR_CONFIG > (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not > change if the runs on OS other than Linux or the user specifies > `java.io.tmpdir` system property. > It should be `\{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
[ https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10604: - Issue Type: Bug (was: Improvement) > The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM > parameter or OS-specific settings > - > > Key: KAFKA-10604 > URL: https://issues.apache.org/jira/browse/KAFKA-10604 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > > I found this problem working for > [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585]. > The JVM's temporary directory location is different per OS, and JVM allows to > change it with `java.io.tmpdir` system property. In Linux, it defaults to > `/tmp`. > The problem is the default value of StreamsConfig.STATE_DIR_CONFIG > (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not > change if the runs on OS other than Linux or the user specifies > `java.io.tmpdir` system property. > It should be `\{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
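The fix the ticket describes amounts to building the default state directory from the `java.io.tmpdir` system property instead of a hard-coded `/tmp`. A minimal sketch of the idea (not the actual StreamsConfig code):

```java
import java.io.File;

public class StateDirDefault {
    // Build the default Streams state directory from the JVM's temp dir
    // instead of a hard-coded "/tmp", so -Djava.io.tmpdir overrides and
    // non-Linux platform defaults are respected.
    static String defaultStateDir() {
        return new File(System.getProperty("java.io.tmpdir"), "kafka-streams").getPath();
    }

    public static void main(String[] args) {
        System.out.println(defaultStateDir());
    }
}
```

On Linux with default settings this still resolves to `/tmp/kafka-streams`, so the behavior only changes where the hard-coded value was wrong.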
[jira] [Updated] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one
[ https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haruki Okada updated KAFKA-10690: - Summary: Produce-response delay caused by lagging replica fetch which affects in-sync one (was: Produce-response delay caused by lagging replica fetch which blocks in-sync one) > Produce-response delay caused by lagging replica fetch which affects in-sync > one > > > Key: KAFKA-10690 > URL: https://issues.apache.org/jira/browse/KAFKA-10690 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.1 >Reporter: Haruki Okada >Priority: Major > Attachments: image-2020-11-06-11-15-21-781.png, > image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png > > > h2. Our environment > * Kafka version: 2.4.1 > h2. Phenomenon > * Produce response time 99th (remote scope) degrades to 500ms, which is 20 > times worse than usual > ** Meanwhile, the cluster was running replica reassignment to service-in new > machine to recover replicas which held by failed (Hardware issue) broker > machine > !image-2020-11-06-11-15-21-781.png|width=292,height=166! > h2. Analysis > Let's say > * broker-X: The broker we observed produce latency degradation > * broker-Y: The broker under servicing-in > broker-Y was catching up replicas of partitions: > * partition-A: has relatively small log size > * partition-B: has large log size > (actually, broker-Y was catching-up many other partitions. I noted only two > partitions here to make explanation simple) > broker-X was the leader for both partition-A and partition-B. > We found that both partition-A and partition-B are assigned to same > ReplicaFetcherThread of broker-Y, and produce latency started to degrade > right after broker-Y finished catching up partition-A. > !image-2020-11-06-11-17-09-910.png|width=476,height=174! > Besides, we observed disk reads on broker-X during service-in. 
(This is > natural since old segments are likely not in page cache) > !image-2020-11-06-11-15-38-390.png|width=292,height=193! > So we suspected that: > * In-sync replica fetch (partition-A) was involved by lagging replica fetch > (partition-B), which should be slow because it causes actual disk reads > ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next > fetch request can't be sent until one fetch request completes > ** => Causes in-sync replica fetch for partitions assigned to same replica > fetcher thread to delay > ** => Causes remote scope produce latency degradation > h2. Possible fix > We think this issue can be addressed by designating part of > ReplicaFetcherThread (or creating another thread pool) for lagging replica > catching-up, but not so sure this is the appropriate way. > Please give your opinions about this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman closed pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG
ableegoldman closed pull request #9489: URL: https://github.com/apache/kafka/pull/9489
[GitHub] [kafka] ableegoldman removed a comment on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG
ableegoldman removed a comment on pull request #9489: URL: https://github.com/apache/kafka/pull/9489#issuecomment-715639072 @guozhangwang @vvcephei WDYT? I'm generally all for more logs but this is pretty extreme 🙂
[GitHub] [kafka] chia7712 commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on pull request #8826: URL: https://github.com/apache/kafka/pull/8826#issuecomment-722781536 > Did that fix the issue since I still saw the same WARN when running kafka-topics.sh. my bad :(
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518478117 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -567,10 +589,34 @@ void runLoop() { } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final Exception e) { +if (this.streamsUncaughtExceptionHandler.handle(e)) { Review comment: I had a little trouble following the `Handler` class. Some trivial things -- eg the handler in the StreamThread is named `streamsUncaughtExceptionHandler` but it's actually _not_ a `StreamsUncaughtExceptionHandler`. Also the usage of the return value; IIUC it's supposed to indicate whether to use the new handler or fall back on the old one. To me it sounds like if `handle` returns `true` that means we should handle it, ie we should _not_ rethrow the exception, but this looks like the opposite of what we do now. Honestly either interpretation is ok with me, as long as it's documented somewhere Do we really need the `Handler` in the first place though? It's already pretty confusing that we have to deal with two types of handlers (old and new) so I'd prefer not to add a third unless it's really necessary. It seems like we can just inline the logic of whether to invoke the new handler or rethrow the exception, which would also clear up the confusion around the meaning of the return value. But I might be missing something here -- WDYT? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -282,6 +283,17 @@ public boolean isRunning() { private final Admin adminClient; private final InternalTopologyBuilder builder; +private Handler streamsUncaughtExceptionHandler; +private ShutdownErrorHook shutdownErrorHook; +private AtomicInteger assignmentErrorCode; +public interface ShutdownErrorHook { +void shutdown(); +} Review comment: Seems like we can just pass in a Runnable with `KafkaStreams::closeToError` instead of adding a whole `ShutdownErrorHook` functional interface ## File path: streams/src/main/resources/common/message/SubscriptionInfoData.json ## @@ -57,6 +57,11 @@ "name": "uniqueField", "versions": "8+", "type": "int8" +}, +{ + "name": "shutdownRequested", + "versions": "9+", + "type": "int8" Review comment: I think we should mirror the `errorCode` in the AssignmentInfo here, both in terms of naming and type. If we're going to use the same AssignorError for both, then they should really be the same. And we may want to send other kinds of error codes in the subscription going forward: better to just encode a single `int` than a separate `byte` for every logical error code. 
I don't think we'll notice the extra three bytes since Subscriptions aren't sent that frequently ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java ## @@ -30,7 +30,7 @@ public Admin adminClient; public TaskManager taskManager; public StreamsMetadataState streamsMetadataState; -public final AtomicInteger assignmentErrorCode = new AtomicInteger(); +public AtomicInteger assignmentErrorCode = new AtomicInteger(); Review comment: This should probably stay `final` so we don't accidentally change it ever ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) { } } +private void closeToError() { +if (!setState(State.ERROR)) { +log.info("Can not transition to error from state " + state()); Review comment: Should this be logged at error? ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) { } } +private void closeToError() { +if (!setState(State.ERROR)) { +log.info("Can not transition to error from state " + state()); +} else { +log.info("Transitioning to ERROR state"); +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} + +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +final Thread shutdownThread = new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting
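The inlining suggested in the first comment above — invoke the new handler when one is registered, otherwise rethrow so the old thread-level handler path still applies — can be sketched as follows (all names here are hypothetical illustrations, not the actual StreamThread code):

```java
public class HandlerDispatchSketch {
    // Hypothetical stand-in for the new streams-level handler interface.
    interface StreamsUncaughtExceptionHandler {
        void handle(Throwable t);
    }

    // Inlined decision: if a new handler is set, let it handle the exception;
    // otherwise rethrow so the old Thread.UncaughtExceptionHandler fires.
    // Returning a string just makes the chosen path observable in this sketch.
    static String dispatch(StreamsUncaughtExceptionHandler handler, RuntimeException e) {
        if (handler != null) {
            handler.handle(e);
            return "handled";
        }
        throw e;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(t -> { }, new RuntimeException("boom"))); // handled
        try {
            dispatch(null, new RuntimeException("boom"));
        } catch (RuntimeException e) {
            System.out.println("rethrown"); // old handler path
        }
    }
}
```

Inlining like this avoids a third handler type and makes the meaning of the boolean-ish decision explicit at the call site, which is the reviewer's point about the confusing return value.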
[jira] [Created] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which blocks in-sync one
Haruki Okada created KAFKA-10690: Summary: Produce-response delay caused by lagging replica fetch which blocks in-sync one Key: KAFKA-10690 URL: https://issues.apache.org/jira/browse/KAFKA-10690 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.4.1 Reporter: Haruki Okada Attachments: image-2020-11-06-11-15-21-781.png, image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png h2. Our environment * Kafka version: 2.4.1 h2. Phenomenon * Produce response time 99th (remote scope) degrades to 500ms, which is 20 times worse than usual ** Meanwhile, the cluster was running replica reassignment to service-in new machine to recover replicas which held by failed (Hardware issue) broker machine !image-2020-11-06-11-15-21-781.png|width=292,height=166! h2. Analysis Let's say * broker-X: The broker we observed produce latency degradation * broker-Y: The broker under servicing-in broker-Y was catching up replicas of partitions: * partition-A: has relatively small log size * partition-B: has large log size (actually, broker-Y was catching-up many other partitions. I noted only two partitions here to make explanation simple) broker-X was the leader for both partition-A and partition-B. We found that both partition-A and partition-B are assigned to same ReplicaFetcherThread of broker-Y, and produce latency started to degrade right after broker-Y finished catching up partition-A. !image-2020-11-06-11-17-09-910.png|width=476,height=174! Besides, we observed disk reads on broker-X during service-in. (This is natural since old segments are likely not in page cache) !image-2020-11-06-11-15-38-390.png|width=292,height=193! 
So we suspected that: * In-sync replica fetch (partition-A) was involved by lagging replica fetch (partition-B), which should be slow because it causes actual disk reads ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next fetch request can't be sent until one fetch request completes ** => Causes in-sync replica fetch for partitions assigned to same replica fetcher thread to delay ** => Causes remote scope produce latency degradation h2. Possible fix We think this issue can be addressed by designating part of ReplicaFetcherThread (or creating another thread pool) for lagging replica catching-up, but not so sure this is the appropriate way. Please give your opinions about this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
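The blocking behavior described above means one ReplicaFetcherThread round costs roughly the sum of its partitions' fetch times, not the max. A toy illustration (all timings are made up for illustration):

```java
public class FetcherBlockingSketch {
    // Simulated per-fetch cost in ms: partition-A is in sync (page-cache reads),
    // partition-B is lagging (cold disk reads). Values are illustrative only.
    static final long IN_SYNC_FETCH_MS = 1;
    static final long LAGGING_FETCH_MS = 100;

    // One fetcher round: fetch requests are issued one after another in a
    // blocking manner, so the round takes the sum of the individual fetches.
    static long roundTripMs(long... perPartitionFetchMs) {
        long total = 0;
        for (long ms : perPartitionFetchMs) {
            total += ms;
        }
        return total;
    }

    public static void main(String[] args) {
        // partition-A alone: 1 ms per round.
        System.out.println(roundTripMs(IN_SYNC_FETCH_MS));
        // partition-A sharing a fetcher with lagging partition-B: 101 ms per
        // round, delaying the follower ack that completes the produce request.
        System.out.println(roundTripMs(IN_SYNC_FETCH_MS, LAGGING_FETCH_MS));
    }
}
```

This is why the proposed fix of dedicating separate fetcher threads (or a pool) to lagging catch-up would help: it moves the slow fetch out of the in-sync partition's round trip.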
[GitHub] [kafka] ableegoldman opened a new pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops
ableegoldman opened a new pull request #9568: URL: https://github.com/apache/kafka/pull/9568 Two pieces: 1) Fix bug in StreamSinkNode#writeToTopology to make sure it always calls the `addSink` overload with the specific topic name, when it exists, so that this topic gets tracked in the InternalTopologyBuilder's `nodeToSinkTopic` map. The sink topics are used by the StreamsPartitionAssignor to resolve the upstream subtopology of a repartition source topic, for whom that repartition topic will be a sink. Without this information the SPA gets stuck permanently during a rebalance 2) Improve the SPA's `setRepartitionTopicMetadataNumberOfPartitions()` method to break out of the loop if we aren't making any progress, to avoid infinitely looping if we ever have another bug like KAFKA-10689. If the SPA hasn't updated the known partition numbers for any repartition topic in the current outer loop, then we know that it's stuck and should throw a TaskAssignmentException to shut down the application
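The safeguard described in (2) is a fixed-point loop with a progress check. A hedged sketch of that pattern follows; the names and data shapes here are hypothetical, not the actual `setRepartitionTopicMetadataNumberOfPartitions()` code:

```java
import java.util.HashMap;
import java.util.Map;

public class RepartitionResolutionSketch {
    // Hypothetical stand-in for the assignor's resolution loop: repeatedly try
    // to resolve repartition-topic partition counts from their upstream topics,
    // and fail fast if a full pass makes no progress instead of spinning forever.
    static Map<String, Integer> resolve(Map<String, Integer> knownCounts,
                                        Map<String, String> derivedFromUpstream) {
        Map<String, Integer> counts = new HashMap<>(knownCounts);
        while (counts.size() < knownCounts.size() + derivedFromUpstream.size()) {
            boolean progress = false;
            for (Map.Entry<String, String> e : derivedFromUpstream.entrySet()) {
                String topic = e.getKey();
                String upstream = e.getValue();
                if (!counts.containsKey(topic) && counts.containsKey(upstream)) {
                    counts.put(topic, counts.get(upstream)); // inherit upstream's count
                    progress = true;
                }
            }
            if (!progress) {
                // Analogous to the TaskAssignmentException described above:
                // better to shut down than loop infinitely on an unresolvable topology.
                throw new IllegalStateException(
                    "stuck resolving repartition topic partition counts");
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> known = new HashMap<>();
        known.put("source", 4);
        Map<String, String> derived = new HashMap<>();
        derived.put("repartition-1", "source");
        derived.put("repartition-2", "repartition-1"); // resolved on a later pass
        System.out.println(resolve(known, derived).get("repartition-2")); // 4
    }
}
```

The key design point is that "no topic resolved in a full outer pass" is a sound termination test: if nothing changed, nothing can change on any later pass either.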
[GitHub] [kafka] junrao commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
junrao commented on pull request #8826: URL: https://github.com/apache/kafka/pull/8826#issuecomment-722728533 @chia7712 : Thanks for the updated PR. Did that fix the issue since I still saw the same WARN when running kafka-topics.sh.
[jira] [Resolved] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
[ https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-10624. - Fix Version/s: 2.8.0 Resolution: Fixed merged the PR to trunk. > [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration > > > Key: KAFKA-10624 > URL: https://issues.apache.org/jira/browse/KAFKA-10624 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Minor > Fix For: 2.8.0 > > > In Scala, we prefer sealed traits over Enumeration since the former gives you > exhaustiveness checking. With Scala Enumeration, you don't get a warning if > you add a new value that is not handled in a given pattern match. > This Jira tracks refactoring enum > [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801] > from an enum to a sealed trait. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
[ https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227084#comment-17227084 ] Guozhang Wang commented on KAFKA-10604: --- I personally would prefer not a KIP for this straight-forward fix. > The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM > parameter or OS-specific settings > - > > Key: KAFKA-10604 > URL: https://issues.apache.org/jira/browse/KAFKA-10604 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > > I found this problem working for > [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585]. > The JVM's temporary directory location is different per OS, and JVM allows to > change it with `java.io.tmpdir` system property. In Linux, it defaults to > `/tmp`. > The problem is the default value of StreamsConfig.STATE_DIR_CONFIG > (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not > change if the runs on OS other than Linux or the user specifies > `java.io.tmpdir` system property. > It should be `\{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao merged pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration
junrao merged pull request #9561: URL: https://github.com/apache/kafka/pull/9561
[jira] [Comment Edited] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227080#comment-17227080 ] Guozhang Wang edited comment on KAFKA-10688 at 11/5/20, 11:50 PM: -- Without KAFKA-3370, then we have to implement the desired behavior at the streams layer itself. That is: 1) Upon task assignment, explicitly set the starting offset for the main consumer based on the per-topic / global reset policy. For repartition topics, the reset policy would be using the global policy. 2) Upon task revive (for corrupted exception handling), do the same thing as 1). 3) During normal processing, if an InvalidOffsetException is thrown from main consumer, we differentiate these cases: 3.a) for source topics: log a warning and reset accordingly; 3.b) for repartition topics throw as fatal errors. We can potentially be more strict that we require all topics contains committed offset, if only some of them have committed positions then fail. But for extensibility I'm going to hold on doing that for now. was (Author: guozhang): Without KAFKA-3370, then we have to implement the desired behavior at the streams layer itself. That is: 1) Upon task assignment, explicitly set the starting offset for the main consumer based on the per-topic / global reset policy. For repartition topics, the reset policy would be `latest`. 2) Upon task revive (for corrupted exception handling), do the same thing as 1). 3) During normal processing, if an InvalidOffsetException is thrown from main consumer, we differentiate these cases: 3.a) for source topics: log a warning and reset accordingly; 3.b) for repartition topics throw as fatal errors. We can potentially be more strict that we require all topics contains committed offset, if only some of them have committed positions then fail. But for extensibility I'm going to hold on doing that for now. 
> Handle accidental truncation of repartition topics as exceptional failure > - > > Key: KAFKA-10688 > URL: https://issues.apache.org/jira/browse/KAFKA-10688 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Today we always handle InvalidOffsetException from the main consumer by the > resetting policy assuming they are for source topics. But repartition topics > are also source topics and should never be truncated and hence cause > InvalidOffsetException. > We should differentiate these repartition topics from external source topics > and treat the InvalidOffsetException from repartition topics as fatal and > close the whole application. -- This message was sent by Atlassian Jira (v8.3.4#803005)
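The dispatch described in steps 3.a/3.b can be sketched as follows; the method and its return values are hypothetical illustrations of the decision, not the actual streams-layer code:

```java
import java.util.Set;

public class ResetPolicySketch {
    // Hypothetical dispatch for an InvalidOffsetException from the main consumer:
    // external source topics are reset per the configured policy, while
    // truncation of a repartition topic is treated as fatal.
    static String onInvalidOffset(String topic, Set<String> repartitionTopics) {
        if (repartitionTopics.contains(topic)) {
            // 3.b) repartition topics must never be truncated -> fatal error,
            // close the whole application.
            return "FATAL";
        }
        // 3.a) external source topic -> log a warning and reset per policy.
        return "RESET";
    }

    public static void main(String[] args) {
        Set<String> repartition = Set.of("app-repartition");
        System.out.println(onInvalidOffset("orders", repartition));          // RESET
        System.out.println(onInvalidOffset("app-repartition", repartition)); // FATAL
    }
}
```

The same classification also drives steps 1) and 2): on assignment or task revival, repartition topics fall back to the global reset policy rather than a per-topic one.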
[jira] [Updated] (KAFKA-10687) Produce request should be bumped for new error code PRODUCE_FENCED
[ https://issues.apache.org/jira/browse/KAFKA-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10687: Issue Type: Bug (was: Improvement) > Produce request should be bumped for new error code PRODUCE_FENCED > -- > > Key: KAFKA-10687 > URL: https://issues.apache.org/jira/browse/KAFKA-10687 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Blocker > Fix For: 2.7.0 > > > In https://issues.apache.org/jira/browse/KAFKA-9910, we missed a case where > the ProduceRequest needs to be bumped to return the new error code > PRODUCE_FENCED. This gap needs to be addressed as a blocker since it is > shipping in 2.7. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227080#comment-17227080 ] Guozhang Wang commented on KAFKA-10688: --- Without KAFKA-3370, then we have to implement the desired behavior at the streams layer itself. That is: 1) Upon task assignment, explicitly set the starting offset for the main consumer based on the per-topic / global reset policy. For repartition topics, the reset policy would be `latest`. 2) Upon task revive (for corrupted exception handling), do the same thing as 1). 3) During normal processing, if an InvalidOffsetException is thrown from main consumer, we differentiate these cases: 3.a) for source topics: log a warning and reset accordingly; 3.b) for repartition topics throw as fatal errors. We can potentially be more strict that we require all topics contains committed offset, if only some of them have committed positions then fail. But for extensibility I'm going to hold on doing that for now. > Handle accidental truncation of repartition topics as exceptional failure > - > > Key: KAFKA-10688 > URL: https://issues.apache.org/jira/browse/KAFKA-10688 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Today we always handle InvalidOffsetException from the main consumer by the > resetting policy assuming they are for source topics. But repartition topics > are also source topics and should never be truncated and hence cause > InvalidOffsetException. > We should differentiate these repartition topics from external source topics > and treat the InvalidOffsetException from repartition topics as fatal and > close the whole application. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji edited a comment on pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji edited a comment on pull request #9563: URL: https://github.com/apache/kafka/pull/9563#issuecomment-722690665 I got inspired to try and extend this to apply to all message types. I've updated the patch to remove the custom response logic in `FetchResponse` in favor of a general pattern using `SendBuilder`. We had an existing benchmark `FetchResponseBenchmark.testSerializeFetchResponse`, so I tried it out. Here are the results before the patch (10 topics and 500 topics): ``` Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 6322.861 ±(99.9%) 310.785 ns/op [Average] (min, avg, max) = (6057.758, 6322.861, 7090.658), stdev = 290.708 CI (99.9%): [6012.076, 6633.646] (assumes normal distribution) Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 323310.283 ±(99.9%) 25947.515 ns/op [Average] (min, avg, max) = (301370.273, 323310.283, 383716.556), stdev = 24271.322 CI (99.9%): [297362.768, 349257.799] (assumes normal distribution) ``` Here are the results with the patch (10 topics and 500 topics): ``` Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 5701.378 ±(99.9%) 100.848 ns/op [Average] (min, avg, max) = (5601.838, 5701.378, 5925.943), stdev = 94.333 CI (99.9%): [5600.530, 5802.225] (assumes normal distribution) Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 298221.825 ±(99.9%) 8173.945 ns/op [Average] (min, avg, max) = (287615.891, 298221.825, 321499.618), stdev = 7645.913 CI (99.9%): [290047.880, 306395.770] (assumes normal distribution) ``` So it looks like a modest overall improvement. Note I still need to polish up a few things in the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
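As a quick sanity check on the benchmark figures above, the relative improvement can be computed directly from the mean ns/op values (a small helper for illustration, not part of the patch):

```python
def pct_reduction(before_ns, after_ns):
    """Percent reduction in mean time per operation."""
    return 100.0 * (before_ns - after_ns) / before_ns

# 10-topic case: 6322.861 -> 5701.378 ns/op, roughly a 9.8% reduction
small = pct_reduction(6322.861, 5701.378)
# 500-topic case: 323310.283 -> 298221.825 ns/op, roughly a 7.8% reduction
large = pct_reduction(323310.283, 298221.825)
assert 9.5 < small < 10.0 and 7.5 < large < 8.0
```

Note both improvements are within or near the reported 99.9% confidence intervals, which is consistent with calling the gain modest.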
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r518433135 ## File path: clients/src/main/resources/common/message/EnvelopeRequest.json ## @@ -23,7 +23,7 @@ "fields": [ { "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": true, "about": "The embedded request header and data."}, -{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", "zeroCopy": true, "nullableVersions": "0+", +{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", "nullableVersions": "0+", Review comment: I agree it should not be nullable. Will fix it.
[jira] [Commented] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition
[ https://issues.apache.org/jira/browse/KAFKA-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227077#comment-17227077 ] A. Sophie Blee-Goldman commented on KAFKA-10689: It's a pretty obnoxious bug, since the application stays stuck in REBALANCING while StreamThreads slowly drop out of the group one-by-one as the current group leader gets stuck and a new rebalance has to be triggered. Meanwhile we don't log anything within this loop so it's impossible to know what happened based on the logs. Ideally we would just limit the number of iterations and shut down the application if we can't seem to figure out the number of partitions for some reason. Unfortunately, given the random way that setRepartitionTopicMetadataNumberOfPartitions walks through the topology and the lack of a ceiling on topological cycles/complexity, it's not immediately obvious how (or if) we can pick a limit on the number of necessary iterations. Still, we can probably improve the current situation and do better than just silently looping forever. One simple option would be to just start logging a warning once we're past some large iteration number. Another option is to keep track of the set of repartition topics whose partitions are still unknown, and if this set fails to change over one full iteration of the outer `topicGroups.values()` loop, then break out and shut down the application. This seems pretty airtight, although obviously a bit more complicated than just logging a warning at high iteration count. The logging is probably more than sufficient for a user to debug their application, but also a worse user experience. > Assignor can't determine number of partitions on FJK with upstream windowed > repartition > --- > > Key: KAFKA-10689 > URL: https://issues.apache.org/jira/browse/KAFKA-10689 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: A. 
Sophie Blee-Goldman >Priority: Major > Fix For: 2.8.0, 2.7.1 > > > Due to a minor logical gap in how windowed repartition sink nodes are written > to the topology, they are never added to the official map of sink topics > tracked by the InternalTopologyBuilder. This makes it impossible to determine > the number of partitions of downstream repartition topics in > StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, > causing the assignor to loop infinitely in this method. >
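The second option described above — break out when a full pass fails to shrink the set of unresolved repartition topics — can be sketched as a fixed-point iteration with progress detection. This is a hedged Python sketch of logic that would live in Java; `resolve_one` is a hypothetical stand-in for the partition-count lookup that walks the topology.

```python
def resolve_partition_counts(unknown, resolve_one):
    """Iterate until every repartition topic's partition count is known.

    unknown: set of repartition topic names with unknown counts.
    resolve_one(topic, counts) returns a partition count, or None if the
    count cannot be determined yet from what is already resolved.
    If a full pass over the unresolved set makes no progress, fail fast
    instead of looping forever.
    """
    counts = {}
    unknown = set(unknown)
    while unknown:
        progressed = False
        for topic in sorted(unknown):
            n = resolve_one(topic, counts)
            if n is not None:
                counts[topic] = n
                unknown.discard(topic)
                progressed = True
        if not progressed:
            # The airtight exit condition: nothing changed in a full pass.
            raise RuntimeError(f"cannot determine partitions for: {sorted(unknown)}")
    return counts
```

Unlike a fixed iteration cap, this terminates regardless of topology size or cycle structure, because any solvable instance must make progress on every pass.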
[jira] [Assigned] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition
[ https://issues.apache.org/jira/browse/KAFKA-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-10689: -- Assignee: A. Sophie Blee-Goldman > Assignor can't determine number of partitions on FJK with upstream windowed > repartition > --- > > Key: KAFKA-10689 > URL: https://issues.apache.org/jira/browse/KAFKA-10689 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 2.8.0, 2.7.1 > > > Due to a minor logical gap in how windowed repartition sink nodes are written > to the topology, they are never added to the official map of sink topics > tracked by the InternalTopologyBuilder. This makes it impossible to determine > the number of partitions of downstream repartition topics in > StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, > causing the assignor to loop infinitely in this method. >
[jira] [Updated] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition
[ https://issues.apache.org/jira/browse/KAFKA-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-10689: --- Priority: Critical (was: Major) > Assignor can't determine number of partitions on FJK with upstream windowed > repartition > --- > > Key: KAFKA-10689 > URL: https://issues.apache.org/jira/browse/KAFKA-10689 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Critical > Fix For: 2.8.0, 2.7.1 > > > Due to a minor logical gap in how windowed repartition sink nodes are written > to the topology, they are never added to the official map of sink topics > tracked by the InternalTopologyBuilder. This makes it impossible to determine > the number of partitions of downstream repartition topics in > StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, > causing the assignor to loop infinitely in this method. >
[jira] [Created] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition
A. Sophie Blee-Goldman created KAFKA-10689: -- Summary: Assignor can't determine number of partitions on FJK with upstream windowed repartition Key: KAFKA-10689 URL: https://issues.apache.org/jira/browse/KAFKA-10689 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.5.0 Reporter: A. Sophie Blee-Goldman Fix For: 2.8.0, 2.7.1 Due to a minor logical gap in how windowed repartition sink nodes are written to the topology, they are never added to the official map of sink topics tracked by the InternalTopologyBuilder. This makes it impossible to determine the number of partitions of downstream repartition topics in StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, causing the assignor to loop infinitely in this method.
[GitHub] [kafka] bbejeck commented on a change in pull request #9565: MINOR: Move upgraded docs from site to Kafka docs
bbejeck commented on a change in pull request #9565: URL: https://github.com/apache/kafka/pull/9565#discussion_r518417041 ## File path: docs/js/templateData.js ## @@ -19,6 +19,6 @@ limitations under the License. var context={ "version": "26", "dotVersion": "2.6", -"fullDotVersion": "2.6.1", +"fullDotVersion": "2.6.0", Review comment: this needs to be `2.6.1` ## File path: docs/quickstart-zookeeper.html ## @@ -0,0 +1,277 @@ + + + + + + +
[jira] [Created] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure
Guozhang Wang created KAFKA-10688: - Summary: Handle accidental truncation of repartition topics as exceptional failure Key: KAFKA-10688 URL: https://issues.apache.org/jira/browse/KAFKA-10688 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Assignee: Guozhang Wang Today we always handle InvalidOffsetException from the main consumer by the resetting policy assuming they are for source topics. But repartition topics are also source topics and should never be truncated and hence cause InvalidOffsetException. We should differentiate these repartition topics from external source topics and treat the InvalidOffsetException from repartition topics as fatal and close the whole application.
[jira] [Commented] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration
[ https://issues.apache.org/jira/browse/KAFKA-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227067#comment-17227067 ] A. Sophie Blee-Goldman commented on KAFKA-10678: Thanks for opening a separate ticket for this. There seem to be two main problems/unanswered questions here: 1) Why was there a rebalance at all if static membership was enabled? 2) Why did the rebalance result in a large shuffling of tasks? For 1) it's difficult to say with only the broker side logs, since they won't tell us _why_ the client triggered a new rebalance after it was bounced. Would it be possible to collect logs from the client covering the period immediately after it was bounced, when it apparently tried to trigger a rebalance? I was discussing question 2) with [~cadonna] and it seems to be a combination of a few things: first, the "eventual" assignment is currently performed without regard to the previous placement of tasks. It just tries to distribute tasks as evenly as possible, using intermediate assignments and probing rebalances as needed. [~vvcephei] wrote up some thoughts on this in KAFKA-10121. We're aware of this limitation but haven't addressed it since the assignor is deterministic and therefore no-op group changes – such as an existing member being bounced – shouldn't result in a different eventual assignment than the stable one pre-bounce. Unfortunately this assignment identifies clients based on the encoded processId, which is actually randomly generated during StreamThread startup. So the processId identifier would change after a bounce, meaning different initial conditions to the assignor function and therefore a different final result :/ I think if the shuffling of tasks wasn't so bad then even if you did still get a rebalance even with static membership, then it would hardly be noticeable (given that it can continue to actively process during a cooperative rebalance). 
We could probably improve a majority of cases just by fixing the processId thing, but I feel like we might as well skip that and just go ahead with implementing KAFKA-10121 at that point to improve it for all cases. > Re-deploying Streams app causes rebalance and task migration > > > Key: KAFKA-10678 > URL: https://issues.apache.org/jira/browse/KAFKA-10678 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0, 2.6.1 >Reporter: Bradley Peterson >Priority: Major > Attachments: after, before, broker > > > Re-deploying our Streams app causes a rebalance, even when using static group > membership. Worse, the rebalance creates standby tasks, even when the > previous task assignment was balanced and stable. > Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but > we saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with > 4 streams threads, and data stored on persistent EBS volumes.. During a > redeploy, all EC2 instances are stopped, new instances are launched, and the > EBS volumes are attached to the new instances. We do not use interactive > queries. {{session.timeout.ms}} is set to 30 minutes, and the deployment > finishes well under that. {{num.standby.replicas}} is 0. > h2. Expected Behavior > Given a stable and balanced task assignment prior to deploying, we expect to > see the same task assignment after deploying. Even if a rebalance is > triggered, we do not expect to see new standby tasks. > h2. Observed Behavior > Attached are the "Assigned tasks to clients" log lines from before and after > deploying. The "before" is from over 24 hours ago, the task assignment is > well balanced and "Finished stable assignment of tasks, no followup > rebalances required." is logged. The "after" log lines show the same > assignment of active tasks, but some additional standby tasks. There are > additional log lines about adding and removing active tasks, which I don't > quite understand. 
> I've also included logs from the broker showing the rebalance was triggered > for "Updating metadata".
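The determinism argument in the comment above hinges on stable client identifiers: a deterministic assignor only reproduces the pre-bounce assignment if its inputs are unchanged. A toy sketch (deliberately not the actual StreamsPartitionAssignor algorithm) shows how a regenerated processId alone changes the result:

```python
def assign(tasks, client_ids):
    """Toy deterministic assignor: round-robin tasks over sorted client ids.

    Same tasks + same ids -> same assignment. If a bounce regenerates a
    client's id (as a randomly generated processId does), the sorted order
    can change, and tasks shuffle even though the group is logically the same.
    """
    clients = sorted(client_ids)
    return {task: clients[i % len(clients)] for i, task in enumerate(sorted(tasks))}

stable = assign(["0_0", "0_1", "1_0"], ["id-a", "id-b"])
assert stable == assign(["0_0", "0_1", "1_0"], ["id-a", "id-b"])  # deterministic
# Bounce one client with a fresh random id: different input, different output.
shuffled = assign(["0_0", "0_1", "1_0"], ["id-z", "id-b"])
assert stable != shuffled
```

This is why making the processId stable across restarts (or making the assignment sticky, per KAFKA-10121) would avoid the shuffle.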
[GitHub] [kafka] hachikuji commented on pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on pull request #9563: URL: https://github.com/apache/kafka/pull/9563#issuecomment-722690665 I got inspired to try and extend this to apply to all request types. I've updated the patch to remove the custom response logic in `FetchResponse` in favor of a general pattern using `SendBuilder`. We had an existing benchmark `FetchResponseBenchmark.testSerializeFetchResponse`, so I tried it out. Here are the results before the patch (10 topics and 500 topics): ``` Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 6322.861 ±(99.9%) 310.785 ns/op [Average] (min, avg, max) = (6057.758, 6322.861, 7090.658), stdev = 290.708 CI (99.9%): [6012.076, 6633.646] (assumes normal distribution) Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 323310.283 ±(99.9%) 25947.515 ns/op [Average] (min, avg, max) = (301370.273, 323310.283, 383716.556), stdev = 24271.322 CI (99.9%): [297362.768, 349257.799] (assumes normal distribution) ``` Here are the results with the patch (10 topics and 500 topics): ``` Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 5701.378 ±(99.9%) 100.848 ns/op [Average] (min, avg, max) = (5601.838, 5701.378, 5925.943), stdev = 94.333 CI (99.9%): [5600.530, 5802.225] (assumes normal distribution) Result "org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse": 298221.825 ±(99.9%) 8173.945 ns/op [Average] (min, avg, max) = (287615.891, 298221.825, 321499.618), stdev = 7645.913 CI (99.9%): [290047.880, 306395.770] (assumes normal distribution) ``` So it looks like a modest overall improvement. Note I still need to polish up a few things in the PR.
[GitHub] [kafka] rajinisivaram opened a new pull request #9567: MINOR: Always return partitions with diverging epochs in fetch response
rajinisivaram opened a new pull request #9567: URL: https://github.com/apache/kafka/pull/9567 This is required to ensure that followers can truncate based on diverging epochs returned in fetch responses. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r518414400 ## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ## @@ -89,10 +89,11 @@ public ResponseHeader toResponseHeader() { public static RequestHeader parse(ByteBuffer buffer) { short apiKey = -1; try { +int position = buffer.position(); Review comment: This was a bug. This logic assumes that the buffer is at position 0. Let me add a test case.
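The bug is easy to see with a cursor-style buffer: the old error path assumed the header started at position 0, which is false for a request embedded in an envelope. A hedged Python sketch of the idea in the diff above (the `Buf` class is a hypothetical stand-in for java.nio.ByteBuffer, and `parse_api_key` for the header parse): record the starting position up front, and restore it on failure rather than rewinding to 0.

```python
import struct

class Buf:
    """Minimal ByteBuffer-like cursor (hypothetical stand-in)."""
    def __init__(self, data, position=0):
        self.data, self.position = data, position

    def get_short(self):
        # Read a big-endian 16-bit value at the cursor and advance it.
        (v,) = struct.unpack_from(">h", self.data, self.position)
        self.position += 2
        return v

def parse_api_key(buf, supported):
    start = buf.position          # record where the header begins...
    api_key = buf.get_short()
    if api_key not in supported:
        buf.position = start      # ...and restore it, not position 0
        raise ValueError(f"unsupported api key: {api_key}")
    return api_key

# Envelope-like case: the embedded header starts at a non-zero offset.
buf = Buf(b"\xff\xff\x00\x12", position=2)
assert parse_api_key(buf, supported={0x12}) == 0x12
```

Restoring to a recorded start (instead of rewinding to 0) keeps error handling correct when the same backing buffer holds more than one serialized structure.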
[jira] [Created] (KAFKA-10687) Produce request should be bumped for new error code PRODUCE_FENCED
Boyang Chen created KAFKA-10687: --- Summary: Produce request should be bumped for new error code PRODUCE_FENCED Key: KAFKA-10687 URL: https://issues.apache.org/jira/browse/KAFKA-10687 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen Fix For: 2.7.0 In https://issues.apache.org/jira/browse/KAFKA-9910, we missed a case where the ProduceRequest needs to be bumped to return the new error code PRODUCE_FENCED. This gap needs to be addressed as a blocker since it is shipping in 2.7.
[GitHub] [kafka] abbccdda commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
abbccdda commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r518396407 ## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ## @@ -89,10 +89,11 @@ public ResponseHeader toResponseHeader() { public static RequestHeader parse(ByteBuffer buffer) { short apiKey = -1; try { +int position = buffer.position(); Review comment: Is this specifically for the Envelope case?
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r518396177 ## File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.snapshot + +import java.nio.ByteBuffer +import java.nio.file.Path +import java.util.{Iterator => JIterator} +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.record.FileRecords +import org.apache.kafka.raft.OffsetAndEpoch +import org.apache.kafka.snapshot.SnapshotReader + +final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader { Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
dajac commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r518387030 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) + case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version) + case res: DescribeUserScramCredentialsRequest => DescribeUserScramCredentialsRequestDat
[GitHub] [kafka] dajac commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
dajac commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r518384421 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version, verbose) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version, verbose) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, 
request.version, verbose) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data,
[jira] [Assigned] (KAFKA-6943) Have option to shutdown KS cleanly if any threads crashes, or if all threads crash
[ https://issues.apache.org/jira/browse/KAFKA-6943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson reassigned KAFKA-6943: - Assignee: Walker Carlson > Have option to shutdown KS cleanly if any threads crashes, or if all threads > crash > -- > > Key: KAFKA-6943 > URL: https://issues.apache.org/jira/browse/KAFKA-6943 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Assignee: Walker Carlson >Priority: Major > Labels: user-experience > > ATM users have to implement this themselves. Might be nice to have an option > to configure that if all threads crash, or if any crash, to initiate clean > shutdown. > This also has a gotcha where atm if you call KS#close without a timeout, from > the uncaught exception handler, you dead lock.
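The deadlock gotcha quoted above arises because KafkaStreams#close joins the stream threads, while the uncaught exception handler runs on a stream thread; with no timeout, the dying thread ends up waiting on itself forever. A hedged Python sketch of the usual workaround (hand the close call off to a helper thread and bound the wait; `close_from_handler` is a hypothetical name, not a Kafka API):

```python
import threading

def close_from_handler(do_close, timeout_s):
    """Run a potentially self-joining close on a helper thread so the
    dying stream thread never waits on itself, and give up after
    timeout_s seconds. Returns True if close completed in time."""
    closer = threading.Thread(target=do_close, daemon=True)
    closer.start()
    closer.join(timeout_s)
    return not closer.is_alive()

closed = []
assert close_from_handler(lambda: closed.append(True), timeout_s=5.0)
assert closed == [True]
# A close that hangs forever no longer deadlocks the handler:
assert not close_from_handler(lambda: threading.Event().wait(), timeout_s=0.2)
```

In Java the equivalent is calling the timed `close(Duration)` overload (or deferring close to a separate thread) from the handler instead of the untimed `close()`.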
[jira] [Assigned] (KAFKA-9331) Add option to terminate application when StreamThread(s) die
[ https://issues.apache.org/jira/browse/KAFKA-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson reassigned KAFKA-9331: - Assignee: Walker Carlson > Add option to terminate application when StreamThread(s) die > > > Key: KAFKA-9331 > URL: https://issues.apache.org/jira/browse/KAFKA-9331 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Assignee: Walker Carlson >Priority: Minor > Labels: needs-kip > > Currently, if a {{StreamThread}} dies due to an unexpected exception, the > Streams application continues running. Even if all {{StreamThread}}(s) die, > the application will continue running, but will be in an {{ERROR}} state. > Many users want or expect the application to terminate in the event of a > fatal exception that kills one or more {{StreamThread}}(s). Currently, this > requires extra work from the developer to register an uncaught exception > handler on the {{KafkaStreams}} object and trigger a shutdown as needed. > It would be useful to provide a configurable option for the Streams > application to have it automatically terminate with an exception if one or > more {{StreamThread}}(s) die.
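The workaround the ticket describes (registering an uncaught exception handler and triggering a shutdown yourself) can be sketched with plain JVM primitives. This is a hypothetical stand-alone analogue, not the Kafka Streams API; the class and method names below are invented for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not the Streams API): detect a crashed worker thread
// and use that signal to initiate a clean application shutdown.
public class CrashShutdownSketch {

    // Starts a worker that dies with an uncaught exception; returns true if
    // the crash was observed within the timeout, i.e. the point at which a
    // clean shutdown would be initiated.
    public static boolean runAndShutdownOnCrash(long timeoutMs) {
        CountDownLatch crashed = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            throw new RuntimeException("simulated fatal error in a stream thread");
        });
        // The handler runs on the dying thread itself, which is exactly why
        // calling a blocking close() without a timeout from the handler can
        // deadlock, as KAFKA-6943 notes; signalling another thread avoids that.
        worker.setUncaughtExceptionHandler((t, e) -> crashed.countDown());
        worker.start();
        try {
            return crashed.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdownOnCrash(5000)
                ? "worker crashed; shutting down cleanly"
                : "no crash observed");
    }
}
```

The PR under review (#9487) aims to replace this kind of boilerplate with a Streams-specific handler whose responses (visible in the test imports below) include SHUTDOWN_APPLICATION and SHUTDOWN_CLIENT.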
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518371722 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; + +@Category(IntegrationTest.class) +public class StreamsHandlerIntegrationTest { +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +String inputTopic; +StreamsBuilder builder; +Properties properties; +List processorValueCollector; +String idempotentTopic = "idempotentTopic"; +String appId = ""; + +@Before +public void setup() { +final String testId = safeUniqueTestName(getClass(), testName); +appId = "appId_" + testId; +inputTopic = "input" + testId; +cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic); + +IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); Review comment: good questions This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518371511 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java ## @@ -60,6 +60,9 @@ public void onPartitionsAssigned(final Collection partitions) { } else if (assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) { log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR); throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance"); +} else if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) { Review comment: added unit test
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r518363490 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) + case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version) + case res: DescribeUserScramCredentialsRequest => DescribeUserScramCredentialsReq
[GitHub] [kafka] kowshik commented on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration
kowshik commented on pull request #9561: URL: https://github.com/apache/kafka/pull/9561#issuecomment-722644136 I've rebased on top of #9559 now. So the build failure(s) should be gone.
[jira] [Assigned] (KAFKA-10500) Add API to Start and Stop Stream Threads
[ https://issues.apache.org/jira/browse/KAFKA-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson reassigned KAFKA-10500: -- Assignee: Walker Carlson > Add API to Start and Stop Stream Threads > > > Key: KAFKA-10500 > URL: https://issues.apache.org/jira/browse/KAFKA-10500 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Bruno Cadonna >Assignee: Walker Carlson >Priority: Major > Labels: needs-kip > > Currently, there is no possibility in Kafka Streams to increase or decrease > the number of stream threads after the Kafka Streams client has been started. > Uncaught exceptions thrown in a stream thread kill the stream thread, leaving > the Kafka Streams client with fewer stream threads for processing than when > the client was started. The only way to replace the killed stream thread is > to restart the whole Kafka Streams client. For transient errors, it might > make sense to replace a killed stream thread with a new one while users try > to find the root cause of the error. That could be accomplished by starting a > new stream thread in the uncaught exception handler of the killed stream > thread.
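The "replace a killed stream thread from its own uncaught exception handler" idea can be sketched with a resizable worker list in plain Java. All names here are hypothetical stand-ins, not the Kafka Streams API, and a real implementation would cap how many times a repeatedly crashing worker is replaced:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical analogue of a stream-thread list that can grow after start.
public class ResizableWorkerPool {

    private final List<Thread> workers = new CopyOnWriteArrayList<>();

    // Start a new worker even after the pool is already running.
    public Thread addWorker(Runnable task) {
        Thread t = new Thread(task, "worker-" + (workers.size() + 1));
        // If the worker dies with an uncaught exception, drop it from the
        // list and start a replacement -- the transient-error recovery the
        // ticket describes. (A real implementation would limit retries.)
        t.setUncaughtExceptionHandler((dead, e) -> {
            workers.remove(dead);
            addWorker(task);
        });
        workers.add(t);
        t.start();
        return t;
    }

    public int size() {
        return workers.size();
    }
}
```

The merged PR (#9543) makes the Streams thread list resizable internally; this sketch only illustrates the shape of that idea.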
[GitHub] [kafka] vvcephei merged pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
vvcephei merged pull request #9543: URL: https://github.com/apache/kafka/pull/9543
[GitHub] [kafka] vvcephei commented on pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
vvcephei commented on pull request #9543: URL: https://github.com/apache/kafka/pull/9543#issuecomment-722642034 The test failures look unrelated: ``` Build / JDK 8 / org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync | 34 sec | 1 -- | -- | -- Build / JDK 11 / kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition | 2 min 3 sec | 1 Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota | 59 sec | 1 ```
[GitHub] [kafka] jolshan commented on pull request #9566: KAFKA-10618: Update to Uuid class
jolshan commented on pull request #9566: URL: https://github.com/apache/kafka/pull/9566#issuecomment-722639934 @cmccabe @ijuma Let me know if this looks ok.
[GitHub] [kafka] jolshan opened a new pull request #9566: KAFKA-10618: Update to Uuid class
jolshan opened a new pull request #9566: URL: https://github.com/apache/kafka/pull/9566 As decided in KIP-516, the UUID class has been renamed to Uuid. This PR changes all instances of org.apache.kafka.common.UUID to org.apache.kafka.common.Uuid. It also modifies the Uuid class so that it no longer wraps a java.util.UUID object; it simply stores two longs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
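The "two longs instead of a wrapped java.util.UUID" layout can be illustrated with a minimal stand-alone class. TwoLongUuid is a made-up name for this sketch (the real class is org.apache.kafka.common.Uuid, whose API may differ); the round-trip shows that the two longs carry exactly the same 128 bits:

```java
import java.util.UUID;

// Sketch of a UUID-like value stored as two longs rather than a wrapped
// java.util.UUID object (saves an extra object allocation per id).
public class TwoLongUuid {
    final long mostSignificantBits;
    final long leastSignificantBits;

    TwoLongUuid(long msb, long lsb) {
        this.mostSignificantBits = msb;
        this.leastSignificantBits = lsb;
    }

    // Extract the 128 bits from an existing java.util.UUID.
    static TwoLongUuid fromJavaUuid(UUID u) {
        return new TwoLongUuid(u.getMostSignificantBits(), u.getLeastSignificantBits());
    }

    // Rebuild a java.util.UUID; the round trip is lossless.
    UUID toJavaUuid() {
        return new UUID(mostSignificantBits, leastSignificantBits);
    }
}
```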
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r518359188 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version, verbose) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version, verbose) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, 
request.version, verbose) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518335631 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; + +@Category(IntegrationTest.class) +public class StreamsHandlerIntegrationTest { +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +String inputTopic; +StreamsBuilder builder; +Properties properties; +List processorValueCollector; +String idempotentTopic = "idempotentTopic"; Review comment: it can be removed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas
apovzner commented on a change in pull request #9555: URL: https://github.com/apache/kafka/pull/9555#discussion_r518327928 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) } @volatile private var brokerMaxConnections = config.maxConnections + @volatile private var interBrokerListenerName = config.interBrokerListenerName Review comment: You are right -- I saw that test and thought it was a dynamic config, but the test was verifying that it cannot be updated. I see now that KIP-226 lists that as future work, cool.
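The change under review caches the listener name once instead of reading config.interBrokerListenerName on every connection. A minimal Java analogue (names hypothetical) follows; the Scala original keeps the field @volatile to match its neighbours, but since the review notes this config is not dynamically updatable today, a final field captures the same idea in a sketch:

```java
import java.util.Map;

// Hypothetical analogue of the PR #9555 change: cache a config value that is
// read on the hot path (connection accept) rather than looking it up each time.
public class ConnectionQuotasSketch {

    private final String interBrokerListenerName;

    public ConnectionQuotasSketch(Map<String, String> config) {
        // Read once at construction; safe while the config is static.
        this.interBrokerListenerName =
                config.getOrDefault("inter.broker.listener.name", "PLAINTEXT");
    }

    // Hot path: compare against the cached value, no config lookup.
    public boolean isInterBroker(String listenerName) {
        return interBrokerListenerName.equals(listenerName);
    }
}
```

If the config ever became dynamically updatable (the KIP-226 future work mentioned above), the cached field would need to be volatile and refreshed on reconfiguration.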
[GitHub] [kafka] rajinisivaram commented on pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on pull request #9382: URL: https://github.com/apache/kafka/pull/9382#issuecomment-722609155 @hachikuji Thanks for the reviews, addressed the comments.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518326776 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; + +@Category(IntegrationTest.class) +public class StreamsHandlerIntegrationTest { +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +String inputTopic; +StreamsBuilder builder; +Properties properties; +List processorValueCollector; +String idempotentTopic = "idempotentTopic"; +String appId = ""; + +@Before +public void setup() { +final String testId = safeUniqueTestName(getClass(), testName); +appId = "appId_" + testId; +inputTopic = "input" + testId; +cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic); + +IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); + +builder = new StreamsBuilder(); + +processorValueCollector = new ArrayList<>(); + +final KStream stream = builder.stream(inputTopic); +stream.process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process")); + +properties = mkObjectProperties( +mkMap( +mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), +mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), +mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"), +mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"), +mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"), + mk
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r518326079

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##

```
@@ -221,7 +223,15 @@ abstract class AbstractFetcherThread(name: String,
       val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
       handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
-      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, isTruncationOnFetchSupported)
+    }
+  }
+
+  private def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+    inLock(partitionMapLock) {
+      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      handlePartitionsWithErrors(partitionsWithError, "truncateOnFetchResponse")
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, maySkipTruncation = false)
```

Review comment: Fixed, removed that flag.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
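For context on this PR (KAFKA-10554), the diverging-epoch truncation being discussed reduces to a simple rule: the follower truncates its log to the smaller of the leader's end offset for the diverging epoch and its own end offset for that epoch, so both replicas agree on every offset below the truncation point. A minimal sketch of that rule (the helper name is illustrative, not taken from the PR):

```java
// Illustrative helper for the truncation rule discussed above: given the
// leader's end offset for the diverging epoch and the follower's local end
// offset for the same epoch, the follower truncates to the minimum of the two.
class DivergenceTruncation {
    static long truncationOffset(long leaderEpochEndOffset, long followerEpochEndOffset) {
        return Math.min(leaderEpochEndOffset, followerEpochEndOffset);
    }
}
```

Performing this directly from the fetch response (rather than a separate OffsetsForLeaderEpoch round trip) is what the new `truncateOnFetchResponse` path enables.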
[jira] [Commented] (KAFKA-1800) KafkaException was not recorded at the per-topic metrics
[ https://issues.apache.org/jira/browse/KAFKA-1800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226934#comment-17226934 ] Guozhang Wang commented on KAFKA-1800: -- This has been fixed since 1.0.0; closing now. > KafkaException was not recorded at the per-topic metrics > > > Key: KAFKA-1800 > URL: https://issues.apache.org/jira/browse/KAFKA-1800 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Attachments: KAFKA-1800.patch > > > When a KafkaException is thrown from a producer.send() call, it is not recorded > in the per-topic record-error-rate, only in the global error-rate. > Since users usually monitor the per-topic metrics, losing all dropped-message > counts at that level that are caused by producer-thrown exceptions such as > BufferExhaustedException could be very dangerous. -- This message was sent by Atlassian Jira (v8.3.4#803005)
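The fix amounts to recording a failed send at both scopes. A minimal sketch of the double-recording pattern — the `ErrorRates` class below is a hypothetical illustration, not the producer's actual Sensor-based metrics code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: count a failed send both globally and per topic, so
// that per-topic dashboards do not silently miss producer-side exceptions.
class ErrorRates {
    final LongAdder globalErrors = new LongAdder();
    final Map<String, LongAdder> perTopicErrors = new ConcurrentHashMap<>();

    void recordError(String topic) {
        globalErrors.increment();                                                // global error-rate
        perTopicErrors.computeIfAbsent(topic, t -> new LongAdder()).increment(); // per-topic record-error-rate
    }
}
```

The key point of the bug was that only the first increment happened; a per-topic monitor then saw zero errors while messages were being dropped.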
[GitHub] [kafka] splett2 commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas
splett2 commented on a change in pull request #9555: URL: https://github.com/apache/kafka/pull/9555#discussion_r518283600

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##

```
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = config.interBrokerListenerName
```

Review comment: `interBrokerListenerName` is apparently not a dynamic config. See `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`:

```
// Verify updating inter-broker listener
val props = new Properties
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
try {
  reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
  fail("Inter-broker listener cannot be dynamically updated")
}
```

I don't think we allow updating the inter-broker listener at all, so I think we can remove the test I added. I wasn't actually sure whether we allowed it, but the code seems to suggest we don't.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas
splett2 commented on a change in pull request #9555: URL: https://github.com/apache/kafka/pull/9555#discussion_r518283600

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##

```
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = config.interBrokerListenerName
```

Review comment: `interBrokerListenerName` is apparently not a dynamic config. See `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`:

```
// Verify updating inter-broker listener
val props = new Properties
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
try {
  reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
  fail("Inter-broker listener cannot be dynamically updated")
}
```

It does seem like we allow updating the inter-broker listener when adding/removing listeners, but that case is covered. I will try to verify whether we allow updating the inter-broker listener name through the inter-broker security protocol, since apparently that is another way to configure the listener.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
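The caching pattern under discussion can be sketched in isolation — the class and method names below are illustrative stand-ins, not the SocketServer code:

```java
import java.util.function.Supplier;

// Sketch of the pattern from this review: read a potentially expensive config
// value once, keep it in a volatile field for cheap reads on the hot path,
// and refresh it only when the broker is reconfigured.
class CachedListenerName {
    private final Supplier<String> expensiveLookup;
    private volatile String interBrokerListenerName;

    CachedListenerName(Supplier<String> expensiveLookup) {
        this.expensiveLookup = expensiveLookup;
        this.interBrokerListenerName = expensiveLookup.get();
    }

    // Hot path (e.g. once per accepted connection): a single volatile read.
    String interBrokerListenerName() {
        return interBrokerListenerName;
    }

    // Reconfiguration path: refresh the cached value (only needed if the
    // underlying config can actually change dynamically).
    void reconfigure() {
        interBrokerListenerName = expensiveLookup.get();
    }
}
```

The review thread's open question — whether the listener name is dynamically reconfigurable — decides whether the `reconfigure()` hook is needed at all.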
[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
cadonna commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518283539

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##

```
@@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     *
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handleStreamsUncaughtException(final Throwable e,
+                                                                                                         final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(e);
+        switch (action) {
+            case SHUTDOWN_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                    "and the registered exception handler opted to \" + action + \"." +
+                    " The streams client is going to shut down now. ", e);
+                close(Duration.ZERO);
```

Review comment: My last comment is not true! Sorry! Everything alright!

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #9565: MINOR: Move upgraded docs from site to Kafka docs
bbejeck commented on pull request #9565: URL: https://github.com/apache/kafka/pull/9565#issuecomment-722569996 @mimaison ack, I'll take a look This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518274359

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ##

```
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    String inputTopic;
+    StreamsBuilder builder;
+    Properties properties;
+    List processorValueCollector;
+    String idempotentTopic = "idempotentTopic";
+    String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic);
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder = new StreamsBuilder();
+
+        processorValueCollector = new ArrayList<>();
+
+        final KStream stream = builder.stream(inputTopic);
+        stream.process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process"));
+
+        properties = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
+                mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
+                mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                mk
```
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518271918 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ##
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518271215 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ##
[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
cadonna commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518269493

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##

```
+                close(Duration.ZERO);
```

Review comment: Why not? It would be much cleaner. We would close everything, such as the admin client and the metrics, remove the client metrics, and set the state to NOT_RUNNING, which is not necessarily done with a timeout of zero (probably not, because of the deadlock). Additionally, we would get a nice info-level debug message saying `Streams client stopped completely` instead of `Streams client cannot stop completely within the timeout`. ;-)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
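For readers following KIP-671, the handler's job is simply to map an exception to a response. A minimal sketch of such a policy — the `Response` enum below merely stands in for Kafka Streams' `StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse`, and the classification rules are invented for illustration:

```java
// Sketch of uncaught-exception handling policy. The enum stands in for
// StreamThreadExceptionResponse (SHUTDOWN_CLIENT / SHUTDOWN_APPLICATION);
// which exceptions map to which response is an application-level choice.
enum Response { SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

class HandlerPolicy {
    static Response classify(Throwable e) {
        // An error every instance would hit (e.g. a bad topology or corrupt
        // shared state) should stop the whole application, not just this client.
        if (e instanceof IllegalStateException) {
            return Response.SHUTDOWN_APPLICATION;
        }
        // Anything assumed local to this instance: shut down only this client.
        return Response.SHUTDOWN_CLIENT;
    }
}
```

The thread above is about what `SHUTDOWN_CLIENT` should do internally (`close(Duration.ZERO)` versus a fuller close), not about this classification step.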
[GitHub] [kafka] apovzner commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas
apovzner commented on a change in pull request #9555: URL: https://github.com/apache/kafka/pull/9555#discussion_r518269240 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) } @volatile private var brokerMaxConnections = config.maxConnections + @volatile private var interBrokerListenerName = config.interBrokerListenerName Review comment: Good point, and I definitely agree that `config.interBrokerListenerName` could be expensive, as we call it several times per accepted connection. The issue here is that interBrokerListenerName is a dynamic config. So, you will need to update the cached value on changes to that config, similar to how we notify ConnectionQuotas about config changes from SocketServer.reconfigure(). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
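The pattern the reviewer describes, caching a dynamically reconfigurable value in a volatile field and refreshing the cache on reconfiguration, can be modeled in a self-contained way. The sketch below is illustrative only (the real code is Scala in SocketServer.scala, and these class and method names are stand-ins), but it shows why a cache of a dynamic config goes stale unless reconfigure() refreshes it.

```java
// Hypothetical model of caching a dynamic config on the hot path.
public class DynamicConfigCacheSketch {
    static class Config {
        private volatile String interBrokerListenerName;
        Config(String name) { this.interBrokerListenerName = name; }
        // Imagine this lookup being expensive (parsing, map lookups, ...).
        String interBrokerListenerName() { return interBrokerListenerName; }
        // Dynamic reconfiguration changes the value at runtime.
        void update(String name) { this.interBrokerListenerName = name; }
    }

    static class ConnectionQuotas {
        private final Config config;
        // Cached so the per-connection accept path avoids the expensive lookup.
        private volatile String cachedInterBrokerListenerName;

        ConnectionQuotas(Config config) {
            this.config = config;
            this.cachedInterBrokerListenerName = config.interBrokerListenerName();
        }

        // Called from the equivalent of SocketServer.reconfigure();
        // without this, the cached value would silently go stale.
        void reconfigure() {
            cachedInterBrokerListenerName = config.interBrokerListenerName();
        }

        boolean isInterBroker(String listenerName) {
            return cachedInterBrokerListenerName.equals(listenerName);
        }
    }

    public static void main(String[] args) {
        Config config = new Config("INTERNAL");
        ConnectionQuotas quotas = new ConnectionQuotas(config);
        System.out.println(quotas.isInterBroker("INTERNAL"));    // true
        config.update("REPLICATION");
        quotas.reconfigure();
        System.out.println(quotas.isInterBroker("REPLICATION")); // true
    }
}
```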
[jira] [Assigned] (KAFKA-10686) Pluggable standby tasks assignor for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-10686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze reassigned KAFKA-10686: - Assignee: Levani Kokhreidze > Pluggable standby tasks assignor for Kafka Streams > -- > > Key: KAFKA-10686 > URL: https://issues.apache.org/jira/browse/KAFKA-10686 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Assignee: Levani Kokhreidze >Priority: Major > Labels: needs-kip > > In production, Kafka Streams instances often run across different clusters > and availability zones. In order to guarantee high availability of Kafka > Streams deployments, users need more granular control over which > instances standby tasks can be created on. > The idea of this ticket is to expose an interface for Kafka Streams which can be > implemented by users to control where standby tasks are created. > Kafka Streams can provide a rack-aware assignment as a default implementation that > takes into account the `rack.id` of the application and makes sure that > standby tasks are created on different racks. > The point of this ticket, though, is to give users more flexibility over standby > task creation, in cases where rack awareness alone is not enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)
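Since the ticket is labeled needs-kip, no such interface exists yet; the sketch below is purely illustrative of what a pluggable standby-task assignor SPI could look like, with the rack-aware behavior the ticket proposes as a default. Every name here is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StandbyAssignorSketch {
    // Hypothetical SPI: given the active task's rack and the candidate
    // instances with their racks, return the instances allowed to host a standby.
    interface StandbyTaskAssignor {
        List<String> candidatesFor(String activeRack, Map<String, String> instanceToRack);
    }

    // Possible default implementation: rack-aware — only instances on a
    // different rack than the active task qualify for a standby replica.
    static final StandbyTaskAssignor RACK_AWARE = (activeRack, instanceToRack) -> {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : instanceToRack.entrySet()) {
            if (!e.getValue().equals(activeRack)) {
                result.add(e.getKey());
            }
        }
        return result;
    };

    public static void main(String[] args) {
        Map<String, String> racks =
            Map.of("instance-a", "rack-1", "instance-b", "rack-2", "instance-c", "rack-1");
        // Active task runs on rack-1, so only instance-b qualifies for a standby.
        System.out.println(RACK_AWARE.candidatesFor("rack-1", racks));
    }
}
```

A user-supplied implementation could apply stricter rules, e.g. requiring a different availability zone or cluster, which is the extra flexibility the ticket asks for.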
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518267956 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; + +@Category(IntegrationTest.class) +public class StreamsHandlerIntegrationTest { +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +String inputTopic; +StreamsBuilder builder; +Properties properties; +List processorValueCollector; +String idempotentTopic = "idempotentTopic"; +String appId = ""; + +@Before +public void setup() { +final String testId = safeUniqueTestName(getClass(), testName); +appId = "appId_" + testId; +inputTopic = "input" + testId; +cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic); + +IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); + +builder = new StreamsBuilder(); + +processorValueCollector = new ArrayList<>(); + +final KStream stream = builder.stream(inputTopic); +stream.process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process")); + +properties = mkObjectProperties( +mkMap( +mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), +mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), +mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"), +mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"), +mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"), + mk
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518265803 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; + +@Category(IntegrationTest.class) +public class StreamsHandlerIntegrationTest { Review comment: sure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518264927 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,6 +1061,72 @@ private boolean close(final long timeoutMs) { } } +private void closeToError() { +if (!setState(State.ERROR)) { +// if transition failed, it means it was either in PENDING_SHUTDOWN +// or NOT_RUNNING already; just check that all threads have been stopped Review comment: I don't think we actually need it either way so I will just remove it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518263937 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. 
" + +"Current state is: " + state); +} +} +} + +private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handleStreamsUncaughtException(final Throwable e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(e); +switch (action) { +case SHUTDOWN_CLIENT: +log.error("Encountered the following exception during processing " + +"and the registered exception handler opted to \" + action + \"." + +" The streams client is going to shut down now. ", e); +close(Duration.ZERO); Review comment: It might be but I do not think that it is necessary This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org