[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johnson Okorie updated KAFKA-16692:
---
    Affects Version/s: 3.6.1
                           (was: 3.6.2)

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when
> ---
>
>                 Key: KAFKA-16692
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16692
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.6.1
>            Reporter: Johnson Okorie
>            Priority: Major
>
> We have a Kafka cluster running on version 3.5.2 that we are upgrading to 3.6.2.
> This cluster has a lot of clients with exactly-once semantics enabled, which
> therefore create transactions. As we replaced brokers with the new binaries, we
> observed lots of clients in the cluster experiencing the following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, transactionalId=] Got error produce response with correlation id 6402937 on topic-partition , retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.{code}
> On inspecting the brokers, we saw the following errors on brokers still running
> Kafka version 3.5.2:
>
> {code:java}
> message:
> Closing socket for because of error
> exception_exception_class:
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> exception_stacktrace:
> org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.2, we saw the following errors:
>
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for node 1043 with a network exception.{code}
>
> I can also see this:
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and, digging through the changes in 3.6,
> came across some changes introduced as part of KAFKA-14402 that we thought
> might lead to this behaviour.
>
> First, we could see that _transaction.partition.verification.enable_ is enabled
> by default and enables a new code path that culminates in us sending version 4
> ADD_PARTITIONS_TO_TXN requests to other brokers. These requests are generated
> [here|#L269]].
>
> In a [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x]
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be
> possible, as the following code paths should prevent version 4
> ADD_PARTITIONS_TO_TXN requests from being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
>
> However, it seems that these requests are still sent to other brokers in our
> environment.
>
> On further inspection of the code, I am wondering if the following code path
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
>
> In this scenario, we don't have any _NodeApiVersions_ available for the
> specified nodeId, so the expected _latestUsableVersion_ check is potentially
> skipped. I am wondering if it is possible that, because
> _discoverBrokerVersions_ is set to false for the network client of the
> AddPartitionsToTxnManager, it skips fetching ApiVersions. I can see that we
> create the network client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
>
> This _NetworkUtils.buildNetworkClient_ seems to create a network client that
> has _discoverBrokerVersions_ set to false.
>
> I was hoping I could get some assistance debugging this issue. Happy to
> provide any additional information needed.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
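The hypothesis in the report can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `VersionNegotiationSketch`, `VersionRange`, `recordPeerRange`, `chooseVersion`, and the version ranges used are hypothetical stand-ins for illustration, not Kafka's `NetworkClient` or `NodeApiVersions` code. It shows how a sender that has no recorded version range for a peer could fall back to its own latest request version, which an older peer may then reject.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of request-version selection; not Kafka's actual classes.
final class VersionNegotiationSketch {

    /** Inclusive range of request versions supported by one side. */
    static final class VersionRange {
        final short min;
        final short max;

        VersionRange(int min, int max) {
            this.min = (short) min;
            this.max = (short) max;
        }
    }

    // Ranges learned from an ApiVersions-style exchange; stays empty when discovery is disabled.
    private final Map<Integer, VersionRange> knownPeerRanges = new HashMap<>();
    private final VersionRange local;

    VersionNegotiationSketch(VersionRange local) {
        this.local = local;
    }

    void recordPeerRange(int nodeId, VersionRange range) {
        knownPeerRanges.put(nodeId, range);
    }

    /** Picks the request version to send to the given node. */
    short chooseVersion(int nodeId) {
        VersionRange peer = knownPeerRanges.get(nodeId);
        if (peer == null) {
            // Nothing is known about the peer: fall back to the sender's latest version.
            return local.max;
        }
        short usable = (short) Math.min(local.max, peer.max);
        if (usable < Math.max(local.min, peer.min)) {
            throw new IllegalStateException("No overlapping version for node " + nodeId);
        }
        return usable;
    }

    public static void main(String[] args) {
        // Assumed ranges for illustration: the sender supports 0..4, the older peer 0..3.
        VersionNegotiationSketch sender = new VersionNegotiationSketch(new VersionRange(0, 4));
        System.out.println(sender.chooseVersion(1043)); // peer range unknown
        sender.recordPeerRange(1043, new VersionRange(0, 3));
        System.out.println(sender.chooseVersion(1043)); // peer range known
    }
}
```

In this model the first call returns the sender's latest version because no peer range was recorded, and the second returns the highest version both sides support. Whether the real broker-to-broker client behaves this way is exactly the open question raised in the report.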
[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johnson Okorie updated KAFKA-16692:
---
    Description:
We have a Kafka cluster running on version 3.5.2 that we are upgrading to 3.6.2. This cluster has a lot of clients with exactly-once semantics enabled, which therefore create transactions. As we replaced brokers with the new binaries, we observed lots of clients in the cluster experiencing the following error:
{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, transactionalId=] Got error produce response with correlation id 6402937 on topic-partition , retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.{code}
On inspecting the brokers, we saw the following errors on brokers still running Kafka version 3.5.2:

{code:java}
message:
Closing socket for because of error
exception_exception_class:
org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message:
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
exception_stacktrace:
org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
{code}
On the new brokers running 3.6.2, we saw the following errors:

{code:java}
[AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for node 1043 with a network exception.{code}

I can also see this:
{code:java}
[AddPartitionsToTxnManager broker=1055]Cancelled in-flight ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, request timeout: 3ms){code}
We started investigating this issue and, digging through the changes in 3.6, came across some changes introduced as part of KAFKA-14402 that we thought might lead to this behaviour.

First, we could see that _transaction.partition.verification.enable_ is enabled by default and enables a new code path that culminates in us sending version 4 ADD_PARTITIONS_TO_TXN requests to other brokers. These requests are generated [here|#L269]].

In a [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] on the mailing list, [~jolshan] pointed out that this scenario shouldn't be possible, as the following code paths should prevent version 4 ADD_PARTITIONS_TO_TXN requests from being sent to other brokers:
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]

However, it seems that these requests are still sent to other brokers in our environment.

On further inspection of the code, I am wondering if the following code path could lead to this issue:
[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]

In this scenario, we don't have any _NodeApiVersions_ available for the specified nodeId, so the expected _latestUsableVersion_ check is potentially skipped. I am wondering if it is possible that, because _discoverBrokerVersions_ is set to false for the network client of the AddPartitionsToTxnManager, it skips fetching ApiVersions. I can see that we create the network client here:
[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]

This _NetworkUtils.buildNetworkClient_ seems to create a network client that has _discoverBrokerVersions_ set to false.

I was hoping I could get some assistance debugging this issue. Happy to provide any additional information needed.
Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]
gaurav-narula commented on PR #15886:
URL: https://github.com/apache/kafka/pull/15886#issuecomment-2100220810

> Before merging this PR, I want to know which `timerTask` hangs during shutdown. Do you have any idea?

I don't, unfortunately, but I see you're trying to investigate that in #15891. I'll keep this one open in the meantime.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when
Johnson Okorie created KAFKA-16692:
---
             Summary: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when
                 Key: KAFKA-16692
                 URL: https://issues.apache.org/jira/browse/KAFKA-16692
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.6.2
            Reporter: Johnson Okorie

We have a Kafka cluster running on version 3.5.2 that we are upgrading to 3.6.2. This cluster has a lot of clients with exactly-once semantics enabled, which therefore create transactions. As we replaced brokers with the new binaries, we observed lots of clients in the cluster experiencing the following error:
{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, transactionalId=] Got error produce response with correlation id 6402937 on topic-partition , retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.{code}
On inspecting the brokers, we saw the following errors on brokers still running Kafka version 3.5.2:

{code:java}
message:
Closing socket for because of error
exception_exception_class:
org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message:
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
exception_stacktrace:
org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
{code}
On the new brokers running 3.6.2, we saw the following errors:

{code:java}
[AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for node 1043 with a network exception.{code}

I can also see this:
{code:java}
[AddPartitionsToTxnManager broker=1055]Cancelled in-flight ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, request timeout: 3ms){code}
We started investigating this issue and, digging through the changes in 3.6, came across some changes introduced as part of [KAFKA-14402|https://issues.apache.org/jira/browse/KAFKA-14402] that we thought might lead to this behaviour.

First, we could see that _transaction.partition.verification.enable_ is enabled by default and enables a new code path that culminates in us sending version 4 ADD_PARTITIONS_TO_TXN requests to other brokers. These requests are generated [here|https://github.com/apache/kafka/blob/cb35ddc5ca233d5cca6f51c1c41b952a7e9fe1a0/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].

In a [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] on the mailing list, [~jolshan] pointed out that this scenario shouldn't be possible, as the following code paths should prevent version 4 ADD_PARTITIONS_TO_TXN requests from being sent to other brokers:
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]

However, it seems that these requests are still sent to other brokers in our environment.

On further inspection of the code, I am wondering if the following code path could lead to this issue:
[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]

In this scenario, we don't have any _NodeApiVersions_ available for the specified nodeId, so the expected _latestUsableVersion_ check is potentially skipped. I am wondering if it is possible that, because _discoverBrokerVersions_ is set to false for the network client of the AddPartitionsToTxnManager, it skips fetching ApiVersions. I can see that we create the network client here:
[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]

This _NetworkUtils.buildNetworkClient_ seems to create a network client that has _discoverBrokerVersions_ set to false.

I was hoping I could get some assistance debugging this issue.
Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]
chia7712 commented on PR #15890:
URL: https://github.com/apache/kafka/pull/15890#issuecomment-2100176348

@gongxuanzhang thanks for this patch. Could you take a look at https://issues.apache.org/jira/browse/KAFKA-10787? Personally, I think we should introduce the new checkstyle file first and then apply it module by module to reduce the patch size.
Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]
lucasbru commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1593738288

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -544,69 +585,78 @@ private void handleReassignedActiveTask(final Task task,
     }

     private void handleTasksInStateUpdater(final Map> activeTasksToCreate,
-                                           final Map> standbyTasksToCreate) {
+                                           final Map> standbyTasksToCreate,
+                                           final Map> tasksToRecycle,
+                                           final Set tasksToCloseCleanFromStateUpdater,
+                                           final Set tasksToCloseDirtyFromStateUpdater,
+                                           final Map failedTasks) {
+        final Map> newInputPartitions = new HashMap<>();
+        final Map> standbyInputPartitions = new HashMap<>();
+        final Map> activeInputPartitions = new HashMap<>();
+        final Map> futuresForUpdatingInputPartitions = new LinkedHashMap<>();
+        final Map> futuresForActiveTasksToRecycle = new LinkedHashMap<>();
+        final Map> futuresForStandbyTasksToRecycle = new LinkedHashMap<>();
+        final Map> futuresForTasksToClose = new LinkedHashMap<>();
         for (final Task task : stateUpdater.getTasks()) {
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                final Set inputPartitions = activeTasksToCreate.get(taskId);
-                if (task.isActive() && !task.inputPartitions().equals(inputPartitions)) {
-                    if (tasks.removePendingTaskToCloseClean(taskId)) {
-                        tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, inputPartitions);
-                    } else {
-                        tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions);
-                        stateUpdater.remove(taskId);
-                    }
-                } else if (task.isActive()) {
-                    if (tasks.removePendingActiveTaskToSuspend(taskId)) {
-                        log.info(
-                            "We were planning on suspending a task {} because it was revoked " +
-                                "The task got reassigned to this thread, so we cancel suspending " +
-                                "of the task, but add it back to the state updater, since we do not know " +
-                                "if it is fully restored yet.",
-                            taskId);
-                        tasks.addPendingTaskToAddBack(taskId);
-                    }
-                    if (tasks.removePendingTaskToCloseClean(taskId)) {
-                        log.info(
-                            "We were planning on closing task {} because we lost one of its partitions." +
-                                "The task got reassigned to this thread, so cancel closing of the task, but add it back to the " +
-                                "state updater, since we may have to catch up on the changelog.",
-                            taskId);
-                        tasks.addPendingTaskToAddBack(taskId);
+                if (task.isActive()) {
+                    if (!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
+                        final CompletableFuture future = stateUpdater.removeWithFuture(taskId);
+                        futuresForUpdatingInputPartitions.put(taskId, future);
+                        newInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
                     }
                 } else {
-                    removeTaskToRecycleFromStateUpdater(taskId, inputPartitions);
+                    final CompletableFuture future = stateUpdater.removeWithFuture(taskId);
+                    futuresForStandbyTasksToRecycle.put(taskId, future);
+                    activeInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
                 }
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
                 if (task.isActive()) {
-                    removeTaskToRecycleFromStateUpdater(taskId, standbyTasksToCreate.get(taskId));
-                } else {
-                    if (tasks.removePendingTaskToRecycle(taskId) != null) {
-                        log.info(
-                            "We were planning on recycling standby task {} to an active task." +
-                                "The task got reassigned to this thread as a standby task, so cancel recycling of the task, " +
-                                "but add it back to the state updater, since we may have to catch up on the changelog.",
-                            taskId);
-                        tasks.addPendingTaskToAddBack(taskId);
-                    }
+                    final Comp
Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]
chia7712 commented on code in PR #15868:
URL: https://github.com/apache/kafka/pull/15868#discussion_r1593737692

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java: ##
@@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws Throwable {
         // Try again, but with an invalid signature
         log.info(
             "Making a POST request to the {} endpoint with no connector started and an invalid signature header; "
-                + "expecting 403 error response",
+                + "expecting 503 error response",
             connectorTasksEndpoint
         );
         assertEquals(
-            FORBIDDEN.getStatusCode(),
+            SERVICE_UNAVAILABLE.getStatusCode(),

Review Comment:
`503` is a temporary error because the worker is still starting. Could you please add a wait that checks for the 503 first?
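One possible reading of the review suggestion above is to poll the endpoint until the transient status is observed instead of asserting on a single request. The sketch below illustrates that idea only: `WaitForStatusSketch`, `waitForStatus`, and the fake endpoint are hypothetical names introduced here, not part of Kafka Connect's test utilities or of the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical polling helper; the names are illustrative, not Kafka Connect APIs.
final class WaitForStatusSketch {

    /**
     * Polls statusSupplier until it returns expectedStatus or timeoutMs elapses.
     * Returns true if the expected status was observed in time.
     */
    static boolean waitForStatus(IntSupplier statusSupplier, int expectedStatus,
                                 long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (statusSupplier.getAsInt() == expectedStatus) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in endpoint that answers 404 twice before it starts answering 503.
        AtomicInteger calls = new AtomicInteger();
        IntSupplier fakeEndpoint = () -> calls.incrementAndGet() < 3 ? 404 : 503;
        boolean observed = waitForStatus(fakeEndpoint, 503, 1_000, 10);
        System.out.println("observed 503: " + observed + ", calls: " + calls.get());
    }
}
```

A test could call such a helper before the final assertion so that a worker that is still starting does not make the check flaky; the timeout and poll interval here are arbitrary.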
[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule
[ https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844598#comment-17844598 ]

xuanzhang gong commented on KAFKA-16643:
---

I have finished this modification task. Link: https://github.com/apache/kafka/pull/15890

> Add ModifierOrder checkstyle rule
> ---
>
>                 Key: KAFKA-16643
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16643
>             Project: Kafka
>          Issue Type: Task
>          Components: build
>            Reporter: Greg Harris
>            Priority: Minor
>
> Checkstyle offers the ModifierOrder rule
> ([https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html]),
> which Kafka violates in a lot of places. We should decide whether this is a
> checkstyle rule we should be following, and potentially enable it moving forward.
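For readers unfamiliar with the rule, the following minimal sketch shows the kind of declaration ModifierOrder is meant to flag. The class and member names are made up for illustration and are not taken from the Kafka code base; the flagged variants are left as comments so the snippet stays valid under the rule.

```java
// Illustrative only: ModifierOrderSketch and its members are hypothetical.
final class ModifierOrderSketch {

    // Would be flagged: 'final' and 'static' appear before the access modifier.
    // final static private int BAD_LIMIT = 1;

    // Accepted: access modifier first, then static, then final.
    private static final int LIMIT = 1;

    // Would be flagged: 'synchronized' appears before 'static' and 'public'.
    // synchronized static public int badNext() { return LIMIT + 1; }

    // Accepted: public, static, then synchronized.
    public static synchronized int next() {
        return LIMIT + 1;
    }

    public static void main(String[] args) {
        System.out.println(next());
    }
}
```

Both orderings compile, which is why violations accumulate unnoticed unless a style check enforces the order suggested by the Java Language Specification.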
Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]
sidyag commented on code in PR #15837:
URL: https://github.com/apache/kafka/pull/15837#discussion_r1593693523

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ##
@@ -3037,14 +3062,71 @@ class KafkaApisTest extends Logging {
   }

   @Test
-  def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
+  def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion_allowedWithAlterCluster(): Unit = {

Review Comment:
Made the changes.
Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]
chia7712 commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1593681340

## core/src/test/java/kafka/admin/ConfigCommandUnitTest.java: ##
@@ -410,6 +448,432 @@ public void testOptionEntityTypeNames() {
         doTestOptionEntityTypeNames(false);
     }

+    @Test
+    public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+    }
+
+    @Test
+    public void shouldFailIfUnrecognisedEntityType() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
+    }
+
+    @Test
+    public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+    }
+
+    @Test
+    public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
+    }
+
+    @Test
+    public void shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+    }
+
+    @Test
+    public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
+    }
+
+    @Test
+    public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfMixedEntityTypeFlags() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfInvalidHost() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfInvalidHostUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfUnresolvableHost() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "RFC2606.invalid", "--entity-type", "ips",
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1593670905

## metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java: ##
@@ -717,7 +717,7 @@ public void testUnregisterBroker() throws Throwable {
                     setBrokerId(0).
                     setClusterId(active.clusterId()).
                     setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV1)).

Review Comment:
Hopefully this will be addressed in the next commit as well.
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1593668135

## metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java: ##
@@ -283,7 +283,7 @@ private static Stream metadataVersionsForTestPartitionRegistration()
         return Stream.of(
             MetadataVersion.IBP_3_7_IV1,
             MetadataVersion.IBP_3_7_IV2,
-            MetadataVersion.IBP_3_8_IV0
+            MetadataVersion.IBP_3_8_IV1

Review Comment:
My miss, updated in the next commit.
Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]
showuon commented on code in PR #15884:
URL: https://github.com/apache/kafka/pull/15884#discussion_r1593636792

## tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java: ##
@@ -160,28 +154,26 @@ private String outputWithoutEpoch(String output) {
         int pos = output.indexOf("Epoch: ");
         return (pos > 0) ? output.substring(0, pos) : output;
     }
-}

-class FeatureCommandUnitTest {

Review Comment:
Ditto: please add a comment to make it clear that the following tests are unit tests.

## tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java: ##
@@ -112,12 +107,7 @@ private static void executeAndAssertOutput(String json, String expOut, Admin adm
         });
         assertTrue(output.contains(expOut));
     }
-}

-/**
- * Unit test of {@link DeleteRecordsCommand} tool.
- */

Review Comment:
I would keep the comment to make it clear that the following tests are unit tests.
[PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
cadonna opened a new pull request, #15894:
URL: https://github.com/apache/kafka/pull/15894

Uses the new remove operation of the state updater, which returns a future, to shut down the task manager.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
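The general pattern described in the pull request, a remove operation that returns a future which shutdown then waits on, can be sketched as follows. This is an illustration under assumptions, not the Kafka Streams implementation: `SketchStateUpdater`, its methods, the use of plain strings as task ids, and the return type of `removeWithFuture` are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "remove returns a future, shutdown waits for all futures".
final class ShutdownWithFuturesSketch {

    /** Stand-in for a component that owns tasks and removes them on its own thread. */
    static final class SketchStateUpdater {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();
        private final Set<String> tasks = Collections.synchronizedSet(new LinkedHashSet<>());

        void add(String taskId) {
            tasks.add(taskId);
        }

        List<String> taskIds() {
            synchronized (tasks) {
                return new ArrayList<>(tasks);
            }
        }

        /** Removes the task on the worker thread; the future completes with the removed id. */
        CompletableFuture<String> removeWithFuture(String taskId) {
            return CompletableFuture.supplyAsync(() -> {
                if (!tasks.remove(taskId)) {
                    throw new IllegalStateException("Unknown task " + taskId);
                }
                return taskId;
            }, worker);
        }

        void close() {
            worker.shutdown();
        }
    }

    /** Requests removal of every task, waits for all futures, then stops the worker. */
    static List<String> shutdown(SketchStateUpdater updater, long timeoutMs) {
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        for (String taskId : updater.taskIds()) {
            futures.put(taskId, updater.removeWithFuture(taskId));
        }
        List<String> removed = new ArrayList<>();
        try {
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
                .get(timeoutMs, TimeUnit.MILLISECONDS);
            for (CompletableFuture<String> future : futures.values()) {
                removed.add(future.join());
            }
        } catch (Exception e) {
            throw new IllegalStateException("Tasks were not removed cleanly", e);
        } finally {
            updater.close();
        }
        return removed;
    }

    public static void main(String[] args) {
        SketchStateUpdater updater = new SketchStateUpdater();
        updater.add("0_0");
        updater.add("0_1");
        System.out.println(shutdown(updater, 5_000));
    }
}
```

Waiting on the futures gives the caller a definite point at which every task has left the updater, instead of tracking pending removals in separate bookkeeping.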
[jira] [Comment Edited] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844579#comment-17844579 ] FTR edited comment on KAFKA-16687 at 5/8/24 8:09 AM: Update: After a longer test with 3.6.0, the resident memory of the Java application process did not exceed the max heap size. With 3.7.0, however, the total reserved and committed native memory exceeded the OS memory, and the growth was again in [NMT Internal] memory. In the end, once free and buffer/cache memory was exhausted, the kswapd0 process started swapping. JDK version: OpenJDK 1.8.0_342 was (Author: JIRAUSER305339): Update: After a longer test with 3.6.0, the resident memory of the Java application process did not exceed the max heap size. With 3.7.0, however, the total reserved and committed native memory exceeded the OS memory, and the growth was again in [NMT Internal] memory. In the end, once free and buffer/cache memory was exhausted, the kswapd0 process started swapping. > Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0 > > Key: KAFKA-16687 > URL: https://issues.apache.org/jira/browse/KAFKA-16687 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Affects Versions: 3.7.0 > Reporter: FTR > Assignee: Philip Nee > Priority: Major > > I am building a Java project that uses the Maven dependency kafka-clients, version 3.7.0. > My Java application uses a Kafka Consumer to poll a Kafka broker topic continuously. > I configured the application with the JVM options -Xms8G -Xmx8G -XX:MaxMetaspaceSize=4G and ran it. > The virtual machine has 16G of physical memory. > After the application had been running for a long time, I found that the resident memory of the Java process had grown to more than 14G. > In the end, the Java process consumed swap space. > I checked with jmap -heap pid and found that heap memory usage was OK.
> With Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I found that the growth is in [NMT Internal] memory, which is allocated by Unsafe_allocatememory xxx. > My Java application does not use any NIO DirectByteBuffer to allocate memory. > I checked the kafka-clients source code, and it contains code that uses "sun.misc.Unsafe" to allocate memory. MaxMetaspaceSize has no effect on it. > > Could you help check this? How can I stop the native memory from growing, so that my system does not hang? -- This message was sent by Atlassian Jira (v8.20.10#820010)
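The heap-versus-native comparison described in the report can also be approximated from inside the JVM. The following is only a minimal sketch, not something taken from the ticket: it relies on the standard `java.lang.management` MXBeans, and the class and method names (`NativeMemoryProbe`, `bufferPoolBytes`, `heapUsedBytes`) are hypothetical. It assumes that raw `sun.misc.Unsafe` allocations are not accounted for in the JVM buffer pools, which would mean that a flat buffer-pool reading alongside a growing resident set points at the kind of allocation the reporter describes rather than at `DirectByteBuffer` usage.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.List;

// Hypothetical helper, not part of kafka-clients.
class NativeMemoryProbe {

    /** Sum of the memory reported by the JVM buffer pools (e.g. "direct", "mapped"). */
    static long bufferPoolBytes() {
        long total = 0;
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            long used = pool.getMemoryUsed(); // -1 when the pool cannot report a value
            if (used > 0) {
                total += used;
            }
        }
        return total;
    }

    /** Heap bytes currently in use, as reported by the memory MXBean. */
    static long heapUsedBytes() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        return memory.getHeapMemoryUsage().getUsed();
    }

    public static void main(String[] args) {
        System.out.printf("heap used:    %d bytes%n", heapUsedBytes());
        System.out.printf("buffer pools: %d bytes%n", bufferPoolBytes());
    }
}
```

Logging both values periodically next to the process's resident set size would show whether the growth is visible to the JVM's own accounting at all.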
[jira] [Resolved] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using
[ https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16640. Fix Version/s: 3.8.0 Resolution: Fixed > Replace TestUtils#resource by scala.util.Using > > Key: KAFKA-16640 > URL: https://issues.apache.org/jira/browse/KAFKA-16640 > Project: Kafka > Issue Type: Improvement > Reporter: Chia-Ping Tsai > Assignee: TengYao Chi > Priority: Minor > Fix For: 3.8.0 > > `scala.util.Using` is available in both Scala 2.13 and scala-collection-compat, so we don't need a custom try-with-resources function.
Re: [PR] KAFKA-16640: Replace TestUtils#resource by scala.util.Using [kafka]
chia7712 merged PR #15881: URL: https://github.com/apache/kafka/pull/15881 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]
cadonna merged PR #15871: URL: https://github.com/apache/kafka/pull/15871
Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]
rishiraj88 commented on code in PR #15871: URL: https://github.com/apache/kafka/pull/15871#discussion_r1593553420 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -623,6 +623,21 @@ private void addToTasksToClose(final Map> futures, Review Comment: OK then. Let's continue to have it as a method.
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844579#comment-17844579 ] FTR commented on KAFKA-16687: Update: After a longer test with 3.6.0, the resident memory of the Java application process did not exceed the max heap size. With 3.7.0, however, the total reserved and committed native memory exceeded the OS memory, and the growth was again in [NMT Internal] memory. In the end, once free and buffer/cache memory was exhausted, the kswapd0 process started swapping. > Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0 > > Key: KAFKA-16687 > URL: https://issues.apache.org/jira/browse/KAFKA-16687 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Affects Versions: 3.7.0 > Reporter: FTR > Assignee: Philip Nee > Priority: Major > > I am building a Java project that uses the Maven dependency kafka-clients, version 3.7.0. > My Java application uses a Kafka Consumer to poll a Kafka broker topic continuously. > I configured the application with the JVM options -Xms8G -Xmx8G -XX:MaxMetaspaceSize=4G and ran it. > The virtual machine has 16G of physical memory. > After the application had been running for a long time, I found that the resident memory of the Java process had grown to more than 14G. > In the end, the Java process consumed swap space. > I checked with jmap -heap pid and found that heap memory usage was OK. > With Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I found that the growth is in [NMT Internal] memory, which is allocated by Unsafe_allocatememory xxx. > My Java application does not use any NIO DirectByteBuffer to allocate memory. > I checked the kafka-clients source code, and it contains code that uses "sun.misc.Unsafe" to allocate memory. MaxMetaspaceSize has no effect on it. > > Could you help check this? How can I stop the native memory from growing, so that my system does not hang?
Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]
lucasbru merged PR #15874: URL: https://github.com/apache/kafka/pull/15874
Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]
lucasbru commented on PR #15874: URL: https://github.com/apache/kafka/pull/15874#issuecomment-2099949443 https://confluent-kafka-branch-builder-system-test-results-v2.s3-us-west-2.amazonaws.com/trunk/2024-05-07--001./report.html
[jira] [Updated] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16511: Fix Version/s: 3.7.1 > Leaking tiered segments > > Key: KAFKA-16511 > URL: https://issues.apache.org/jira/browse/KAFKA-16511 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage > Affects Versions: 3.7.0 > Reporter: Francois Visconte > Assignee: Kamal Chandraprakash > Priority: Major > Labels: tiered-storage > Fix For: 3.8.0, 3.7.1 > > > I have some topics (with 12h retention) that have not been written to for a few days, where some data remains on tiered storage (in our case S3) and is never deleted. > > Looking at the log history, it appears that we never even tried to delete these segments. > When looking at one of the non-leaking segments, I get the following interesting messages: > > {code:java} > "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. 
Current > earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), > segment-end-offset: 2976819 and segment-epochs: [5]" > "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data > for completed successfully > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data > for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied > 02968418.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ}" > "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > 
"2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, > metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > {code} > > This looks right, because logs from both the plugin and the remote log manager indicate that the remote log segment was removed. > Now, if I look at one of the leaked segments, here is what I see: > > {code:java} > "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied > 02971163.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, > id=8dP13VDYSaiFlubl9SNBTQ}" > "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, > id=8dP13VDYSaiFlubl9SNBTQ} > , startOffset=2971163, endOffset=2978396, brokerId=10001, > maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, > segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > "2024-04-02T00:43:20.003Z","""kafka""","""10001"
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1593525765 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: > While updating the [UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535), the HWM also gets updated and it doesn't hit the fetchHighWatermarkMetadata (or) convertToOffsetMetadataOrThrow so the call will succeed even when current-log-start-offset > old-HWM. For [UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535), I think it's safe that we don't throw an exception when `current-log-start-offset > old-HWM`, because it is called during initialization or from `maybeIncrementLogStartOffset`. In the latter, we've already checked `newLogStartOffset > logStartOffset`.
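The two checks mentioned in this review thread can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the `UnifiedLog` code: the class `LogOffsetSketch`, the method `checkedOffset`, and the exception `OffsetOutOfRange` are hypothetical stand-ins, and only rule 1 of the doc comment (offsets below the log-start-offset are rejected) and the `newLogStartOffset > logStartOffset` guard are modeled. High-watermark handling is deliberately left out.

```java
// Hypothetical, simplified model; not the actual kafka.log.UnifiedLog implementation.
class LogOffsetSketch {

    /** Stand-in for Kafka's OffsetOutOfRangeException, to keep the sketch dependency-free. */
    static class OffsetOutOfRange extends RuntimeException {
        OffsetOutOfRange(String message) {
            super(message);
        }
    }

    private long logStartOffset;

    LogOffsetSketch(long logStartOffset) {
        this.logStartOffset = logStartOffset;
    }

    long logStartOffset() {
        return logStartOffset;
    }

    /** Rule 1 from the doc comment: an offset below the log-start-offset is rejected. */
    long checkedOffset(long offset) {
        if (offset < logStartOffset) {
            throw new OffsetOutOfRange(
                    "offset " + offset + " is below log-start-offset " + logStartOffset);
        }
        return offset;
    }

    /** Advances the log-start-offset only when the new value is strictly greater. */
    boolean maybeIncrementLogStartOffset(long newLogStartOffset) {
        if (newLogStartOffset > logStartOffset) {
            logStartOffset = newLogStartOffset;
            return true;
        }
        return false;
    }
}
```

In this model, a caller that advances the start offset from 100 to 150 will afterwards see `checkedOffset(120)` throw, while `checkedOffset(150)` still succeeds.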
Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]
showuon merged PR #15817: URL: https://github.com/apache/kafka/pull/15817
Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]
showuon commented on PR #15817: URL: https://github.com/apache/kafka/pull/15817#issuecomment-2099912904 Failed tests are unrelated.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1593511224 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: Ah, OK. Thanks.
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1593500243 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords( // reset to MAX_VALUE in case the store is empty sharedTimeTracker.minTime = Long.MAX_VALUE; -try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { +try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; -boolean outerJoinLeftWindowOpen = false; -boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { -if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { -// if windows are open for both joinSides we can break since there are no more candidates to emit +final KeyValue, LeftOrRightValue> nextKeyValue = it.next(); +final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = nextKeyValue.key; +sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp(); +if (isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) { Review Comment: Yeah, you are right! That's a good catch. I was depending too much on the tests. I couldn't find any test that triggered the condition even after reverting the changes.