[GitHub] [kafka] yashmayya commented on a diff in pull request #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode
yashmayya commented on code in PR #13375: URL: https://github.com/apache/kafka/pull/13375#discussion_r1144234357 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ## @@ -152,11 +154,6 @@ public void stop() { connectCluster.forEach(this::stopWorker); try { kafkaCluster.stop(); -} catch (UngracefulShutdownException e) { -log.warn("Kafka did not shutdown gracefully"); Review Comment: It's being handled here - https://github.com/yashmayya/kafka/blob/f0f4bb9a30ed3fcb4692f16185df0dce4a032efd/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L175-L179 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -174,13 +174,17 @@ private KafkaConfig createNodeConfig(TestKitNode node) { props.put(KafkaConfig$.MODULE$.LogDirsProp(), String.join(",", brokerNode.logDataDirectories())); } -props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); -props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id())); -props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), -nodes.interBrokerListenerName().value()); -props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), -"CONTROLLER"); + +// listeners could be defined via Builder::setConfigProp which shouldn't be overridden +if (!props.containsKey(KafkaConfig$.MODULE$.ListenersProp())) { Review Comment: Hm that's a good point and I did ponder over this one as well. The alternative would be to parse the user defined listener here and make sure that either the security protocol map, inter broker listener, controller listener are all set appropriately or else do so ourselves - this would involve making sure that the protocols, SASL mechanisms etc. 
all match up properly, which seems potentially error-prone (I tried a setup where broker listeners and controller listeners used different security protocols or mechanisms and wasn't able to get it to work - I'm not sure whether something like that is even currently supported). I'm assuming that users who set custom listeners will mostly want non-plaintext listeners, at which point it isn't much extra work to also set up the controller listener accordingly (and there are multiple examples of how to do so in various integration tests). What do you think? ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -195,7 +197,22 @@ public void testRestartFailedTask() throws Exception { public void testBrokerCoordinator() throws Exception { ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000)); -connect = connectBuilder.workerProps(workerProps).build(); +Properties brokerProps = new Properties(); + +// Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners +// config to get a random free port because in this test we want to stop the Kafka broker and then bring it +// back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka +// and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen +// on a different random free port the second time it is started. Review Comment: Sorry, my bad for not clarifying at the outset. There are a couple of reasons. One is that the previous implementation seemed fairly messy: we were checking the bound ports after broker startup, then modifying the broker config and using the static port in the listener configuration on broker restarts, but only if the user hadn't already configured listener configs.
So, for instance, this `startOnlyKafkaOnSamePorts` would not work as expected if the user defines a SASL or SSL listener with port 0. The other reason is that the `KafkaClusterTestKit` being leveraged here would need some refactors to allow changing broker configs after it has already been instantiated - currently, it only supports customising broker configs in its builder. All in all, it just seems a lot cleaner to move the responsibility of using a fixed port in the listeners configuration to the clients of the embedded Kafka cluster in case they want to test functionality involving offline brokers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail:
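The free-port technique described in the comment above can be sketched in isolation as follows. This is a generic illustration rather than the PR's actual test code; the class and method names are hypothetical:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortFinder {
    // Bind to port 0 so the OS assigns an ephemeral free port, then close the
    // socket and return the port number. The broker's listeners config can
    // then pin that fixed port, so a stop/restart cycle reuses the same port.
    // Note the inherent race: another process could grab the port between this
    // probe and the broker binding it, which is usually acceptable in tests.
    public static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("No free port available", e);
        }
    }

    public static void main(String[] args) {
        int port = findFreePort();
        // e.g. brokerProps.put("listeners", "EXTERNAL://localhost:" + port);
        System.out.println(port);
    }
}
```

A client of the embedded cluster would call this once before building broker properties, so the same port survives a broker stop/start.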
[GitHub] [kafka] hudeqi commented on pull request #13427: MINOR:Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest
hudeqi commented on PR #13427: URL: https://github.com/apache/kafka/pull/13427#issuecomment-1478846920 Could you help review? @dengziming
[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
[ https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-14824: --- Summary: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception (was: ReplicaAlterLogDirsThread may cause serious disk growing in case of unknown exception) > ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception > --- > > Key: KAFKA-14824 > URL: https://issues.apache.org/jira/browse/KAFKA-14824 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 3.3.2 > Reporter: hudeqi > Priority: Blocker > Attachments: 1.png, 2.png, 3.png, 4.png > > For ReplicaAlterLogDirsThread, if a partition is marked as failed due to an unknown exception and its fetch is suspended, the previously paused cleanup logic for the partition needs to be cancelled; otherwise it leads to serious, unexpected disk usage growth. > For example, in an actual production environment (running Kafka 2.5.1) there was such a case: a log dir balance was performed on the broker hosting the partition leader. After the future log was successfully created and fetching started, the thread reset and truncated to the leader's log start offset due to an out-of-range error. At the same time, because the partition leader was processing a leaderAndIsrRequest, the leader epoch was updated, so the ReplicaAlterLogDirsThread hit FENCED_LEADER_EPOCH and the partition's 'partitionStates' were cleaned up. Meanwhile, the logic that re-adds the partition to the ReplicaAlterLogDirsThread was executing in the thread processing the leaderAndIsrRequest, and the offset it set via InitialFetchState was the leader's high watermark. When the ReplicaAlterLogDirsThread then ran processFetchRequest, it threw "java.lang.IllegalStateException: Offset mismatch for the future replica anti_fraud.data_collector.anticrawler_live-54: fetched offset = 4979659327, log end offset = 4918576434.", with the result that the ReplicaAlterLogDirsThread no longer fetched the partition. Because cleanup of the partition had been paused earlier, the disk usage of the corresponding broker grew without bound, causing serious problems. > Trunk fixed this particular "Offset mismatch" cause of the stopped fetch in KAFKA-9087, but other unknown exceptions may still occur, and under the current logic they would cause the same failure to resume log cleanup. -- This message was sent by Atlassian Jira (v8.20.10#820010)
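The defensive measure this report argues for can be sketched in isolation. The class below is a simplified model, not Kafka's actual internals: `FetcherModel`, its fields, and its method names are all hypothetical. The point is only the invariant that marking a partition as failed must also undo the paused-cleanup state taken out for the log-dir move:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the invariant: any partition whose log cleanup was
// paused for a log-dir move must have cleanup resumed when the move is
// abandoned (e.g. when the fetcher marks the partition as failed).
public class FetcherModel {
    private final Set<String> cleanupPaused = new HashSet<>();
    private final Set<String> failed = new HashSet<>();

    public void startLogDirMove(String partition) {
        // Cleanup is paused so the source log isn't deleted mid-copy.
        cleanupPaused.add(partition);
    }

    public void markPartitionFailed(String partition) {
        failed.add(partition);
        // The defensive measure described in the issue: without this line,
        // retention on the source log stays paused forever after an
        // unexpected exception, and the disk fills up.
        cleanupPaused.remove(partition);
    }

    public boolean isCleanupPaused(String partition) {
        return cleanupPaused.contains(partition);
    }
}
```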
[jira] [Assigned] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
[ https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi reassigned KAFKA-14824: -- Assignee: hudeqi
[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk growing in case of unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-14824: --- Summary: ReplicaAlterLogDirsThread may cause serious disk growing in case of unknown exception (was: ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception)
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1478842301 Hello, regarding the "potential exceptions": I ran an experiment simulating a disk failure, which eventually led to unexpected disk growth. For details, please see the corresponding comment in [jira](https://issues.apache.org/jira/browse/KAFKA-14824).
[jira] [Comment Edited] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703449#comment-17703449 ] hudeqi edited comment on KAFKA-14824 at 3/22/23 2:35 AM: - Regarding the "potential exceptions that may be thrown", I ran such an experiment: an IOException was artificially injected into "processPartitionData" to simulate ReplicaAlterLogDirsThread hitting a disk failure during the fetch, with the future log on /data2 fetching from /data1 (normal data retention time is 1h). The result was the same serious impact as the earlier "offset mismatch error" (KAFKA-9087): when the exception thrown by "processPartitionData" was caught, the partition was marked as failed and its log cleanup was never resumed, so compared to other disks the log on /data1 grew without limit. Therefore, I think the defensive measure of "resume the log cleanup of the source partition on failure" is necessary. The screenshot of the experiment can be found in the attachment.
[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-14824: --- Attachment: 1.png 2.png 3.png 4.png
[jira] [Commented] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703449#comment-17703449 ] hudeqi commented on KAFKA-14824: Regarding the "potential exceptions that may be thrown", I ran such an experiment: an IOException was artificially injected into "processPartitionData" to simulate ReplicaAlterLogDirsThread hitting a disk failure during the fetch, with the future log on /data2 fetching from /data1 (normal data retention time is 1h). The result was the same serious impact as the earlier "offset mismatch error" (KAFKA-9087): when the exception thrown by "processPartitionData" was caught, the partition was marked as failed and its log cleanup was never resumed, so compared to other disks the log on /data1 grew without limit. Therefore, I think the defensive measure of "resume the log cleanup of the source partition on failure" is necessary. !1.png!
[GitHub] [kafka] kirktrue commented on pull request #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue commented on PR #13425: URL: https://github.com/apache/kafka/pull/13425#issuecomment-1478805465 @guozhangwang @hachikuji @rajinisivaram @philipnee This is ready for a review from whoever has the time.
[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339 ] Greg Harris edited comment on KAFKA-14666 at 3/22/23 12:12 AM: --- I proposed a tactical fix for this in [https://github.com/apache/kafka/pull/13429], which provides variable-precision translation between the most recent restart of MM2 and the end of the replicated topic, similar to the existing behavior pre-KAFKA-12468. This uses strategy (2) from above, but limited to syncs read during a single task lifetime, to get monotonicity without re-reading the checkpoint topic. A separate improvement can be considered to allow translation of offsets from before the latest restart of MM2, or to increase the precision of the translation with new configurations. > MM2 should translate consumer group offsets behind replication flow > --- > > Key: KAFKA-14666 > URL: https://issues.apache.org/jira/browse/KAFKA-14666 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker > Affects Versions: 3.5.0 > Reporter: Greg Harris > Assignee: Greg Harris > Priority: Major > > MirrorMaker2 includes an offset translation feature which can translate the offsets for an upstream consumer group to a corresponding downstream consumer group.
It does this by keeping a topic of offset-syncs to correlate upstream and downstream offsets, and translates any source offsets which are ahead of the replication flow. > However, if a replication flow is closer to the end of a topic than the consumer group, then the offset translation feature will refuse to translate the offset, for correctness reasons: the MirrorCheckpointTask only keeps the latest offset correlation between source and target, so it does not have sufficient information to translate older offsets. > The workarounds for this issue are to: > 1. Pause the replication flow occasionally to allow the source to get ahead of MM2. > 2. Increase offset.lag.max to delay offset syncs, increasing the window for translation to happen. With the fix for KAFKA-12468, this will also increase the lag of applications that are ahead of the replication flow, so this is a tradeoff. > Instead, the MirrorCheckpointTask should provide correct, best-effort translation for consumer groups behind the replication flow by keeping additional state or re-reading the offset-syncs topic. This should be a substantial improvement for use cases where applications have a higher latency to commit than the replication flow, or where applications read from the earliest offset.
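As a concrete illustration of workaround (2), the relevant MM2 connector settings look roughly like this. `source->target` is a placeholder flow name for this sketch; `offset.lag.max` is the setting discussed above, and raising it widens the translation window at the cost of added lag for applications ahead of the replication flow:

```properties
# Placeholder cluster aliases; adjust to your deployment.
source->target.enabled = true
# Allow the downstream offset to lag further behind before a new offset sync
# is emitted, widening the window in which translation can still succeed.
source->target.offset.lag.max = 10000
```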
[GitHub] [kafka] mjsax merged pull request #13409: KAFKA-14491: [18/N] Update versioned store to check latest value on timestamped get
mjsax merged PR #13409: URL: https://github.com/apache/kafka/pull/13409
[GitHub] [kafka] mjsax merged pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores
mjsax merged PR #13292: URL: https://github.com/apache/kafka/pull/13292
[GitHub] [kafka] jsancio merged pull request #13430: MINOR; Increase log level of some rare events
jsancio merged PR #13430: URL: https://github.com/apache/kafka/pull/13430
[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703404#comment-17703404 ] Guozhang Wang commented on KAFKA-13295: --- [~sagarrao] I had a chat with others driving the 4.0 release and it seems like it will still take some time. Given that, I will resume helping you finish fixing this bug rather than wait for that release. > Long restoration times for new tasks can lead to transaction timeouts > - > > Key: KAFKA-13295 > URL: https://issues.apache.org/jira/browse/KAFKA-13295 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: A. Sophie Blee-Goldman > Assignee: Sagar Rao > Priority: Critical > Labels: eos, new-streams-runtime-should-fix > > In some EOS applications with relatively long restoration times we've noticed a series of ProducerFencedExceptions occurring during/immediately after restoration. The broker logs were able to confirm these were due to transactions timing out. > In Streams, it turns out we automatically begin a new txn when calling {{send}} (if there isn't already one in flight). A {{send}} often occurs outside a commit during active processing (e.g. writing to the changelog), leaving the txn open until the next commit. And if a StreamThread has been actively processing when a rebalance results in a new stateful task without revoking any existing tasks, the thread won't actually commit this open txn before it goes back into the restoration phase while it builds up state for the new task. So the in-flight transaction is left open during restoration, during which the StreamThread only consumes from the changelog without committing, leaving it vulnerable to timing out when restoration times exceed the configured transaction.timeout.ms for the producer client.
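For context, the timeout in question is the producer-side `transaction.timeout.ms` (a real producer config; the value below is only an illustration, not a recommendation from the issue). An application hitting this can buy headroom by raising it, subject to the broker's `transaction.max.timeout.ms` cap:

```properties
# Producer-level transaction timeout. The default is 60000 (1 minute); the
# broker rejects values above its transaction.max.timeout.ms (default 900000,
# i.e. 15 minutes). In Streams, producer configs take the "producer." prefix.
transaction.timeout.ms=300000
```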
[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-13295: -- Fix Version/s: (was: 4.0.0)
[GitHub] [kafka] SpacRocket commented on a diff in pull request #13382: KAFKA-14722: Make BooleanSerde public
SpacRocket commented on code in PR #13382: URL: https://github.com/apache/kafka/pull/13382#discussion_r1144052014 ## streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java: ## @@ -48,42 +45,4 @@ public NullableValueAndTimestampSerde(final Serde valueSerde) { ); } -static final class BooleanSerde { Review Comment: I think not, I've found the following unit tests that are using NullableValueAndTimestampSerializer (currently the only code that depends on BooleanSerde): - NullableValueAndTimestampSerdeTest - These unit tests check everything as a whole class. There is not a single unit test that tests only BooleanSerde. - MeteredVersionedKeyValueStoreTest - Same as above.
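For context, a boolean serde of the kind under discussion can be sketched as follows (a hypothetical stand-in, not Kafka's actual `BooleanSerde` implementation, which may differ in wire format and error handling):

```java
// Hypothetical sketch of a boolean serde: one byte on the wire, 0x01 for true,
// 0x00 for false, with nulls passed through unchanged.
public class BooleanSerdeSketch {
    static byte[] serialize(Boolean value) {
        return value == null ? null : new byte[] { value ? (byte) 1 : (byte) 0 };
    }

    static Boolean deserialize(byte[] data) {
        if (data == null) return null;
        if (data.length != 1) throw new IllegalArgumentException("expected exactly 1 byte");
        return data[0] != 0;
    }
}
```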
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1144002857 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +private class UnsentOffsetFetchRequest extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequest(final Set partitions, +final GroupState.Generation generation, +final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = new CompletableFuture<>(); +} + +public boolean sameRequest(final UnsentOffsetFetchRequest request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} + +public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) { +OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( +groupState.groupId, +true, +new ArrayList<>(this.requestedPartitions), +throwOnFetchStableOffsetUnsupported); +NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( +builder, +coordinatorRequestManager.coordinator()); +unsentRequest.future().whenComplete((r, t) -> { +onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()); +}); +return unsentRequest; +} + +public void onResponse( +final long currentTimeMs, +final OffsetFetchResponse response) { +Errors responseError = response.groupLevelError(groupState.groupId); +if (responseError != Errors.NONE) { +onFailure(currentTimeMs, responseError); +return; +} +onSuccess(currentTimeMs, response); +} + +private void onFailure(final long currentTimeMs, + final Errors responseError) { +log.debug("Offset fetch failed: {}", responseError.message()); + +// TODO: 
should we retry on COORDINATOR_NOT_AVAILABLE as well ? +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +retry(currentTimeMs); +} else if (responseError == Errors.NOT_COORDINATOR) { +// re-discover the coordinator and retry + coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds()); +retry(currentTimeMs); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId)); +} else { +future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); +} +} + +private void retry(final long currentTimeMs) { +onFailedAttempt(currentTimeMs); +onSendAttempt(currentTimeMs); +pendingRequests.addOffsetFetchRequest(this); +} + +private void onSuccess(final long currentTimeMs, + final OffsetFetchResponse response) { +Set unauthorizedTopics = null; +Map responseData = +response.partitionDataMap(groupState.groupId); +Map offsets = new HashMap<>(responseData.size()); +Set unstableTxnOffsetTopicPartitions = new HashSet<>(); +for (Map.Entry entry : responseData.entrySet()) { +TopicPartition tp = entry.getKey(); +OffsetFetchResponse.PartitionData partitionData = entry.getValue(); +if (partitionData.hasError()) { +Errors error = partitionData.error; +log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); + +if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { +future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " + +"not " + +"exist")); +return; +} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { +if (unauthorizedTopics == null) { +unauthorizedTopics
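The `onFailure` logic in the diff above boils down to a small dispatch table: a coordinator still loading is retried, a moved coordinator is re-discovered and retried, a group authorization failure surfaces as an authorization exception, and anything else fails the future. A self-contained sketch of that classification (illustrative enums, no Kafka dependencies):

```java
// Dispatch table mirroring the error handling quoted in the diff above.
public class OffsetFetchErrorDispatch {
    enum Error { NONE, COORDINATOR_LOAD_IN_PROGRESS, NOT_COORDINATOR,
                 GROUP_AUTHORIZATION_FAILED, UNKNOWN }
    enum Action { COMPLETE, RETRY, REDISCOVER_AND_RETRY, FAIL_AUTH, FAIL_FATAL }

    static Action classify(Error e) {
        switch (e) {
            case NONE:                         return Action.COMPLETE;
            case COORDINATOR_LOAD_IN_PROGRESS: return Action.RETRY;
            case NOT_COORDINATOR:              return Action.REDISCOVER_AND_RETRY;
            case GROUP_AUTHORIZATION_FAILED:   return Action.FAIL_AUTH;
            default:                           return Action.FAIL_FATAL; // unexpected errors are not retried
        }
    }
}
```

The open TODO in the diff (whether COORDINATOR_NOT_AVAILABLE should also map to a retry) would be a one-line change in such a table.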
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1143999124 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -137,31 +166,21 @@ CompletableFuture sendAutoCommit(final Map t.ack(currentTimeMs)); } -// Visible for testing -Queue stagedCommits() { -return this.stagedCommits; -} -private class StagedCommit { +private class UnsentOffsetCommitRequest { private final Map offsets; private final String groupId; private final GroupState.Generation generation; private final String groupInstanceId; private final NetworkClientDelegate.FutureCompletionHandler future; -public StagedCommit(final Map offsets, -final String groupId, -final String groupInstanceId, -final GroupState.Generation generation) { +public UnsentOffsetCommitRequest(final Map offsets, + final String groupId, + final String groupInstanceId, + final GroupState.Generation generation) { this.offsets = offsets; -// if no callback is provided, DefaultOffsetCommitCallback will be used. Review Comment: I'll double check if the defaultOffsetCommitCallback is being invoked when the user provides a null callback.
[jira] [Assigned] (KAFKA-12694) Specifying a struct-based defaultValue on a SchemaBuilder causes a DataException
[ https://issues.apache.org/jira/browse/KAFKA-12694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-12694: - Assignee: Chris Egerton > Specifying a struct-based defaultValue on a SchemaBuilder causes a > DataException > > > Key: KAFKA-12694 > URL: https://issues.apache.org/jira/browse/KAFKA-12694 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.7.0 >Reporter: Chris Cranford >Assignee: Chris Egerton >Priority: Major > > When making a call to {{SchemaBuilder#defaultValue(Object)}} a > {{DataException}} will be thrown if the value is of type {{STRUCT}} due to a > schema mismatch. This is because the method passes a reference of {{this}}, > referencing a {{SchemaBuilder}} in the {{ConnectSchema#validate}} method > which later relies on a {{ConnectSchema#equals}} equality check which expects > the passed schema to be an actual {{ConnectSchema}} rather than a > {{SchemaBuilder}} object that implements {{Schema}}. 
> {code} > Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid > default value > at > org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131) > at > io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374) > at > io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119) > at > java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) > at > java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) > at > io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117) > at > io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130) > at > io.debezium.connector.oracle.OracleDatabaseSchema.applySchemaChange(OracleDatabaseSchema.java:70) > at > io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:460) > at > io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$2(RelationalSnapshotChangeEventSource.java:273) > ... 10 more > Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do > not match. 
> at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:252) > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213) > at > org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129) > ... 26 more > {code}
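The root cause described in the ticket — an `equals()` that requires the concrete class rejecting a builder that merely implements the same interface — can be reduced to a self-contained sketch (illustrative names, not Connect's real classes):

```java
public class EqualsMismatchSketch {
    interface Schema { String type(); }

    // Stand-in for ConnectSchema: equals() is class-based
    static class ConcreteSchema implements Schema {
        final String type;
        ConcreteSchema(String type) { this.type = type; }
        public String type() { return type; }
        @Override public boolean equals(Object o) {
            // any Schema that is not a ConcreteSchema fails, even if it
            // describes exactly the same type
            return o instanceof ConcreteSchema && ((ConcreteSchema) o).type.equals(type);
        }
        @Override public int hashCode() { return type.hashCode(); }
    }

    // Stand-in for SchemaBuilder: implements Schema but is a different class
    static class Builder implements Schema {
        public String type() { return "STRUCT"; }
    }

    // Stand-in for the validate step that compares schemas with equals()
    static boolean validates(Schema expected, Schema actual) {
        return expected.equals(actual);
    }
}
```

Here `validates(new ConcreteSchema("STRUCT"), new Builder())` is false even though both describe a STRUCT, which mirrors why passing `this` (a `SchemaBuilder`) into `ConnectSchema#validate` trips the mismatch.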
[jira] [Assigned] (KAFKA-3910) Cyclic schema support in ConnectSchema and SchemaBuilder
[ https://issues.apache.org/jira/browse/KAFKA-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-3910: Assignee: Chris Egerton (was: Shikhar Bhushan) > Cyclic schema support in ConnectSchema and SchemaBuilder > > > Key: KAFKA-3910 > URL: https://issues.apache.org/jira/browse/KAFKA-3910 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: John Hofman >Assignee: Chris Egerton >Priority: Major > > Cyclic schemas are not supported by ConnectSchema or SchemaBuilder. > Subsequently the AvroConverter (confluentinc/schema-registry) hits a stack > overflow when converting a cyclic Avro schema, e.g: > {code} > {"type":"record", > "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]} > {code} > This is a blocking issue for all connectors running on the connect framework > with data containing cyclic references. The AvroConverter cannot support > cyclic schemas until the underlying ConnectSchema and SchemaBuilder do.
> To reproduce the stack-overflow (Confluent-3.0.0): > Produce some cyclic data: > {code} > bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test > --property value.schema='{"type":"record", > "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}' > {"value":1,"next":null} > {"value":1,"next":{"list":{"value":2,"next":null}}} > {code} > Then try to consume it with connect: > {code:title=connect-console-sink.properties} > name=local-console-sink > connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector > tasks.max=1 > topics=test > {code} > {code} > ./bin/connect-standalone > ./etc/schema-registry/connect-avro-standalone.properties > connect-console-sink.properties > … start up logging … > java.lang.StackOverflowError > at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54) > at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) > {code}
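The stack overflow above comes from a naive recursive walk of a schema graph that contains a cycle (record "list" whose field "next" refers back to "list"). The usual fix is to track already-visited nodes, typically by identity, and stop recursing on revisit. A hedged sketch of that idea (illustrative names, not AvroData's actual code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class CyclicWalkSketch {
    static class Node {
        final List<Node> children = new ArrayList<>();
    }

    // Counts distinct nodes; the identity-based visited set is what prevents
    // infinite recursion on a cyclic graph.
    static int countNodes(Node n, Set<Node> visited) {
        if (!visited.add(n)) return 0; // already seen: stop instead of recursing forever
        int total = 1;
        for (Node c : n.children) total += countNodes(c, visited);
        return total;
    }

    static int countNodes(Node root) {
        return countNodes(root, Collections.newSetFromMap(new IdentityHashMap<>()));
    }
}
```

With a two-node cycle (list -> next -> list), `countNodes` terminates and returns 2 where the naive walk would overflow.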
[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
C0urante commented on PR #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-1478578470 Opened https://github.com/apache/kafka/pull/13433 to address this (and cyclic schema support)
[GitHub] [kafka] C0urante commented on pull request #13433: KAFKA-12694, KAFKA-3910: Add cyclic schema support, fix default struct values
C0urante commented on PR #13433: URL: https://github.com/apache/kafka/pull/13433#issuecomment-1478577830 cc @urbandan; I've tweaked this a bit (stopped tracking equivalent schemas while comparing default values since that was actually doing nothing and AFAICT is unnecessary, and copied the equivalent schemas map before making any modifications). LMK what you think!
[GitHub] [kafka] C0urante opened a new pull request, #13433: KAFKA-12694, KAFKA-3910: Add cyclic schema support, fix default struct values
C0urante opened a new pull request, #13433: URL: https://github.com/apache/kafka/pull/13433 - [Jira (cyclic schema support)](https://issues.apache.org/jira/browse/KAFKA-3910) - [Jira (default value for struct schemas)](https://issues.apache.org/jira/browse/KAFKA-12694) Implemented in response to discussion on https://github.com/apache/kafka/pull/10566, where it became apparent that to fully support default values for struct schemas, we would also need to be able to support cyclical schemas. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dimitarndimitrov opened a new pull request, #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver
dimitarndimitrov opened a new pull request, #13432: URL: https://github.com/apache/kafka/pull/13432 Aside from consistency with the implementation of newer APIs, this also provides more resilient retries (e.g. in case of TimeoutExceptions) and simplifies the code to some extent. One notable detail is the addition of unsupported version handling in the failure callback in the `AdminApiDriver` and as a new callback in the `AdminApiHandler`. The handler callback is invoked in case an `UnsupportedVersionException` is caught by the driver and returns a flag indicating whether the exception could be handled (like the similar callback in `Call`). - It also seemed prudent to signal whether the exception was caught in the fulfillment or the lookup stage (the latter case could then be delegated to the lookup strategy), but I don't know if there are actual use-cases for that. The change has been tested manually and with a new `KafkaAdminClientTest`. The retry aspect also unexpectedly stalled a Connect `AdminClientUnitTestEnv` test due to retrying after a timeout where the test expected an immediate failure. Also, the JMH benchmark for the now-deleted equivalent of the fulfillment stage of the API has been deleted and not replaced, as I wasn't sure whether I could recreate it after the `AdminApiDriver` refactoring. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
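The callback shape described in the PR text can be sketched as follows (illustrative names only, not the real `AdminApiDriver`/`AdminApiHandler` API): the driver asks the handler whether an `UnsupportedVersionException` can be handled, and fails the pending future only if it cannot.

```java
import java.util.concurrent.CompletableFuture;

public class UnsupportedVersionCallbackSketch {
    interface Handler {
        // return true if the exception was handled, e.g. by rewriting and
        // retrying the request at a lower version
        boolean handleUnsupportedVersion(Exception e);
    }

    // Stand-in for the driver's failure path: delegate to the handler first,
    // fail the future only when the handler declines.
    static <T> void onFailure(Exception error, Handler handler, CompletableFuture<T> future) {
        if (!handler.handleUnsupportedVersion(error)) {
            future.completeExceptionally(error);
        }
    }
}
```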
[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
C0urante commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1143902556 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ## @@ -315,33 +305,28 @@ public void testPause() throws Exception { taskFuture.get(); -PowerMock.verifyAll(); +verifyCleanStartup(); +verifyTaskGetTopic(10); Review Comment: Should this use `count.get()` instead of `10`, since it's possible that the task may have been polled extra times between when line 293 (`assertTrue(awaitLatch(pollLatch));`) completed and when the task finished pausing? ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ## @@ -832,166 +746,117 @@ private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throw // Note that we stub these to allow any number of calls because the thread will continue to // run. The count passed in + latch returned just makes sure we get *at least* that number of // calls -EasyMock.expect(sourceTask.poll()) -.andStubAnswer(() -> { -count.incrementAndGet(); -latch.countDown(); -Thread.sleep(10); -return RECORDS; -}); +doAnswer((Answer>) invocation -> { +count.incrementAndGet(); +latch.countDown(); +Thread.sleep(10); +return RECORDS; +}).when(sourceTask).poll(); + // Fallout of the poll() call -expectSendRecordAnyTimes(); +expectSendRecord(); return latch; } private CountDownLatch expectPolls(int count) throws InterruptedException { return expectPolls(count, new AtomicInteger()); } -@SuppressWarnings("unchecked") -private void expectSendRecordSyncFailure(Throwable error) { -expectConvertHeadersAndKeyValue(false); -expectApplyTransformationChain(false); - -EasyMock.expect( -producer.send(EasyMock.anyObject(ProducerRecord.class), - EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class))) -.andThrow(error); +private void expectSendRecord() throws InterruptedException { +expectSendRecordTaskCommitRecordSucceed(); } -private Capture> 
expectSendRecordAnyTimes() throws InterruptedException { -return expectSendRecordTaskCommitRecordSucceed(true); +// +private void expectSendRecordProducerCallbackFail() throws InterruptedException { +expectSendRecord(TOPIC, false, false, true, emptyHeaders()); } -private Capture> expectSendRecordOnce() throws InterruptedException { -return expectSendRecordTaskCommitRecordSucceed(false); +private void expectSendRecordTaskCommitRecordSucceed() throws InterruptedException { +expectSendRecord(TOPIC, true, true, true, emptyHeaders()); } -private Capture> expectSendRecordProducerCallbackFail() throws InterruptedException { -return expectSendRecord(TOPIC, false, false, false, true, emptyHeaders()); -} - -private Capture> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException { -return expectSendRecord(TOPIC, anyTimes, true, true, true, emptyHeaders()); -} - -private Capture> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException { -return expectSendRecord(TOPIC, anyTimes, true, false, true, emptyHeaders()); -} - -private Capture> expectSendRecord( -String topic, -boolean anyTimes, -boolean sendSuccess, -boolean commitSuccess, -boolean isMockedConverters, -Headers headers +private void expectSendRecord( +String topic, +boolean sendSuccess, +boolean commitSuccess, +boolean isMockedConverters, +Headers headers ) throws InterruptedException { if (isMockedConverters) { -expectConvertHeadersAndKeyValue(topic, anyTimes, headers); +expectConvertHeadersAndKeyValue(topic, headers); } -expectApplyTransformationChain(anyTimes); - -Capture> sent = EasyMock.newCapture(); - -// 1. 
Converted data passed to the producer, which will need callbacks invoked for flush to work -IExpectationSetters> expect = EasyMock.expect( -producer.send(EasyMock.capture(sent), -EasyMock.capture(producerCallbacks))); -IAnswer> expectResponse = () -> { -synchronized (producerCallbacks) { -for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { -if (sendSuccess) { -cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, -
[jira] [Created] (KAFKA-14831) Illegal state errors should be fatal in transactional producer
Jason Gustafson created KAFKA-14831: --- Summary: Illegal state errors should be fatal in transactional producer Key: KAFKA-14831 URL: https://issues.apache.org/jira/browse/KAFKA-14831 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson In KAFKA-14830, the producer hit an illegal state error. The error was propagated to the `Sender` thread and logged, but the producer otherwise continued on. It would be better to make illegal state errors fatal since continuing to write to transactions when the internal state is inconsistent may cause incorrect and unpredictable behavior.
[jira] [Created] (KAFKA-14830) Illegal state error in transactional producer
Jason Gustafson created KAFKA-14830: --- Summary: Illegal state error in transactional producer Key: KAFKA-14830 URL: https://issues.apache.org/jira/browse/KAFKA-14830 Project: Kafka Issue Type: Bug Affects Versions: 3.1.2 Reporter: Jason Gustafson We have seen the following illegal state error in the producer: {code:java} [Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topic-0:120027 ms has passed since batch creation [Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topic-1:120026 ms has passed since batch creation [Producer clientId=client-id2, transactionalId=transactional-id] Aborting incomplete transaction [Producer clientId=client-id2, transactionalId=transactional-id] Invoking InitProducerId with current producer ID and epoch ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId set to 191799 with epoch 1 [Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 4 [Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.TimeoutException: The request timed out. 
[Producer clientId=client-id2, transactionalId=transactional-id] Uncaught error in request completion: java.lang.IllegalStateException: TransactionalId transactional-id: Invalid transition attempted from state READY to state ABORTABLE_ERROR at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734) at org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.base/java.lang.Thread.run(Thread.java:829) {code} The producer hits 
timeouts which cause it to abort an active transaction. After aborting, the producer bumps its epoch, which transitions it back to the `READY` state. Following this, there are two errors for inflight requests, which cause an illegal state transition to `ABORTABLE_ERROR`. But how could the transaction ABORT complete if there were still inflight requests?
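The invalid transition in the stack trace (READY to ABORTABLE_ERROR) is the kind of check a transition table makes explicit. A minimal sketch (illustrative; the real `TransactionManager` state machine has more states and transitions):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

public class TxnStateSketch {
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR }

    static final Map<State, EnumSet<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.READY, EnumSet.of(State.IN_TRANSACTION));
        VALID.put(State.IN_TRANSACTION, EnumSet.of(State.ABORTING_TRANSACTION, State.ABORTABLE_ERROR));
        VALID.put(State.ABORTING_TRANSACTION, EnumSet.of(State.READY));
        VALID.put(State.ABORTABLE_ERROR, EnumSet.noneOf(State.class));
    }

    static boolean canTransition(State from, State to) {
        return VALID.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }
}
```

In this model `canTransition(READY, ABORTABLE_ERROR)` is false, which is exactly the IllegalStateException in the log: a late failure for an inflight request arrived after the abort had already returned the producer to READY.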
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1143891018 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -137,31 +166,21 @@ CompletableFuture sendAutoCommit(final Map t.ack(currentTimeMs)); } -// Visible for testing -Queue stagedCommits() { -return this.stagedCommits; -} -private class StagedCommit { +private class UnsentOffsetCommitRequest { private final Map offsets; private final String groupId; private final GroupState.Generation generation; private final String groupInstanceId; private final NetworkClientDelegate.FutureCompletionHandler future; -public StagedCommit(final Map offsets, -final String groupId, -final String groupInstanceId, -final GroupState.Generation generation) { +public UnsentOffsetCommitRequest(final Map offsets, + final String groupId, + final String groupInstanceId, + final GroupState.Generation generation) { this.offsets = offsets; -// if no callback is provided, DefaultOffsetCommitCallback will be used. Review Comment: I'm moving the Callback trigger entirely to the foreground thread. As you can see, the `PrototypeAsyncConsumer` invokes: ``` future.whenComplete((r, t) -> { if (t != null) { callback.onComplete(offsets, new KafkaException(t)); } else { callback.onComplete(offsets, null); } }); ```
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1143888783 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +private class UnsentOffsetFetchRequest extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequest(final Set partitions, +final GroupState.Generation generation, +final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = new CompletableFuture<>(); +} + +public boolean sameRequest(final UnsentOffsetFetchRequest request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} + +public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) { +OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( +groupState.groupId, +true, +new ArrayList<>(this.requestedPartitions), +throwOnFetchStableOffsetUnsupported); +NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( +builder, +coordinatorRequestManager.coordinator()); +unsentRequest.future().whenComplete((r, t) -> { +onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()); +}); +return unsentRequest; +} + +public void onResponse( +final long currentTimeMs, +final OffsetFetchResponse response) { +Errors responseError = response.groupLevelError(groupState.groupId); +if (responseError != Errors.NONE) { +onFailure(currentTimeMs, responseError); +return; +} +onSuccess(currentTimeMs, response); +} + +private void onFailure(final long currentTimeMs, + final Errors responseError) { +log.debug("Offset fetch failed: {}", responseError.message()); + +// TODO: 
should we retry on COORDINATOR_NOT_AVAILABLE as well ? +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +retry(currentTimeMs); +} else if (responseError == Errors.NOT_COORDINATOR) { +// re-discover the coordinator and retry + coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds()); +retry(currentTimeMs); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId)); +} else { +future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); +} +} + +private void retry(final long currentTimeMs) { +onFailedAttempt(currentTimeMs); +onSendAttempt(currentTimeMs); +pendingRequests.addOffsetFetchRequest(this); +} + +private void onSuccess(final long currentTimeMs, + final OffsetFetchResponse response) { +Set unauthorizedTopics = null; +Map responseData = +response.partitionDataMap(groupState.groupId); +Map offsets = new HashMap<>(responseData.size()); +Set unstableTxnOffsetTopicPartitions = new HashSet<>(); +for (Map.Entry entry : responseData.entrySet()) { +TopicPartition tp = entry.getKey(); +OffsetFetchResponse.PartitionData partitionData = entry.getValue(); +if (partitionData.hasError()) { +Errors error = partitionData.error; +log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); + +if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { +future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " + +"not " + +"exist")); +return; +} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { +if (unauthorizedTopics == null) { +unauthorizedTopics
[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339 ] Greg Harris edited comment on KAFKA-14666 at 3/21/23 7:13 PM: -- I proposed a tactical fix for this in [https://github.com/apache/kafka/pull/13429] which provides variable-precision translation between the most recent restart of MM2 and the end of the replicated topic, similar to the existing behavior pre KAFKA-12468. This uses strategy (2) from above, but limited to syncs read during a single task lifetime to get monotonicity without re-reading the checkpoint topic. A separate improvement can be considered to allow for translation of offsets prior to the latest restart of MM2, or increasing the accuracy of the translation with new configurations. was (Author: gharris1727): I proposed a tactical fix for this in [https://github.com/apache/kafka/pull/13429] which provides variable-accuracy translation between the most recent restart of MM2 and the end of the replicated topic, similar to the existing behavior pre KAFKA-12468. This uses strategy (2) from above, but limited to syncs read during a single task lifetime to get monotonicity without re-reading the checkpoint topic. A separate improvement can be considered to allow for translation of offsets prior to the latest restart of MM2, or increasing the accuracy of the translation with new configurations. > MM2 should translate consumer group offsets behind replication flow > --- > > Key: KAFKA-14666 > URL: https://issues.apache.org/jira/browse/KAFKA-14666 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.5.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > > MirrorMaker2 includes an offset translation feature which can translate the > offsets for an upstream consumer group to a corresponding downstream consumer > group. 
It does this by keeping a topic of offset-syncs to correlate upstream > and downstream offsets, and translates any source offsets which are ahead of > the replication flow. > However, if a replication flow is closer to the end of a topic than the > consumer group, then the offset translation feature will refuse to translate > the offset for correctness reasons. This is because the MirrorCheckpointTask > only keeps the latest offset correlation between source and target, it does > not have sufficient information to translate older offsets. > The workarounds for this issue are to: > 1. Pause the replication flow occasionally to allow the source to get ahead > of MM2 > 2. Increase the offset.lag.max to delay offset syncs, increasing the window > for translation to happen. With the fix for KAFKA-12468, this will also > increase the lag of applications that are ahead of the replication flow, so > this is a tradeoff. > Instead, the MirrorCheckpointTask should provide correct and best-effort > translation for consumer groups behind the replication flow by keeping > additional state, or re-reading the offset-syncs topic. This should be a > substantial improvement for use-cases where applications have a higher > latency to commit than the replication flow, or where applications are > reading from the earliest offset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
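The correctness constraint described above can be sketched in a few lines. This is a deliberately simplified model, not MirrorMaker's actual implementation: it assumes a single latest (upstream, downstream) sync pair per partition and returns a sentinel when translation would be a guess.

```java
// Simplified model of why MirrorCheckpointTask refuses to translate offsets
// behind the replication flow: it keeps only the LATEST offset correlation,
// so upstream offsets older than the sync point have no known downstream
// counterpart. Class and method names here are illustrative.
public class OffsetSyncSketch {
    private final long upstreamSyncOffset;   // upstream offset of the latest sync
    private final long downstreamSyncOffset; // matching downstream offset

    public OffsetSyncSketch(long upstreamSyncOffset, long downstreamSyncOffset) {
        this.upstreamSyncOffset = upstreamSyncOffset;
        this.downstreamSyncOffset = downstreamSyncOffset;
    }

    /**
     * Returns a safe downstream offset for the group, or -1 when the group is
     * behind the latest sync and exact translation is impossible.
     */
    public long translate(long upstreamGroupOffset) {
        if (upstreamGroupOffset < upstreamSyncOffset) {
            return -1; // behind the replication flow: refuse for correctness
        }
        // at or ahead of the sync: the downstream sync offset is a safe
        // (possibly conservative) restart point for the downstream group
        return downstreamSyncOffset;
    }

    public static void main(String[] args) {
        OffsetSyncSketch sync = new OffsetSyncSketch(100L, 90L);
        System.out.println(sync.translate(150L)); // ahead of the sync: translatable
        System.out.println(sync.translate(50L));  // behind: -1, not translatable
    }
}
```

The improvements discussed in this ticket amount to keeping more than one sync pair (or re-reading the offset-syncs topic) so that the `upstreamGroupOffset < upstreamSyncOffset` branch can return a real, if less precise, translation instead of refusing.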
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1143881070 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +private class UnsentOffsetFetchRequest extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequest(final Set partitions, +final GroupState.Generation generation, +final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = new CompletableFuture<>(); +} + +public boolean sameRequest(final UnsentOffsetFetchRequest request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} + +public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) { +OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( +groupState.groupId, +true, +new ArrayList<>(this.requestedPartitions), +throwOnFetchStableOffsetUnsupported); +NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( +builder, +coordinatorRequestManager.coordinator()); +unsentRequest.future().whenComplete((r, t) -> { +onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()); +}); +return unsentRequest; +} + +public void onResponse( +final long currentTimeMs, +final OffsetFetchResponse response) { +Errors responseError = response.groupLevelError(groupState.groupId); +if (responseError != Errors.NONE) { +onFailure(currentTimeMs, responseError); +return; +} +onSuccess(currentTimeMs, response); +} + +private void onFailure(final long currentTimeMs, + final Errors responseError) { +log.debug("Offset fetch failed: {}", responseError.message()); + +// TODO: 
should we retry on COORDINATOR_NOT_AVAILABLE as well ? +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +retry(currentTimeMs); +} else if (responseError == Errors.NOT_COORDINATOR) { +// re-discover the coordinator and retry + coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds()); +retry(currentTimeMs); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId)); +} else { +future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); +} +} + +private void retry(final long currentTimeMs) { +onFailedAttempt(currentTimeMs); +onSendAttempt(currentTimeMs); +pendingRequests.addOffsetFetchRequest(this); +} + +private void onSuccess(final long currentTimeMs, + final OffsetFetchResponse response) { +Set unauthorizedTopics = null; +Map responseData = +response.partitionDataMap(groupState.groupId); +Map offsets = new HashMap<>(responseData.size()); +Set unstableTxnOffsetTopicPartitions = new HashSet<>(); +for (Map.Entry entry : responseData.entrySet()) { +TopicPartition tp = entry.getKey(); +OffsetFetchResponse.PartitionData partitionData = entry.getValue(); +if (partitionData.hasError()) { +Errors error = partitionData.error; +log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); + +if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { +future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " + +"not " + +"exist")); +return; +} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { +if (unauthorizedTopics == null) { +unauthorizedTopics
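The `sameRequest` check in the diff above exists so that identical pending offset-fetch requests can be coalesced rather than sent twice. A minimal standalone sketch of that dedup pattern (hypothetical types — the real code keys on partition sets plus the group generation and uses Kafka's own request classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Sketch of pending-request deduplication: if an equal fetch is already
// queued, callers share its future instead of enqueuing a second request.
public class PendingFetches {
    static final class Pending {
        final Set<String> partitions;
        final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
        Pending(Set<String> partitions) { this.partitions = partitions; }
    }

    private final List<Pending> unsent = new ArrayList<>();

    public CompletableFuture<Map<String, Long>> addOffsetFetch(Set<String> partitions) {
        for (Pending p : unsent) {
            if (p.partitions.equals(partitions)) {
                return p.future; // same request already pending: share its result
            }
        }
        Pending p = new Pending(partitions);
        unsent.add(p);
        return p.future;
    }

    public int pendingCount() { return unsent.size(); }

    public static void main(String[] args) {
        PendingFetches pending = new PendingFetches();
        CompletableFuture<Map<String, Long>> a = pending.addOffsetFetch(Set.of("topic-0"));
        CompletableFuture<Map<String, Long>> b = pending.addOffsetFetch(Set.of("topic-0"));
        System.out.println(a == b);                 // the duplicate reuses the same future
        System.out.println(pending.pendingCount()); // only one request is queued
    }
}
```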
[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339 ] Greg Harris edited comment on KAFKA-14666 at 3/21/23 7:08 PM: -- I proposed a tactical fix for this in [https://github.com/apache/kafka/pull/13429] which provides variable-accuracy translation between the most recent restart of MM2 and the end of the replicated topic, similar to the existing behavior pre KAFKA-12468. This uses strategy (2) from above, but limited to syncs read during a single task lifetime to get monotonicity without re-reading the checkpoint topic. A separate improvement can be considered to allow for translation of offsets prior to the latest restart of MM2, or increasing the accuracy of the translation with new configurations. was (Author: gharris1727): I proposed a tactical fix for this in [https://github.com/apache/kafka/pull/13429] which provides variable-accuracy translation between the most recent restart of MM2 and the end of the replicated topic, similar to the existing behavior pre KAFKA-12468. A separate improvement can be considered to allow for translation of offsets prior to the latest restart of MM2, or increasing the accuracy of the translation with new configurations.

[jira] [Commented] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339 ] Greg Harris commented on KAFKA-14666: - I proposed a tactical fix for this in [https://github.com/apache/kafka/pull/13429] which provides variable-accuracy translation between the most recent restart of MM2 and the end of the replicated topic, similar to the existing behavior pre KAFKA-12468. A separate improvement can be considered to allow for translation of offsets prior to the latest restart of MM2, or increasing the accuracy of the translation with new configurations.
[GitHub] [kafka] C0urante commented on a diff in pull request #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode
C0urante commented on code in PR #13375: URL: https://github.com/apache/kafka/pull/13375#discussion_r1143840978 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ## @@ -152,11 +154,6 @@ public void stop() { connectCluster.forEach(this::stopWorker); try { kafkaCluster.stop(); -} catch (UngracefulShutdownException e) { -log.warn("Kafka did not shutdown gracefully"); Review Comment: Is this part no longer necessary? ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -241,7 +245,7 @@ public KafkaClusterTestKit build() throws Exception { bootstrapMetadata); } catch (Throwable e) { log.error("Error creating controller {}", node.id(), e); -Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", () -> sharedServer.stopForController()); +Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController); Review Comment: I <3 method references ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -195,7 +197,22 @@ public void testRestartFailedTask() throws Exception { public void testBrokerCoordinator() throws Exception { ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000)); -connect = connectBuilder.workerProps(workerProps).build(); +Properties brokerProps = new Properties(); + +// Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners +// config to get a random free port because in this test we want to stop the Kafka broker and then bring it +// back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka +// and continue functioning normally. 
If we were to use port 0 here, the Kafka broker would most likely listen +// on a different random free port the second time it is started. Review Comment: Any reason we're not preserving the `startOnlyKafkaOnSamePorts` method for this type of use case? I see that this change is mentioned in the description, but don't see a rationale for it. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -106,9 +106,6 @@ public void testSingleNodeCluster() throws Exception { EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps); EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps); -clusterA.start(); -clusterB.start(); Review Comment: 🤦 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -174,13 +174,17 @@ private KafkaConfig createNodeConfig(TestKitNode node) { props.put(KafkaConfig$.MODULE$.LogDirsProp(), String.join(",", brokerNode.logDataDirectories())); } -props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); -props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id())); -props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), -nodes.interBrokerListenerName().value()); -props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), -"CONTROLLER"); + +// listeners could be defined via Builder::setConfigProp which shouldn't be overridden +if (!props.containsKey(KafkaConfig$.MODULE$.ListenersProp())) { Review Comment: I wonder if this has the potential to be a footgun. We're gating the default values of several properties on the presence of a single one; could it confuse people to set just the `listeners` property and then see failures because the previously-used defaults for the protocol map, controller listener, etc. weren't set? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
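The "find a free port first" approach discussed in the `testBrokerCoordinator` review thread can be sketched as follows. This is a generic illustration, not the PR's actual helper: bind a `ServerSocket` to port 0, record the kernel-assigned port, close the socket, and reuse that port in the broker's `listeners` config so the broker can be restarted on the same address. Note the small inherent race: another process could claim the port between `close()` and broker startup.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Sketch of finding a free port ahead of time so a test broker can be
// stopped and restarted on the same listener port (port 0 would assign a
// different random port on the second startup).
public class FreePortFinder {
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) { // 0 = any free port
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("Unable to bind an ephemeral port", e);
        }
    }

    public static void main(String[] args) {
        int port = findFreePort();
        // hypothetical usage in the test:
        // brokerProps.put("listeners", "EXTERNAL://localhost:" + port);
        System.out.println("listeners=EXTERNAL://localhost:" + port);
    }
}
```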
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1143865476 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -113,15 +112,45 @@ private void maybeAutoCommit(final long currentTimeMs) { } Map allConsumedOffsets = subscriptionState.allConsumed(); -log.debug("Auto-committing offsets {}", allConsumedOffsets); sendAutoCommit(allConsumedOffsets); autocommit.resetTimer(); +} + +/** + * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an + * {@link UnsentOffsetCommitRequest} and enqueue it to send later. + */ +public CompletableFuture addOffsetCommitRequest(final Map offsets) { +return pendingRequests.addOffsetCommitRequest(offsets); +} + +/** + * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an + * {@link UnsentOffsetFetchRequest} and enqueue it to send later. + */ +public CompletableFuture> addOffsetFetchRequest(final Set partitions) { +return pendingRequests.addOffsetFetchRequest(partitions); +} + +public void clientPoll(final long currentTimeMs) { +this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs)); +} + +// Visible for testing +List unsentOffsetFetchRequests() { +return pendingRequests.unsentOffsetFetches; +} + +// Visible for testing +Queue unsentOffsetCommitRequests() { +return pendingRequests.unsentOffsetCommits; } // Visible for testing CompletableFuture sendAutoCommit(final Map allConsumedOffsets) { -CompletableFuture future = this.add(allConsumedOffsets) +log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets); +return this.addOffsetCommitRequest(allConsumedOffsets) Review Comment: I think you are right, we could just use the pendingQueue to handle the inflight auto commit. Thanks for spotting this.
[jira] [Assigned] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-14666: --- Assignee: Greg Harris
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13431: KAFKA-14491: [19/N] Combine versioned store RocksDB instances into one
vcrfxia commented on code in PR #13431: URL: https://github.com/apache/kafka/pull/13431#discussion_r1143859781 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -821,4 +823,20 @@ public Options getOptions() { public Position getPosition() { return position; } + +/** + * Same as {@link Bytes#increment(Bytes)} but {@code null} is returned instead of throwing + * {@code IndexOutOfBoundsException} in the event of overflow. + * + * @param input bytes to increment + * @return A new copy of the incremented byte array, or {@code null} if incrementing would + * result in overflow. + */ +static Bytes incrementWithoutOverflow(final Bytes input) { Review Comment: Now that negative segment ID is allowed (only for reserved segments within `LogicalKeyValueSegments`), it's possible that a valid segment ID will overflow when incremented, and we want to handle this gracefully instead of throwing an exception when it happens. ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java: ## @@ -16,15 +16,33 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.HashMap; +import java.util.Map; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; +/** + * A {@link Segments} implementation which uses a single underlying RocksDB instance. + * Regular segments with {@code segmentId >= 0} expire according to the specified + * retention period. 
"Reserved" segments with {@code segmentId < 0} do not expire + and are completely separate from regular segments in that methods such as + {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)}, + {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)}, + {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)} + only return regular segments and not reserved segments. The methods {@link #flush()} + and {@link #close()} flush and close both regular and reserved segments, due to + the fact that both types of segments share the same physical RocksDB instance. + To create a reserved segment, use {@link #createReservedSegment(long, String)} instead. + */ public class LogicalKeyValueSegments extends AbstractSegments { private final RocksDBMetricsRecorder metricsRecorder; private final RocksDBStore physicalStore; +// reserved segments do not expire, and are tracked here separately from regular segments +private final Map reservedSegments = new HashMap<>(); Review Comment: Only one reserved segment is needed for the versioned store implementation (used for the latest value store) but I've chosen to implement support for reserved segments more generally in this class. If we think this is unnecessarily complex, I can update this to be a single `Optional` (or equivalent) instead. ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -124,7 +124,6 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS protected Position position; private OffsetCheckpoint positionCheckpoint; -// VisibleForTesting Review Comment: This cleanup is unrelated to the changes in this PR -- I happened to notice that this method is labeled as visible for testing but is actually called in a number of places from production code, so I've removed the misleading comment.
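The `incrementWithoutOverflow` contract described above — increment a big-endian byte array by one, but return `null` instead of throwing on overflow — can be illustrated with a self-contained sketch. This mirrors the documented semantics, not Kafka's actual implementation:

```java
import java.util.Arrays;

// Illustrative sketch: increment a big-endian byte array, carrying through
// 0xFF bytes, and return null when every byte is 0xFF (overflow) rather
// than throwing an exception.
public class ByteIncrement {
    static byte[] incrementWithoutOverflow(byte[] input) {
        byte[] result = Arrays.copyOf(input, input.length);
        for (int i = result.length - 1; i >= 0; i--) {
            if (result[i] != (byte) 0xFF) {
                result[i]++;     // no carry needed: done
                return result;
            }
            result[i] = 0;       // 0xFF rolls over to 0x00, carry continues left
        }
        return null;             // carried off the left edge: overflow
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(incrementWithoutOverflow(new byte[]{0x01, (byte) 0xFF})));
        System.out.println(incrementWithoutOverflow(new byte[]{(byte) 0xFF, (byte) 0xFF}));
    }
}
```

Graceful `null` on overflow matters here because, with reserved segments allowed to have negative IDs, a maximal key prefix is now reachable in normal operation and must not abort range scans with an exception.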
[GitHub] [kafka] hachikuji commented on pull request #13430: MINOR; Increase log level of some rare events
hachikuji commented on PR #13430: URL: https://github.com/apache/kafka/pull/13430#issuecomment-1478422374 > Okay. I'll revert the change to the send request backoff but I think we should keep the candidate backoff as that is "rare" and would help us debug election delays. What do you think? Yeah, that one is probably fine. One message per election attempt seems ok.
[GitHub] [kafka] vcrfxia opened a new pull request, #13431: KAFKA-14491: [19/N] Combine versioned store RocksDB instances into one
vcrfxia opened a new pull request, #13431: URL: https://github.com/apache/kafka/pull/13431 The RocksDB-based versioned store implementation introduced in https://github.com/apache/kafka/pull/13188 currently uses two physical RocksDB instances per store instance: one for the "latest value store" and another for the "segments store." This PR combines those two RocksDB instances into one by representing the latest value store as a special "reserved" segment within the segments store. This reserved segment has segment ID -1, is never expired, and is not included in the regular `Segments` methods for getting or creating segments, but is represented in the physical RocksDB instance the same way as any other segment. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio commented on pull request #13430: MINOR; Increase log level of some rare events
jsancio commented on PR #13430: URL: https://github.com/apache/kafka/pull/13430#issuecomment-1478413634 > Upgrading the error messages is probably fair, but do we really need the backoff messages at INFO level? Seems like they have potential to be noisy. Okay. I'll revert the change to the send request backoff but I think we should keep the candidate backoff as that is "rare" and would help us debug election delays. What do you think?
[GitHub] [kafka] hachikuji commented on pull request #13430: MINOR; Increase log level of some rare events
hachikuji commented on PR #13430: URL: https://github.com/apache/kafka/pull/13430#issuecomment-1478403979 Upgrading the error messages is probably fair, but do we really need the backoff messages at INFO level? Seems like they have potential to be noisy.
[GitHub] [kafka] jsancio opened a new pull request, #13430: MINOR; Increase log level of some rare events
jsancio opened a new pull request, #13430: URL: https://github.com/apache/kafka/pull/13430 To help debug KRaft's behavior this change increases the log level of some rare messages to INFO level. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] C0urante commented on pull request #13426: KAFKA-14814: Skip target state updates when the configs store has sam…
C0urante commented on PR #13426: URL: https://github.com/apache/kafka/pull/13426#issuecomment-1478385471 @yashmayya that's my bad, it does appear that you're correct and that the `Connector` won't be restarted, but a request for new task configs will take place. IMO this change still has some value since, at the very least, it leads to cleaner logs when redundant requests are made. But if we think this isn't worth the implementation complexity and testing efforts, we don't have to merge it.
[jira] [Resolved] (KAFKA-14740) Missing source tag on MirrorSource metrics
[ https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-14740. Fix Version/s: 3.5.0 Resolution: Fixed > Missing source tag on MirrorSource metrics > -- > > Key: KAFKA-14740 > URL: https://issues.apache.org/jira/browse/KAFKA-14740 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Fix For: 3.5.0 > > > The metrics defined in MirrorSourceMetrics have the following tags "target", > "topic", "partition". It would be good to also have a "source" tag with the > source cluster alias. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison merged pull request #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911
mimaison merged PR #13420: URL: https://github.com/apache/kafka/pull/13420
[GitHub] [kafka] mimaison commented on pull request #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911
mimaison commented on PR #13420: URL: https://github.com/apache/kafka/pull/13420#issuecomment-1478373825 I've now closed the voting thread. [Test failures](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-13420/1/testReport/) are not related, merging to trunk.
[GitHub] [kafka] C0urante commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
C0urante commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1143650777 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -261,6 +326,78 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } +@Test +public void testIncrementalAlterConfigsRequested() throws Exception { +MockAdminClient admin = spy(new MockAdminClient()); +MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), +new DefaultReplicationPolicy(), MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), admin)); +final String topic = "testtopic"; +List entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); +Config config = new Config(entries); +doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); +doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); +doNothing().when(connector).deprecatedAlterConfigs(any()); +connector.syncTopicConfigs(); +Map topicConfigs = Collections.singletonMap("source." + topic, config); +verify(connector).incrementalAlterConfigs(topicConfigs); + +// the next time we sync topic configurations, expect to use the deprecated API +connector.syncTopicConfigs(); +verify(connector, times(2)).syncTopicConfigs(); Review Comment: There's no need to verify these invocations since we make both of them in this test case ourselves. (This applies to other test cases as well, such as `testIncrementalAlterConfigsRequired`.) 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -514,6 +541,40 @@ private void updateTopicConfigs(Map topicConfigs) { })); } +// visible for testing +@SuppressWarnings("deprecation") Review Comment: This part can be removed now (🎉) ```suggestion ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -540,10 +601,13 @@ Map describeTopicConfigs(Set topics) Config targetConfig(Config sourceConfig) { List entries = sourceConfig.entries().stream() -.filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) -.filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) -.filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) -.collect(Collectors.toList()); +.filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) +|| (x.isDefault() && shouldReplicateSourceDefault(x.name( +|| (!x.isDefault() && useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG))) +.filter(x -> !x.isReadOnly() && !x.isSensitive()) +.filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) +.filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) Review Comment: This double-checks default values, which I don't believe is correct (we should rely solely on `ConfigPropertyFilter::shouldReplicateSourceDefault` for those). Perhaps all of the logic can be moved to the `shouldReplicateTopicConfigurationProperty` method, including determining whether the property is a default or not (and how to handle that accordingly)? 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ## @@ -30,6 +30,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; +public static final String USE_DEFAULTS_FROM = "use.defaults.from"; +private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults to use when syncing topic configurations."; +private static final String USE_DEFAULTS_FROM_DEFAULT = "target"; private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes " + "that should not be replicated."; Review Comment: Can we add a note that this will also apply to properties with default values if `use.defaults.from` is set to `source`? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourc
[GitHub] [kafka] C0urante commented on a diff in pull request #13426: KAFKA-14814: Skip target state updates when the configs store has sam…
C0urante commented on code in PR #13426: URL: https://github.com/apache/kafka/pull/13426#discussion_r1143794237 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java: ## @@ -889,6 +890,51 @@ public void testBackgroundUpdateTargetState() throws Exception { PowerMock.verifyAll(); } +@Test +public void testSameTargetState() throws Exception { +// verify that we handle target state changes correctly when they come up through the log + +expectConfigure(); +List> existingRecords = Arrays.asList( +new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), +CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), +new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), +CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), +new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), +CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), +new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), +CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); +LinkedHashMap deserialized = new LinkedHashMap<>(); +deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); +deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); +deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); +deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); +logOffset = 5; + +expectStart(existingRecords, deserialized); + +// on resume update listener isn't called Review Comment: We don't have any explicit expectations or checks in this test that the config update listener is never called, but the test would still fail if it were invoked when we call `PowerMock::verifyAll`. Is that correct? If so, is there any way we can make that logic explicit?
I'm worried someone might migrate this test over to Mockito (which has nice mocks by default) and accidentally cause this test to start spuriously passing.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
vcrfxia commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1143766313 ## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() { .withPartitions(Collections.singleton(0)); final StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); -assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); +assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success")); +} + +@Test +public void shouldCreateGlobalTable() throws Exception { +// produce data to global store topic and track in-memory for processor to verify +final DataTracker data = new DataTracker(); +produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); +produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); +produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + +// build topology and start app +final StreamsBuilder streamsBuilder = new StreamsBuilder(); + +streamsBuilder +.globalTable( +globalTableTopic, +Consumed.with(Serdes.Integer(), Serdes.String()), +Materialized +.as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION)) +.withKeySerde(Serdes.Integer()) +.withValueSerde(Serdes.String()) +); +streamsBuilder +.stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) +.process(() -> new VersionedStoreContentCheckerProcessor(false, data)) +.to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + +final Properties props = props(); +kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); +kafkaStreams.start(); + +// produce source data to trigger store 
verifications in processor +int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8")); + +// wait for output and verify +final List> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( +TestUtils.consumerConfig( +CLUSTER.bootstrapServers(), +IntegerDeserializer.class, +IntegerDeserializer.class), +outputStream, +numRecordsProduced); + +for (final KeyValue receivedRecord : receivedRecords) { +// verify zero failed checks for each record +assertThat(receivedRecord.value, equalTo(0)); Review Comment: > I was referring to this comment: https://github.com/apache/kafka/pull/13340#discussion_r1128550162 I see. My previous comment was about whether we can have a single processor write to the store and also read from it in order to verify its contents after writing. While it is possible to write to a global store from a processor (via `addGlobalStore()`), it is not possible to have that processor also write verification results downstream (the signature of that processor always returns `void` from its `process()` method). So no matter what we need to separate the verification logic from the logic that writes to the store. That's what I was trying to say earlier, but in an abbreviated/confusing way. Sounds like we're on the same page now regarding what the processor which reads from the store (and writes verification results downstream) is doing, so we should be good 👍
[jira] [Commented] (KAFKA-14585) Move StorageTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703309#comment-17703309 ] Muralidhar Basani commented on KAFKA-14585: --- [~mimaison] I think I am done with the changes. I noticed the 'formatCommand' method is invoked from other core classes which are not part of this change. Hence that method is also copied to ToolsUtils. Can you pls take a look at the PR ? > Move StorageTool to tools > - > > Key: KAFKA-14585 > URL: https://issues.apache.org/jira/browse/KAFKA-14585 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Muralidhar Basani >Priority: Major > Fix For: 3.5.0 > >
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1143739168 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +private class UnsentOffsetFetchRequest extends RequestState { Review Comment: yap that sounds good!
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1143738304 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +private class UnsentOffsetFetchRequest extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequest(final Set partitions, +final GroupState.Generation generation, +final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = new CompletableFuture<>(); +} + +public boolean sameRequest(final UnsentOffsetFetchRequest request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} + +public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) { +OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( +groupState.groupId, +true, +new ArrayList<>(this.requestedPartitions), +throwOnFetchStableOffsetUnsupported); +NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( +builder, +coordinatorRequestManager.coordinator()); +unsentRequest.future().whenComplete((r, t) -> { +onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()); +}); +return unsentRequest; +} + +public void onResponse( +final long currentTimeMs, +final OffsetFetchResponse response) { +Errors responseError = response.groupLevelError(groupState.groupId); +if (responseError != Errors.NONE) { +onFailure(currentTimeMs, responseError); +return; +} +onSuccess(currentTimeMs, response); +} + +private void onFailure(final long currentTimeMs, + final Errors responseError) { +log.debug("Offset fetch failed: {}", responseError.message()); + +// TODO: 
should we retry on COORDINATOR_NOT_AVAILABLE as well ? +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +retry(currentTimeMs); +} else if (responseError == Errors.NOT_COORDINATOR) { +// re-discover the coordinator and retry + coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds()); +retry(currentTimeMs); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId)); +} else { +future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); +} +} + +private void retry(final long currentTimeMs) { +onFailedAttempt(currentTimeMs); +onSendAttempt(currentTimeMs); +pendingRequests.addOffsetFetchRequest(this); +} + +private void onSuccess(final long currentTimeMs, + final OffsetFetchResponse response) { +Set unauthorizedTopics = null; +Map responseData = +response.partitionDataMap(groupState.groupId); +Map offsets = new HashMap<>(responseData.size()); +Set unstableTxnOffsetTopicPartitions = new HashSet<>(); +for (Map.Entry entry : responseData.entrySet()) { +TopicPartition tp = entry.getKey(); +OffsetFetchResponse.PartitionData partitionData = entry.getValue(); +if (partitionData.hasError()) { +Errors error = partitionData.error; +log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); + +if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { +future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " + +"not " + +"exist")); +return; +} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { +if (unauthorizedTopics == null) { +unauthorizedTopics
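For readers skimming the diff above: the `retry` path calls `onFailedAttempt` and `onSendAttempt` on the inherited `RequestState` and re-enqueues the request, so the manager will not actually resend until the retry backoff has elapsed. The following is a minimal stand-alone sketch of that backoff gate; class and method names are illustrative only, not Kafka's actual `RequestState` internals:

```java
// Hypothetical, simplified version of a RequestState-style backoff gate.
// Names are illustrative; this is not Kafka's real implementation.
public class BackoffState {
    private final long retryBackoffMs;
    private long lastSentMs = -1;
    private long lastFailedMs = -1;

    public BackoffState(final long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    public void onSendAttempt(final long nowMs) {
        lastSentMs = nowMs;
    }

    public void onFailedAttempt(final long nowMs) {
        lastFailedMs = nowMs;
    }

    // A request may be (re)sent if it was never sent before, or once the
    // backoff window measured from the most recent send/failure has elapsed.
    public boolean canSendRequest(final long nowMs) {
        if (lastSentMs == -1) {
            return true;
        }
        return nowMs - Math.max(lastSentMs, lastFailedMs) >= retryBackoffMs;
    }

    public static void main(final String[] args) {
        final BackoffState state = new BackoffState(100L);
        state.onSendAttempt(0L);
        state.onFailedAttempt(5L);
        System.out.println(state.canSendRequest(50L));  // still inside the backoff window
        System.out.println(state.canSendRequest(105L)); // backoff elapsed, retry allowed
    }
}
```

The point of the design choice under discussion is that retriable broker errors (e.g. `COORDINATOR_LOAD_IN_PROGRESS`) re-enter this gate rather than hammering the coordinator in a tight loop.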
[jira] [Resolved] (KAFKA-8713) [Connect] JsonConverter NULL Values are replaced by default values even in NULLABLE fields
[ https://issues.apache.org/jira/browse/KAFKA-8713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-8713. --- Fix Version/s: 3.5.0 Assignee: Mickael Maison Resolution: Fixed > [Connect] JsonConverter NULL Values are replaced by default values even in > NULLABLE fields > -- > > Key: KAFKA-8713 > URL: https://issues.apache.org/jira/browse/KAFKA-8713 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.3.0, 2.2.1 >Reporter: Cheng Pan >Assignee: Mickael Maison >Priority: Major > Labels: needs-kip > Fix For: 3.5.0 > > > Class JsonConverter line: 582 > {code:java} > private static JsonNode convertToJson(Schema schema, Object logicalValue) > { > if (logicalValue == null) { > if (schema == null) // Any schema is valid and we don't have a > default, so treat this as an optional schema > return null; > if (schema.defaultValue() != null) > return convertToJson(schema, schema.defaultValue()); > if (schema.isOptional()) > return JsonNodeFactory.instance.nullNode(); > throw new DataException("Conversion error: null value for field > that is required and has no default value"); > } > > } > {code} > h1.Expect: > Value `null` is valid for an optional filed, even though the filed has a > default value. > Only when field is required, the converter return default value fallback > when value is `null`. > h1.Actual: > Always return default value if `null` was given. > h1. 
Example: > I'm not sure if the current behavior is the exactly expected, but at least on > MySQL, a table define as > {code:sql} > create table t1 { >name varchar(40) not null, >create_time datetime default '1999-01-01 11:11:11' null, >update_time datetime default '1999-01-01 11:11:11' null > } > {code} > Just insert a record: > {code:sql} > INSERT INTO `t1` (`name`, `update_time`) VALUES ('kafka', null); > {code} > The result is: > {code:json} > { > "name": "kafka", > "create_time": "1999-01-01 11:11:11", > "update_time": null > } > {code} > But when I use debezium pull binlog and send the record to Kafka with > JsonConverter, the result changed to: > {code:json} > { > "name": "kafka", > "create_time": "1999-01-01 11:11:11", > "update_time": "1999-01-01 11:11:11" > } > {code} > For more details, see: https://issues.jboss.org/browse/DBZ-1064
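The quoted `convertToJson` snippet shows the root cause: the default-value branch runs before the optional check, so an explicit null is silently replaced by the schema default. A minimal sketch of the null-handling order with a KIP-581-style `replace.null.with.default` switch follows; it is a simplified stand-in, not the actual JsonConverter code:

```java
public class NullDefaultSketch {
    // Simplified stand-in for JsonConverter's null handling. The boolean flag
    // models KIP-581's "replace.null.with.default" setting; this is not the
    // real converter implementation.
    static Object convertNull(final boolean optional,
                              final Object defaultValue,
                              final boolean replaceNullWithDefault) {
        if (replaceNullWithDefault && defaultValue != null) {
            return defaultValue; // legacy behavior: null becomes the schema default
        }
        if (optional) {
            return null; // preserve the explicit null for nullable fields
        }
        throw new RuntimeException("null value for required field with no default");
    }

    public static void main(final String[] args) {
        // update_time is nullable with a default: legacy mode substitutes the
        // default, while the KIP-581 mode keeps the explicit null.
        System.out.println(convertNull(true, "1999-01-01 11:11:11", true));
        System.out.println(convertNull(true, "1999-01-01 11:11:11", false));
    }
}
```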
[GitHub] [kafka] mimaison merged pull request #13419: KAFKA-8713: Allow using null for field in JsonConverter (KIP-581)
mimaison merged PR #13419: URL: https://github.com/apache/kafka/pull/13419
[GitHub] [kafka] mukkachaitanya commented on pull request #13426: KAFKA-14814: Skip target state updates when the configs store has sam…
mukkachaitanya commented on PR #13426: URL: https://github.com/apache/kafka/pull/13426#issuecomment-1478255352 Hi, @yashmayya / @C0urante let me know if you folks can take a stab at reviewing the PR. Thanks!
[GitHub] [kafka] mimaison commented on pull request #13419: KAFKA-8713: Allow using null for field in JsonConverter (KIP-581)
mimaison commented on PR #13419: URL: https://github.com/apache/kafka/pull/13419#issuecomment-1478251287 Thanks for the review @C0urante ! I've now closed the voting thread. Even though they are not available here, tests have [run](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-13419/1/) in the CI, and none of the failures are related. Merging
[GitHub] [kafka] gharris1727 opened a new pull request, #13428: MINOR: Refactor Mirror integration tests to reduce duplication
gharris1727 opened a new pull request, #13428: URL: https://github.com/apache/kafka/pull/13428 The IdentityReplicationIntegrationTest nontrivially overrides two tests from the Base test. Most of the implementation is copied, but modifications to the copy change assertions that have to do with the backup -> primary mirror flow. Instead of having the Base::testReplication which is active-active, and the Identity::testReplication which is primary->backup only, use a flag to control the assertions for the two variants of the test, so that most of the code can be deduplicated. Additionally, utilize the remoteTopicName test function to derive target topic names programmatically, instead of hardcoding them in the two variants of the test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13382: KAFKA-14722: Make BooleanSerde public
wcarlson5 commented on code in PR #13382: URL: https://github.com/apache/kafka/pull/13382#discussion_r1143691894 ## streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java: ## @@ -48,42 +45,4 @@ public NullableValueAndTimestampSerde(final Serde valueSerde) { ); } -static final class BooleanSerde { Review Comment: Are there tests for this code that need to be cleaned up?
[GitHub] [kafka] mimaison commented on a diff in pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors
mimaison commented on code in PR #13424: URL: https://github.com/apache/kafka/pull/13424#discussion_r1143576130 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java: ## @@ -103,6 +103,9 @@ public List> taskConfigs(int maxTasks) { public void stop() { log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName); connectorHandle.recordConnectorStop(); +if (Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error", "false"))) { +throw new RuntimeException("Injecting errors during connector start"); Review Comment: `during connector start` -> `during connector stop` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java: ## @@ -245,6 +245,14 @@ default void validateConnectorConfig(Map connectorConfig, Callba */ void restartConnectorAndTasks(RestartRequest request, Callback cb); +/** + * Stop the conector. This call will asynchronously suspend processing by the connector and all Review Comment: `connector` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, final Map ); } +@Override +public void stopConnector(final String connName, final Callback callback) { +log.trace("Submitting request to transition connector {} to STOPPED state", connName); + +addRequest( +() -> { +refreshConfigSnapshot(workerSyncTimeoutMs); +if (!configState.contains(connName)) +throw new NotFoundException("Unknown connector " + connName); + +// We only allow the leader to handle this request since it involves writing task configs to the config topic +if (!isLeader()) { +callback.onCompletion(new NotLeaderException("Only the leader can transition connectors to the STOPPED state.", leaderUrl()), null); +return null; +} + +// TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that Review Comment: Should we create a ticket for 
this TODO? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ## @@ -221,28 +223,44 @@ public boolean isRunning() { } @SuppressWarnings("fallthrough") -private void pause() { +private void stop(boolean paused) { +State newState = paused ? State.PAUSED : State.STOPPED; try { -switch (state) { -case STOPPED: -return; +if ((state == State.STOPPED || state == State.PAUSED) && state == newState) { Review Comment: As `newState` is either `PAUSED` or `STOPPED`, would `state == newState` be enough here?
[GitHub] [kafka] C0urante commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
C0urante commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1143623680 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -514,6 +540,37 @@ private void updateTopicConfigs(Map topicConfigs) { })); } +// visible for testing +void incrementalAlterConfigs(Map topicConfigs) { +Map> configOps = new HashMap<>(); +for (Map.Entry topicConfig : topicConfigs.entrySet()) { +Collection ops = new ArrayList<>(); +ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey()); +for (ConfigEntry config : topicConfig.getValue().entries()) { +if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) { +ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE)); +} else { +ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET)); +} +} +configOps.put(configResource, ops); +} +log.trace("Syncing configs for {} topics.", configOps.size()); + targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> { +if (e != null) { +if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT) Review Comment: We can mock the connector's context and ensure that the correct method has been invoked: ```java @Test public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception { MockAdminClient admin = spy(new MockAdminClient()); ConnectorContext connectorContext = mock(ConnectorContext.class); MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), admin)); connector.initialize(connectorContext); final String topic = "testtopic"; List entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); Config config = new Config(entries); doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any()); doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); doNothing().when(connector).deprecatedAlterConfigs(any()); connector.syncTopicConfigs(); verify(connectorContext).raiseError(isA(UnsupportedVersionException.class)); } ```
[GitHub] [kafka] C0urante commented on pull request #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911
C0urante commented on PR #13420: URL: https://github.com/apache/kafka/pull/13420#issuecomment-1477881989 (I did not notice that the KIP voting thread has not closed yet. This should probably only be merged after that happens 😄) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14797) MM2 does not emit offset syncs when conservative translation logic exceeds positive max.offset.lag
[ https://issues.apache.org/jira/browse/KAFKA-14797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14797. --- Resolution: Fixed > MM2 does not emit offset syncs when conservative translation logic exceeds > positive max.offset.lag > -- > > Key: KAFKA-14797 > URL: https://issues.apache.org/jira/browse/KAFKA-14797 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Blocker > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > This is a regression in MirrorMaker 2 introduced by KAFKA-12468. > Reproduction steps: > 1. Set max.offset.lag to a non-zero value. > 2. Set up a 1-1 replication flow which does not skip upstream offsets or have > a concurrent producer to the target topic. > 3. Produce more than max.offset.lag records to the source topic and allow > replication to proceed. > 4. Examine end offsets, checkpoints and/or target consumer group lag > Expected behavior: > Consumer group lag should be at most max.offset.lag. > Actual behavior: > Consumer group lag is significantly larger than max.offset.lag. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag
C0urante commented on PR #13367: URL: https://github.com/apache/kafka/pull/13367#issuecomment-1477875999 Merged and backported to 3.3 and 3.4 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703204#comment-17703204 ] Atul Jain commented on KAFKA-14597: ---

[~talestonini] I have another issue with process-latency [-avg, -max]. Is it possible that the observed discrepancies in latencies are due to the reasons you have mentioned above? I will describe my topology here. I have two sources, and both of these sources are writing data to their respective state stores. I have added delays (using sleep) of 3 and 1 sec respectively while writing to these state stores. I also have a PunctuateProcessor which runs Punctuate (60s wall clock time). The punctuate logic checks the data in both state stores and adds them to the sink. (attached topology diagram) !image-2023-03-21-19-01-54-713.png!

*Expected values of latencies*
process-latency-max = 3 sec
process-latency-avg = avg(3 sec, 1 sec) = 2 sec

*Observed latencies on stream-thread-metrics*
process-latency-max = 2334 ms
process-latency-avg = 1971.91 ms
!image-2023-03-21-19-03-07-525.png!

*Observed latencies on stream-task-metrics* (obtained by setting metrics.recording.level=DEBUG)
process-latency-max = 3000 ms
process-latency-avg = 2034 ms
!image-2023-03-21-19-03-28-625.png!

Kindly let me know if it is a different issue and needs to be addressed separately.
Thanks > [Streams] record-e2e-latency-max is not reporting correct metrics > -- > > Key: KAFKA-14597 > URL: https://issues.apache.org/jira/browse/KAFKA-14597 > Project: Kafka > Issue Type: Bug > Components: metrics, streams >Reporter: Atul Jain >Assignee: Tales Tonini >Priority: Major > Attachments: image-2023-03-21-15-07-24-352.png, > image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, > image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, > record-e2e-latency-max.jpg > > > I was following this KIP documentation > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) > and kafka streams documentation > ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]) > . Based on these documentations , the *record-e2e-latency-max* should > monitor the full end to end latencies, which includes both *consumption > latencies* and {*}processing delays{*}. > However, based on my observations , record-e2e-latency-max seems to be only > measuring the consumption latencies. processing delays can be measured using > *process-latency-max* .I am checking all this using a simple topology > consisting of source, processors and sink (code added). I have added some > sleep time (of 3 seconds) in one of the processors to ensure some delays in > the processing logic. These delays are not getting accounted in the > record-e2e-latency-max but are accounted in process-latency-max. > process-latency-max was observed to be 3002 ms which accounts for sleep time > of 3 seconds. However, record-e2e-latency-max observed in jconsole is 422ms, > which does not account for 3 seconds of sleep time. 
> 
> Code describing my topology:
> {code:java}
> static Topology buildTopology(String inputTopic, String outputTopic) {
>     log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>     Serde<String> stringSerde = Serdes.String();
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>            .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
>            .mapValues(s -> {
>                try {
>                    System.out.println("sleeping for 3 seconds");
>                    Thread.sleep(3000);
>                } catch (InterruptedException e) {
>                    e.printStackTrace();
>                }
>                return s.toUpperCase();
>            })
>            .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
>            .to(outputTopic, Produced.with(stringSerde, stringSerde));
>     return builder.build();
> } {code}
> 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
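To make the expected relationship between the two metrics concrete, here is a toy calculation (not Kafka Streams code; the class, method names, and numbers are illustrative only): record-e2e-latency is measured against the record's event timestamp and therefore includes the time the record spent waiting to be consumed, while process-latency covers only the processing call, so for a given record the e2e latency should be at least as large as the processing time.

```java
// Toy illustration of why record-e2e-latency and process-latency differ.
// e2e latency is anchored to the record's creation timestamp, so it adds
// consumption delay on top of the processing time itself.
public class LatencySketch {
    // time spent inside the processing call only
    public static long processLatency(long processStartMs, long processEndMs) {
        return processEndMs - processStartMs;
    }

    // time from record creation until processing finished
    public static long e2eLatency(long recordTimestampMs, long processEndMs) {
        return processEndMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTs = 1_000L; // record created at t = 1000 ms
        long start = 1_200L;    // consumed 200 ms later
        long end = 4_200L;      // processing took 3000 ms (the sleep in the report)
        System.out.println("process-latency=" + processLatency(start, end)
                + " e2e-latency=" + e2eLatency(recordTs, end));
        // prints "process-latency=3000 e2e-latency=3200"
    }
}
```

Under the reporter's 3-second sleep, a record-e2e-latency-max of 422 ms alongside a process-latency-max of 3002 ms is therefore inconsistent, which is exactly the reported bug.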
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Jain updated KAFKA-14597: -- Attachment: image-2023-03-21-19-03-28-625.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Jain updated KAFKA-14597: -- Attachment: image-2023-03-21-19-03-07-525.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Jain updated KAFKA-14597: -- Attachment: image-2023-03-21-19-01-54-713.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante merged pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag
C0urante merged PR #13367: URL: https://github.com/apache/kafka/pull/13367 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13425: KAFKA-14365: Extract common logic from Fetcher
C0urante commented on PR #13425: URL: https://github.com/apache/kafka/pull/13425#issuecomment-1477800891 ![fetch](https://user-images.githubusercontent.com/8636148/226614066-a13bd150-9bf5-4c1e-b578-8b6f1bf51c55.gif) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #13326: KAFKA-14774 the removed listeners should not be reconfigurable
chia7712 commented on PR #13326: URL: https://github.com/apache/kafka/pull/13326#issuecomment-1477661271 @showuon @mumrah Could you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby
lucasbru commented on code in PR #13369: URL: https://github.com/apache/kafka/pull/13369#discussion_r1143208858

## streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java:

@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * An integration test to verify EOS properties when using Caching and Standby replicas
+ * while tasks are being redistributed after a re-balancing event.
+ * The intent is not that this test should be merged into the repo; it is only provided as evidence of how to reproduce the issue.
+ * One test fails and two tests pass reliably on an i7-8750H CPU @ 2.20GHz × 12 with 32 GiB memory.
+ */
+@Category(IntegrationTest.class)
+@SuppressWarnings("deprecation")

Review Comment: No need to bite the bullet from my side ;)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on pull request #13097: [Draft] KAFKA-10532: close clean for EOS when it's RUNNING standby or RESTORING active
lucasbru commented on PR #13097: URL: https://github.com/apache/kafka/pull/13097#issuecomment-1477637108 > 2. streams exception: translated from kafka exception from restore consumer's other calls, as well as the restore callback) `InvalidOffsetException` is rethrown as `StreamsException` in `prepareChangelogs` and `hasRestoredToEnd`. That seems to violate your categorization. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
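The translation lucasbru points at can be sketched roughly like this (a simplified stand-in, not the actual `prepareChangelogs`/`hasRestoredToEnd` code; the two exception classes below are local placeholders for `org.apache.kafka.clients.consumer.InvalidOffsetException` and `org.apache.kafka.streams.errors.StreamsException`):

```java
// Simplified sketch of the rethrow pattern under discussion: a client-side
// InvalidOffsetException surfacing from the restore consumer is wrapped in
// a StreamsException before it propagates further. Names are placeholders.
public class RestoreSketch {
    public static class InvalidOffsetException extends RuntimeException {
        public InvalidOffsetException(String msg) { super(msg); }
    }

    public static class StreamsException extends RuntimeException {
        public StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    // stand-in for prepareChangelogs: runs a restore-consumer call and
    // translates the consumer-level exception into a streams-level one
    public static void prepareChangelogs(Runnable restoreCall) {
        try {
            restoreCall.run();
        } catch (InvalidOffsetException e) {
            throw new StreamsException("Restore consumer hit an invalid offset", e);
        }
    }
}
```

The point of contention is exactly this wrapping: a consumer-side exception crosses the boundary as a `StreamsException`, which blurs a categorization that tries to keep "kafka exceptions" and "streams exceptions" separate.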
[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703143#comment-17703143 ] Atul Jain commented on KAFKA-14597: --- I currently have 2.6.0 version -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] muralibasani commented on pull request #13417: KAFKA-14585: Moving StorageTool from core to tools module
muralibasani commented on PR #13417: URL: https://github.com/apache/kafka/pull/13417#issuecomment-1477601010

@mimaison PR is ready to review.
- Had to modify build.gradle to avoid a circular dependency, as formatCommand is being used in other classes in the core module.
- There is an unrelated test failing, I guess. Hope that's ok?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13382: KAFKA-14722: Make BooleanSerde public
lucasbru commented on code in PR #13382: URL: https://github.com/apache/kafka/pull/13382#discussion_r1143171856

## clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java:

@@ -274,6 +282,13 @@ static public Serde<UUID> UUID() {
        return new UUIDSerde();
    }
+
+/**
+ * A serde for nullable {@code Boolean} type.

Review Comment: I checked again, and I guess you don't have to worry too much about my concern. When I read the comment you wrote, I understand it as "this class turns `true`, `false`, and `null` into a byte array", but it actually maps `null` to `null`, and the caller has to decide how to deal with the `null` result. This seems to be a general convention in Kafka Streams, so I think the code is fine; I was just worried about giving the wrong impression to a user of this function, but, as I said, it is probably safe to ignore.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
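The null-passthrough convention described above can be shown with a minimal sketch (illustrative only; this is not the actual `Serdes.BooleanSerde` implementation, and the class name is made up): `null` is never encoded into bytes, it is returned as-is on both paths and left for the caller to handle.

```java
// Hypothetical sketch of a nullable-Boolean serializer/deserializer pair.
// Note the convention under discussion: null is not turned into a byte
// array; it maps to null on both paths, and callers must handle that.
public class BooleanSerdeSketch {
    public static byte[] serialize(Boolean data) {
        if (data == null) {
            return null; // null passes through, it is not encoded
        }
        return new byte[] {(byte) (data ? 1 : 0)};
    }

    public static Boolean deserialize(byte[] data) {
        if (data == null) {
            return null; // symmetric: null bytes yield a null Boolean
        }
        return data[0] != 0;
    }
}
```

A round trip therefore preserves all three values `true`, `false`, and `null`, but code consuming the deserialized value still needs a null check.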
[GitHub] [kafka] tinaselenge commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
tinaselenge commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1143154814

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:

@@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
        }));
    }
+
+// visible for testing
+void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+    Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+    for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+        for (ConfigEntry config : topicConfig.getValue().entries()) {
+            if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+                ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+            } else {
+                ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+            }
+        }
+        configOps.put(configResource, ops);
+    }
+    log.trace("Syncing configs for {} topics.", configOps.size());
+    targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+        if (e != null) {
+            if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)

Review Comment: I have addressed this by adding `context.raiseError(new UnsupportedVersionException())`; however, I didn't cover this in the unit tests. I tried mocking the API to return `UnsupportedVersionException`, but I'm not sure how to catch the error from the connect framework in the test.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703129#comment-17703129 ] Tales Tonini commented on KAFKA-14597: -- Hi [~atuljainiitk], may I ask which Kafka Streams version your app uses? Thanks.
> [Streams] record-e2e-latency-max is not reporting correct metrics
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
> Issue Type: Bug
> Components: metrics, streams
> Reporter: Atul Jain
> Assignee: Tales Tonini
> Priority: Major
> Attachments: image-2023-03-21-15-07-24-352.png, process-latency-max.jpg, record-e2e-latency-max.jpg
>
> I was following the KIP documentation ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) and the Kafka Streams documentation ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]). Based on these documents, *record-e2e-latency-max* should monitor the full end-to-end latency, which includes both *consumption latencies* and {*}processing delays{*}.
> However, based on my observations, record-e2e-latency-max seems to measure only the consumption latency, while processing delays can be measured using *process-latency-max*. I am checking all this with a simple topology consisting of a source, processors and a sink (code below). I added a sleep of 3 seconds in one of the processors to introduce a delay in the processing logic. This delay is not accounted for in record-e2e-latency-max but is accounted for in process-latency-max: process-latency-max was observed to be 3002 ms, which matches the 3-second sleep, whereas record-e2e-latency-max observed in jconsole is 422 ms, which does not include the 3 seconds of sleep.
>
> Code describing my topology:
> {code:java}
> static Topology buildTopology(String inputTopic, String outputTopic) {
>     log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>     Serde<String> stringSerde = Serdes.String();
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>            .peek((k, v) -> log.info("Observed event: key " + k + " value: " + v))
>            .mapValues(s -> {
>                try {
>                    System.out.println("sleeping for 3 seconds");
>                    Thread.sleep(3000);
>                } catch (InterruptedException e) {
>                    e.printStackTrace();
>                }
>                return s.toUpperCase();
>            })
>            .peek((k, v) -> log.info("Transformed event: key " + k + " value: " + v))
>            .to(outputTopic, Produced.with(stringSerde, stringSerde));
>     return builder.build();
> }
> {code}
--
This message was sent by Atlassian Jira (v8.20.10#820010)
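The arithmetic behind the metric is easy to reproduce without a broker. Per KIP-613, record-e2e-latency at a processor node is the wall-clock time after the node finishes processing minus the record's event timestamp, so a sleep in an upstream processor should surface in the e2e latency measured at downstream nodes. A minimal plain-Java sketch of that calculation (illustrative only, not Kafka Streams internals):

```java
public class E2eLatencySketch {

    // Per KIP-613: e2e latency at a node = wall-clock time after processing
    // completes minus the record's event timestamp.
    static long e2eLatencyMs(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs;
    }

    public static void main(String[] args) throws InterruptedException {
        long recordTs = System.currentTimeMillis(); // timestamp stamped when the event was created
        Thread.sleep(100);                          // stand-in for a slow processor
        long latency = e2eLatencyMs(recordTs, System.currentTimeMillis());
        // The processing delay is part of the measurement, so latency >= 100 ms here.
        System.out.println(latency >= 100);
    }
}
```

If the reported record-e2e-latency-max excludes the sleep, as observed in the ticket, the measurement point would effectively be before the processor runs rather than after.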
[GitHub] [kafka] tinaselenge commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
tinaselenge commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1143144850
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
## @@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
    }));
}

+// visible for testing
+void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+    Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+    for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+        for (ConfigEntry config : topicConfig.getValue().entries()) {
+            if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+                ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+            } else {
+                ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+            }
+        }
+        configOps.put(configResource, ops);
+    }
+    log.trace("Syncing configs for {} topics.", configOps.size());
+    targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+        if (e != null) {
+            if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)
+                    && e instanceof UnsupportedVersionException) {
+                // Fallback logic
+                log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e);
+                alterConfigs(topicConfigs);

Review Comment: I agree with this, and removed the alterConfigs call from here. Setting `use.incremental.alter.configs` to `never` should be enough here so that the next time we sync configs, it uses the deprecated API.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
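The classification step in the diff above, where entries still at their source-cluster default become DELETE ops on the target (so the target falls back to its own defaults) and explicitly set entries become SET ops, can be illustrated without a cluster. The sketch below stands in for Kafka's ConfigEntry/AlterConfigOp with plain types; the class names and the replicateSourceDefaults flag are illustrative assumptions, not the actual MirrorMaker code:

```java
import java.util.*;

public class AlterOpSketch {
    enum OpType { SET, DELETE }

    static class Entry {
        final String name; final boolean isDefault;
        Entry(String name, boolean isDefault) { this.name = name; this.isDefault = isDefault; }
    }

    // Defaults on the source cluster map to DELETE ops on the target (unless
    // source defaults are replicated); everything explicitly set maps to SET.
    static Map<String, OpType> classify(List<Entry> entries, boolean replicateSourceDefaults) {
        Map<String, OpType> ops = new LinkedHashMap<>();
        for (Entry e : entries) {
            ops.put(e.name, (e.isDefault && !replicateSourceDefaults) ? OpType.DELETE : OpType.SET);
        }
        return ops;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
                new Entry("retention.ms", true),      // still at the broker default
                new Entry("cleanup.policy", false));  // explicitly set on the source
        System.out.println(classify(entries, false));
    }
}
```

Using DELETE rather than SET for defaults is what keeps the target's own broker defaults authoritative instead of pinning source-side default values onto the target topic.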
[GitHub] [kafka] hudeqi commented on pull request #13427: MINOR:Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest
hudeqi commented on PR #13427: URL: https://github.com/apache/kafka/pull/13427#issuecomment-1477565223 Hello, please help to review. @guozhangwang
[GitHub] [kafka] tinaselenge commented on pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
tinaselenge commented on PR #13373: URL: https://github.com/apache/kafka/pull/13373#issuecomment-1477564128 @mimaison @C0urante Thank you so much for reviewing the PR. I think I have addressed all the review comments, please let me know if I have missed anything or you have any further comments. The Jenkins build failed due to mostly unrelated tests, except one integration test for MirrorMaker, `MirrorConnectorsIntegrationSSLTest > testReplication() FAILED`. However, this test passes on my local machine.
[GitHub] [kafka] hudeqi closed pull request #13377: MINOR: Fix typos in JaasContext
hudeqi closed pull request #13377: MINOR: Fix typos in JaasContext URL: https://github.com/apache/kafka/pull/13377
[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703118#comment-17703118 ] Atul Jain commented on KAFKA-14597: --- Hi [~cadonna] , {quote}Could you run your Streams application and ensure that {{process-total}} is 1 or greater when you look at {{{}record-e2e-latency-max{}}}? Please, let us know whether the value of the metric makes more sense then. {quote} In order to report this issue, I started the Streams app and captured the screenshot immediately after starting the streams application. Adding a new screenshot with process-total more than 0 to better represent the issue. !image-2023-03-21-15-07-24-352.png! The metric values do not make sense because they are not accounting for the sleep time added in a processor.
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Jain updated KAFKA-14597: -- Attachment: (was: image-2023-03-21-15-06-47-836.png)
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Jain updated KAFKA-14597: -- Attachment: image-2023-03-21-15-07-24-352.png
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Jain updated KAFKA-14597: -- Attachment: image-2023-03-21-15-06-47-836.png
[GitHub] [kafka] hudeqi closed pull request #13412: MINOR: Fix low bound in AdminClientConfig and DistributedConfig
hudeqi closed pull request #13412: MINOR: Fix low bound in AdminClientConfig and DistributedConfig URL: https://github.com/apache/kafka/pull/13412
[GitHub] [kafka] mimaison merged pull request #13422: MINOR: Cleanups in clients common.config
mimaison merged PR #13422: URL: https://github.com/apache/kafka/pull/13422
[jira] [Assigned] (KAFKA-14812) ProducerPerformance still counting successful sending in console when sending failed
[ https://issues.apache.org/jira/browse/KAFKA-14812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-14812: -- Assignee: hudeqi > ProducerPerformance still counting successful sending in console when sending failed > > Key: KAFKA-14812 > URL: https://issues.apache.org/jira/browse/KAFKA-14812 > Project: Kafka > Issue Type: Bug > Components: tools > Affects Versions: 3.3.2 > Reporter: hudeqi > Assignee: hudeqi > Priority: Major > Labels: kafka-producer-perf-test, tools > Attachments: WechatIMG27.jpeg > > When using ProducerPerformance, I found that when the sending fails, it is still counted as successfully sent by stat and the metrics are printed in console. For example, when there is no write permission and cannot be written in, the sending success rate is still magically displayed.
[jira] [Commented] (KAFKA-14814) Skip restart of connectors when redundant resume request is made
[ https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703107#comment-17703107 ] Yash Mayya commented on KAFKA-14814: Hi [~cmukka20], I've re-assigned the ticket to you! Feel free to ping me for review whenever it's ready. > Skip restart of connectors when redundant resume request is made > > Key: KAFKA-14814 > URL: https://issues.apache.org/jira/browse/KAFKA-14814 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Reporter: Chris Egerton > Assignee: Chaitanya Mukka > Priority: Minor > > Consecutive requests to the {{PUT /connectors//resume}} endpoint will cause the Connector to be restarted. This is a little wasteful and conflicts with the idempotent nature of that endpoint. We can tweak the {{MemoryConfigBackingStore}} and {{KafkaConfigBackingStore}} classes to not invoke the {{onConnectorTargetStateChange}} method of their {{ConfigUpdateListener}} instance if they pick up a new target state that matches the current target state of the connector.
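The mechanism the ticket proposes can be sketched in a few lines: the config backing store caches each connector's target state and skips the listener callback when a newly read state matches the cached one, so repeated resume requests become no-ops. Class and method names below are illustrative stand-ins, not the actual KafkaConfigBackingStore code:

```java
import java.util.*;

public class TargetStateSketch {
    enum TargetState { STARTED, PAUSED }

    interface Listener { void onConnectorTargetStateChange(String connector); }

    static final Map<String, TargetState> states = new HashMap<>();

    // Only notify (and thereby restart the connector) on a real state transition.
    static void applyTargetState(String connector, TargetState newState, Listener listener) {
        if (states.get(connector) == newState) {
            return; // redundant request: same target state, skip the restart
        }
        states.put(connector, newState);
        listener.onConnectorTargetStateChange(connector);
    }

    public static void main(String[] args) {
        int[] notifications = {0};
        Listener listener = c -> notifications[0]++;
        applyTargetState("c1", TargetState.STARTED, listener); // first resume: notifies
        applyTargetState("c1", TargetState.STARTED, listener); // redundant resume: skipped
        System.out.println(notifications[0]);
    }
}
```

This keeps the resume endpoint idempotent: the second PUT succeeds but triggers no connector restart.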
[GitHub] [kafka] chia7712 merged pull request #13404: KAFKA-14812: ProducerPerformance still counting successful sending in …
chia7712 merged PR #13404: URL: https://github.com/apache/kafka/pull/13404
[jira] [Assigned] (KAFKA-14814) Skip restart of connectors when redundant resume request is made
[ https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14814: -- Assignee: Chaitanya Mukka (was: Yash Mayya)
[GitHub] [kafka] chia7712 commented on pull request #13404: KAFKA-14812: ProducerPerformance still counting successful sending in …
chia7712 commented on PR #13404: URL: https://github.com/apache/kafka/pull/13404#issuecomment-1477457628
```
[2023-03-17T04:47:45.614Z] BUILD SUCCESSFUL in 1h 56m 35s
[2023-03-17T04:47:45.614Z] 224 actionable tasks: 120 executed, 104 up-to-date
```
tests pass. will merge it later
[jira] [Resolved] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-6891. --- Fix Version/s: 3.5.0 Resolution: Fixed > send.buffer.bytes should be allowed to set -1 in KafkaConnect > > Key: KAFKA-6891 > URL: https://issues.apache.org/jira/browse/KAFKA-6891 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 1.1.0 > Reporter: Oleg Kuznetsov > Assignee: Zheng-Xian Li > Priority: Major > Fix For: 3.5.0 > > *send.buffer.bytes* and *receive.buffer.bytes* are declared with *atLeast(0)* constraint in *DistributedConfig*, whereas *-1* should be also allowed to set
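The gist of the fix is widening the config's lower bound so that the sentinel -1 (meaning "use the OS default socket buffer size") passes validation, in the spirit of a ConfigDef range of at least -1 instead of at least 0. A minimal, illustrative validator under that assumption, not the actual Connect code:

```java
public class BufferBoundSketch {

    // -1 means "leave the socket buffer at the OS default"; any other value
    // must be a non-negative size in bytes, mirroring an atLeast(-1) bound.
    static void validateSendBufferBytes(int value) {
        if (value < -1) {
            throw new IllegalArgumentException("send.buffer.bytes must be >= -1, got " + value);
        }
    }

    public static void main(String[] args) {
        validateSendBufferBytes(-1);     // sentinel now accepted
        validateSendBufferBytes(102400); // explicit size still accepted
        boolean rejected = false;
        try {
            validateSendBufferBytes(-2); // anything below -1 is still invalid
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        System.out.println(rejected);
    }
}
```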
[GitHub] [kafka] chia7712 commented on pull request #13398: KAFKA-6891: send.buffer.bytes should be allowed to set -1 in KafkaConnect
chia7712 commented on PR #13398: URL: https://github.com/apache/kafka/pull/13398#issuecomment-1477456249 @garyparrot thanks for this contribution
[GitHub] [kafka] chia7712 merged pull request #13398: KAFKA-6891: send.buffer.bytes should be allowed to set -1 in KafkaConnect
chia7712 merged PR #13398: URL: https://github.com/apache/kafka/pull/13398
[GitHub] [kafka] chia7712 commented on pull request #13398: KAFKA-6891: send.buffer.bytes should be allowed to set -1 in KafkaConnect
chia7712 commented on PR #13398: URL: https://github.com/apache/kafka/pull/13398#issuecomment-1477443749
```
[2023-03-20T05:31:14.475Z] BUILD SUCCESSFUL in 1h 44m 21s
[2023-03-20T05:31:14.475Z] 224 actionable tasks: 120 executed, 104 up-to-date
```
the tests pass. will merge it later
[jira] [Commented] (KAFKA-14814) Skip restart of connectors when redundant resume request is made
[ https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703073#comment-17703073 ] Chaitanya Mukka commented on KAFKA-14814: - Hi [~yash.mayya]! If you haven't started on this ticket, would it be ok if I pick it up? This seems like a good starter ticket. I have a quick draft up here - [https://github.com/apache/kafka/pull/13426]