[GitHub] [kafka] ableegoldman opened a new pull request, #12585: HOTFIX: fix PriorityQueue iteration to assign warmups in priority order
ableegoldman opened a new pull request, #12585:
URL: https://github.com/apache/kafka/pull/12585

Based on a patch submitted to the `confluentinc` fork and then abandoned. It needed some updates and minor expansion, but this more or less just re-applies the changes proposed in https://github.com/confluentinc/kafka/pull/697. The original PR has a _very_ detailed justification for these changes, but the tl;dr is that the PriorityQueue's iterator [does not actually guarantee](https://docs.oracle.com/javase/8/docs/api/java/util/PriorityQueue.html#iterator--) to return elements in priority order... 😒
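For context, the Javadoc caveat is easy to reproduce in isolation. The following standalone sketch (illustrative only, not code from the PR) shows why draining with `poll()` is required whenever order matters:

```java
import java.util.PriorityQueue;

public class PriorityQueueIterationDemo {
    public static void main(String[] args) {
        PriorityQueue<Integer> queue = new PriorityQueue<>();
        queue.add(3);
        queue.add(1);
        queue.add(2);

        // The iterator walks the internal heap array, not the priority order,
        // so this loop may print e.g. 1, 3, 2.
        for (int value : queue) {
            System.out.println(value);
        }

        // Draining via poll() is the only way to visit elements in priority
        // order; this prints 1, 2, 3.
        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
    }
}
```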
[jira] [Created] (KAFKA-14196) Flaky OffsetValidationTest seems to indicate potential duplication issue during rebalance
Philip Nee created KAFKA-14196:
----------------------------------

Summary: Flaky OffsetValidationTest seems to indicate potential duplication issue during rebalance
Key: KAFKA-14196
URL: https://issues.apache.org/jira/browse/KAFKA-14196
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Affects Versions: 3.2.1
Reporter: Philip Nee

Several flaky tests under OffsetValidationTest indicate a potential consumer duplication issue when autocommit is enabled. Below is the failure message:

{code:java}
Total consumed records 3366 did not match consumed position 3331
{code}

After investigating the log, I discovered that the data consumed between the start of a rebalance event and the async commit was lost in those failing tests. In the example below, the rebalance event kicks in at around 1662054846995 (first record), and the async commit of offset 3739 completes at around 1662054847015 (right before partitions_revoked).

{code:java}
{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
{"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
{code}

A few things to note here:
# This is highly flaky; roughly 1 in 4 runs fails the test.
# Manually calling commitSync in the onPartitionsRevoked callback seems to alleviate the issue (see the sketch below).
# Setting includeMetadataInTimeout to false also seems to alleviate the issue.
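The second mitigation above can be sketched as a rebalance listener. This is an illustrative snippet, not the test's actual code:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;

    public CommitOnRevokeListener(final KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // Synchronously commit the positions of records already returned to the
        // application before ownership moves, so the next owner resumes after
        // them instead of re-delivering them (the duplication seen above).
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        // Nothing to do on assignment for this mitigation.
    }
}
```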
[GitHub] [kafka] mghosh4 commented on pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics
mghosh4 commented on PR #10910:
URL: https://github.com/apache/kafka/pull/10910#issuecomment-1234907832

Any update on when this diff can land? In case you need some help, I am happy to help. This is a problem that we have had to deal with too.
[GitHub] [kafka] philipnee closed pull request #12580: MINOR: Fix flaky OffsetValidationTest
philipnee closed pull request #12580: MINOR: Fix flaky OffsetValidationTest
URL: https://github.com/apache/kafka/pull/12580
[GitHub] [kafka] cmccabe merged pull request #12578: KAFKA-14195: Fix KRaft AlterConfig policy usage for Legacy/Full case
cmccabe merged PR #12578:
URL: https://github.com/apache/kafka/pull/12578
[GitHub] [kafka] vvcephei merged pull request #12582: MINOR: Demystify rebalance schedule log
vvcephei merged PR #12582:
URL: https://github.com/apache/kafka/pull/12582
[GitHub] [kafka] vvcephei commented on pull request #12582: MINOR: Demystify rebalance schedule log
vvcephei commented on PR #12582:
URL: https://github.com/apache/kafka/pull/12582#issuecomment-1234803264

The Java 17 build failed with this unrelated exception:
```
10:00:43 Execution failed for task ':core:unitTest'.
10:00:43 > Process 'Gradle Test Executor 50' finished with non-zero exit value 1
```
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #12583: KAFKA-10199: Separate state updater from old restore
wcarlson5 commented on code in PR #12583:
URL: https://github.com/apache/kafka/pull/12583#discussion_r960982217

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:

```diff
@@ -396,8 +397,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
                 new Tasks(new LogContext(logPrefix)),
                 topologyMetadata,
                 adminClient,
-                stateDirectory,
-                maybeCreateAndStartStateUpdater(config, changelogReader, time)
+                stateDirectory, maybeCreateAndStartStateUpdater(config, changelogReader, time)
```

Review comment:
nit: new line

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:

```diff
@@ -867,37 +868,47 @@ void runOnce() {
     }

     private void initializeAndRestorePhase() {
-        // only try to initialize the assigned tasks
-        // if the state is still in PARTITION_ASSIGNED after the poll call
+        final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = partitions -> resetOffsets(partitions, null);
         final State stateSnapshot = state;
-        if (stateSnapshot == State.PARTITIONS_ASSIGNED
-            || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+        if (stateUpdaterEnabled) {
+            checkStateUpdater();
+        } else {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {

-            log.debug("State is {}; initializing tasks if necessary", stateSnapshot);
+                log.debug("State is {}; initializing tasks if necessary", stateSnapshot);

-            // transit to restore active is idempotent so we can call it multiple times
-            changelogReader.enforceRestoreActive();
+                if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
+                    log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
+                        taskManager.allTasks().keySet());
+                    setState(State.RUNNING);
+                }

-            if (taskManager.tryToCompleteRestoration(now, partitions -> resetOffsets(partitions, null))) {
-                changelogReader.transitToUpdateStandby();
-                log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
-                    taskManager.allTasks().keySet());
-                setState(State.RUNNING);
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization call done. State is {}", state);
+                }
             }

             if (log.isDebugEnabled()) {
-                log.debug("Initialization call done. State is {}", state);
+                log.debug("Idempotently invoking restoration logic in state {}", state);
```

Review comment:
`Idempotently` applies only to the state. I think this log could be confusing if you read it as the restoration logic itself being idempotent, which I don't think is correct.

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:

```diff
@@ -770,7 +771,7 @@ void runOnce() {
         long totalCommitLatency = 0L;
         long totalProcessLatency = 0L;
         long totalPunctuateLatency = 0L;
-        if (state == State.RUNNING) {
+        if (state == State.RUNNING || stateUpdaterEnabled) {
```

Review comment:
This seems risky. Can you explain in which states this flag could be true? I am worried about the CREATED and ERROR states mostly.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961032133

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```diff
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {

     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }

-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {
```

Review comment:
Discussed offline with @guozhangwang. If the `map` or other transformation is a sibling of the join node but the join node has a single parent, then this transformation is a no-op and hence the optimization can be applied. For the transformation to not be a no-op, the join node must have two or more parents, hence the optimization won't be applicable.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961012010

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```diff
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {

     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }

-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {
```

Review comment:
The logical plan will have only one source since the sources will get merged, as they refer to the same topic. Hence, all the nodes will be children of the source node and siblings of the join node. For example, Case 2:
```
root -> source
source -> join-window1, join-window2, stream-join
```
and Case 3:
```
root -> source
source -> join-window1, map-values1, join-window2, stream-join
```
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961002924

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```java
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
    }
}

/**
 * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
 */
@SuppressWarnings("unchecked")
private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
    visited.add(currentNode);
    if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
        ((StreamStreamJoinNode) currentNode).setSelfJoin();
        // Remove JoinOtherWindowed node
        final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
        GraphNode left = null, right = null;
        for (final GraphNode child: parent.children()) {
            if (child instanceof ProcessorGraphNode
                && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
                if (left == null) {
                    left = child;
                } else {
                    right = child;
                }
            }
        }
        // Sanity check
        if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
            parent.removeChild(right);
        }
    }
    for (final GraphNode child: currentNode.children()) {
        if (!visited.contains(child)) {
            rewriteSelfJoin(child, visited);
        }
    }
}

/**
 * The self-join rewriting can be applied if:
 * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
 * 2. The StreamStreamJoinNode has a single parent.
 * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the
 *    StreamStreamJoinNode and have smaller build priority.
 */
```

Review comment:
Just curious: in the above case, logically speaking, since we are not reassigning the map-valued result stream back to stream1 like `stream1 = stream1.mapValues()`, this step is basically a no-op and should not affect stream1, right?
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960983275

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```diff
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {

     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }

-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {
```

Review comment:
Hmm... for case 2), the logical plan would be like:
```
root -> source1
root -> source2
source1 -> join-window1
source2 -> join-window2
source1,source2 -> stream-join
```
Is that right?

For case 3), since we are not re-assigning the result of `mapValues` to `stream1`, it should just be a no-op, right? The logical plan would be like:
```
root -> source1
root -> source2
source1 -> join-window1
source1 -> map-values1
source2 -> join-window2
source1,source2 -> stream-join
```
Should that be considered optimizable as well?
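To make the "no-op unless reassigned" point concrete, here is a small hypothetical topology (assuming default serdes are configured); the discarded `mapValues` result never feeds the join:

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class SelfJoinNoOpExample {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("topic1");

        // Adds a map-values node to the logical plan, but since the result is
        // not reassigned (no stream1 = stream1.mapValues(...)), the join below
        // still consumes the original, unmapped stream1.
        stream1.mapValues(v -> v.toUpperCase());

        final KStream<String, String> stream2 = builder.stream("topic1"); // same topic
        stream1.join(stream2,
                     (left, right) -> left + right,
                     JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));

        System.out.println(builder.build().describe());
    }
}
```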
[jira] [Created] (KAFKA-14195) Fix KRaft AlterConfig policy usage for Legacy/Full case
Ron Dagostino created KAFKA-14195:
-------------------------------------

Summary: Fix KRaft AlterConfig policy usage for Legacy/Full case
Key: KAFKA-14195
URL: https://issues.apache.org/jira/browse/KAFKA-14195
Project: Kafka
Issue Type: Bug
Affects Versions: 3.3
Reporter: Ron Dagostino
Assignee: Ron Dagostino

The fix for https://issues.apache.org/jira/browse/KAFKA-14039 adjusted the invocation of the alter-configs policy check in KRaft to match the behavior in ZooKeeper, which is to provide only the configs that were explicitly sent in the request. While the code was correct for the incremental alter-configs case, it actually included the implicit deletions for the legacy (non-incremental) alter-configs case, and those implicit deletions are not included in the ZooKeeper-based invocation. The implicit deletions should not be passed in the legacy case.
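To see what "implicit deletions" means in practice, consider a hypothetical legacy AlterConfigs request; this sketch uses made-up config values and is not code from the patch:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ImplicitDeletionDemo {
    public static void main(String[] args) {
        // Configs currently set on the resource.
        final Map<String, String> existing = Map.of(
            "retention.ms", "100",
            "cleanup.policy", "compact");
        // Configs explicitly sent in a legacy (non-incremental) AlterConfigs request.
        final Map<String, String> requested = Map.of("retention.ms", "200");

        // The legacy API replaces the entire config, so every existing key that
        // is absent from the request is implicitly deleted.
        final Set<String> implicitDeletions = existing.keySet().stream()
            .filter(key -> !requested.containsKey(key))
            .collect(Collectors.toSet());

        System.out.println(implicitDeletions); // [cleanup.policy]
        // Per this issue, only `requested` should be handed to the alter-configs
        // policy, matching the ZooKeeper-based invocation; the implicit
        // deletions are applied but not shown to the policy.
    }
}
```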
[GitHub] [kafka] rondagostino commented on a diff in pull request #12578: KAFKA-14039: Additional compatibility fix for KRaft Alter Configs
rondagostino commented on code in PR #12578:
URL: https://github.com/apache/kafka/pull/12578#discussion_r960958343

metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:

```diff
@@ -248,33 +249,45 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
     }

     private ApiError validateAlterConfig(ConfigResource configResource,
-                                         List<ApiMessageAndVersion> newRecords,
+                                         List<ApiMessageAndVersion> recordsExplicitlyAltered,
+                                         List<ApiMessageAndVersion> recordsImplicitlyDeleted,
                                          boolean newlyCreatedResource) {
         Map<String, String> allConfigs = new HashMap<>();
-        Map<String, String> alteredConfigs = new HashMap<>();
+        Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new HashMap<>();
         TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
         if (existingConfigs != null) allConfigs.putAll(existingConfigs);
-        for (ApiMessageAndVersion newRecord : newRecords) {
+        for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
             ConfigRecord configRecord = (ConfigRecord) newRecord.message();
             if (configRecord.value() == null) {
                 allConfigs.remove(configRecord.name());
             } else {
                 allConfigs.put(configRecord.name(), configRecord.value());
             }
-            alteredConfigs.put(configRecord.name(), configRecord.value());
+            alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), configRecord.value());
+        }
+        for (ApiMessageAndVersion recordImplicitlyDeleted : recordsImplicitlyDeleted) {
+            ConfigRecord configRecord = (ConfigRecord) recordImplicitlyDeleted.message();
+            allConfigs.remove(configRecord.name());
+            // As per KAFKA-14039, do not include implicit deletions caused by using the legacy AlterConfigs API
+            // in the list passed to the policy in order to maintain backwards compatibility
```

Review comment:
The order a few lines above is `allConfigs.remove(configRecord.name());` followed by `alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), configRecord.value());`. If we mirror that here, then the comment appears in the place where the `alteredConfigsForAlterConfigPolicyCheck.put()` call would occur.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960956896

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```java
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
    }
}

/**
 * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
 */
@SuppressWarnings("unchecked")
private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
    visited.add(currentNode);
    if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
        ((StreamStreamJoinNode) currentNode).setSelfJoin();
        // Remove JoinOtherWindowed node
        final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
        GraphNode left = null, right = null;
        for (final GraphNode child: parent.children()) {
            if (child instanceof ProcessorGraphNode
                && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
                if (left == null) {
                    left = child;
                } else {
                    right = child;
                }
            }
        }
        // Sanity check
        if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
            parent.removeChild(right);
        }
    }
    for (final GraphNode child: currentNode.children()) {
        if (!visited.contains(child)) {
            rewriteSelfJoin(child, visited);
        }
    }
}

/**
 * The self-join rewriting can be applied if:
 * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
 * 2. The StreamStreamJoinNode has a single parent.
 * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the
 *    StreamStreamJoinNode and have smaller build priority.
 */
private boolean isSelfJoin(final GraphNode streamJoinNode) {
    final AtomicInteger count = new AtomicInteger();
    countSourceNodes(count, streamJoinNode, new HashSet<>());
    if (count.get() > 1) {
        return false;
    }
    if (streamJoinNode.parentNodes().size() > 1) {
        return false;
    }
    for (final GraphNode parent: streamJoinNode.parentNodes()) {
        for (final GraphNode sibling : parent.children()) {
            if (sibling instanceof ProcessorGraphNode) {
                if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
                    continue;
                }
            }
            if (sibling != streamJoinNode
                && sibling.buildPriority() < streamJoinNode.buildPriority()) {
                return false;
            }
        }
    }
    return true;
}

private void countSourceNodes(
    final AtomicInteger count,
    final GraphNode currentNode,
    final Set<GraphNode> visited) {

    if (currentNode instanceof StreamSourceNode) {
        count.incrementAndGet();
    }

    for (final GraphNode parent: currentNode.parentNodes()) {
        if (!visited.contains(parent)) {
```

Review comment:
Great catch, thank you! I am not sure about the latter, so I implemented `equals` and `hashCode`.
[GitHub] [kafka] jsancio commented on a diff in pull request #12578: KAFKA-14039: Additional compatibility fix for KRaft Alter Configs
jsancio commented on code in PR #12578:
URL: https://github.com/apache/kafka/pull/12578#discussion_r960954024

metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:

```diff
@@ -248,33 +249,45 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
     }

     private ApiError validateAlterConfig(ConfigResource configResource,
-                                         List<ApiMessageAndVersion> newRecords,
+                                         List<ApiMessageAndVersion> recordsExplicitlyAltered,
+                                         List<ApiMessageAndVersion> recordsImplicitlyDeleted,
                                          boolean newlyCreatedResource) {
         Map<String, String> allConfigs = new HashMap<>();
-        Map<String, String> alteredConfigs = new HashMap<>();
+        Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new HashMap<>();
         TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
         if (existingConfigs != null) allConfigs.putAll(existingConfigs);
-        for (ApiMessageAndVersion newRecord : newRecords) {
+        for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
             ConfigRecord configRecord = (ConfigRecord) newRecord.message();
             if (configRecord.value() == null) {
                 allConfigs.remove(configRecord.name());
             } else {
                 allConfigs.put(configRecord.name(), configRecord.value());
             }
-            alteredConfigs.put(configRecord.name(), configRecord.value());
+            alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), configRecord.value());
+        }
+        for (ApiMessageAndVersion recordImplicitlyDeleted : recordsImplicitlyDeleted) {
+            ConfigRecord configRecord = (ConfigRecord) recordImplicitlyDeleted.message();
+            allConfigs.remove(configRecord.name());
+            // As per KAFKA-14039, do not include implicit deletions caused by using the legacy AlterConfigs API
+            // in the list passed to the policy in order to maintain backwards compatibility
```

Review comment:
Should this documentation come before lines 269-270?
[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1234551652

Thank you @jsancio for the review. I have made the changes. Regarding the following comment -

> It looks like this method is only used by tests. I think it is okay for this to have an "unknown reason" for now.

Currently, the function in question does no logging, hence I haven't added `UnknownReason` there. Should I update the function to log it?
[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r960902453

core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala:

```diff
@@ -31,5 +32,5 @@ trait MetadataSnapshotter {
    *
    * @return True if we will write out a new snapshot; false otherwise.
    */
-  def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean
+  def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, reason: Set[SnapshotReason]): Boolean
```

Review comment:
Added the `reason` as a `param` in the documentation.
[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r960901993

core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:

```diff
@@ -59,11 +60,11 @@ class BrokerMetadataSnapshotter(
     val writer = writerBuilder.build(
       image.highestOffsetAndEpoch().offset,
       image.highestOffsetAndEpoch().epoch,
-      lastContainedLogTime
-    )
+      lastContainedLogTime)
```

Review comment:
My bad, must have done this by mistake. Fixed this.
[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r960901730

core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:

```diff
@@ -119,16 +120,29 @@ class BrokerMetadataListener(
       }
       _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-      if (shouldSnapshot()) {
-        maybeStartSnapshot()
+
+      val shouldTakeSnapshot: Set[SnapshotReason] = shouldSnapshot()
+      if (shouldTakeSnapshot.nonEmpty) {
+        maybeStartSnapshot(shouldTakeSnapshot)
       }
       _publisher.foreach(publish)
     }
   }

-  private def shouldSnapshot(): Boolean = {
-    (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
+  private def shouldSnapshot(): Set[SnapshotReason] = {
+    val metadataVersionHasChanged: Boolean = metadataVersionChanged()
+    val maxBytesHaveExceeded: Boolean = (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots)
```

Review comment:
Removed the type declaration.
[GitHub] [kafka] ndrwdn opened a new pull request, #12584: KAFKA-14194 Fix NPE in Cluster.nodeIfOnline
ndrwdn opened a new pull request, #12584:
URL: https://github.com/apache/kafka/pull/12584

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
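Since the PR body above is just the unfilled template, the change itself is not described here. Purely as a hypothetical illustration of the defensive pattern such a fix typically takes (this is not the actual patch), a null-safe metadata lookup might look like the following; `onlineNodeFor` is an invented helper name:

```java
import java.util.Optional;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

public final class NodeLookups {

    /**
     * Hypothetical helper: during a broker-cluster restart, metadata can
     * transiently lack partition or node information, so every lookup is
     * null-checked before use instead of being dereferenced directly.
     */
    static Optional<Node> onlineNodeFor(final Cluster cluster, final TopicPartition tp, final int nodeId) {
        if (cluster.partition(tp) == null) {
            return Optional.empty(); // partition metadata not (yet) known
        }
        // nodeById may return null in the middle of a metadata update.
        return Optional.ofNullable(cluster.nodeById(nodeId));
    }
}
```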
[jira] [Updated] (KAFKA-14194) NPE in Cluster.nodeIfOnline
[ https://issues.apache.org/jira/browse/KAFKA-14194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Dean updated KAFKA-14194:
--------------------------------
Component/s: clients

> NPE in Cluster.nodeIfOnline
> ---------------------------
>
> Key: KAFKA-14194
> URL: https://issues.apache.org/jira/browse/KAFKA-14194
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Andrew Dean
> Priority: Major
>
> When utilizing rack-aware Kafka consumers and the Kafka broker cluster is restarted, an NPE can occur during transient metadata updates.
[jira] [Updated] (KAFKA-14194) NPE in Cluster.nodeIfOnline
[ https://issues.apache.org/jira/browse/KAFKA-14194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Dean updated KAFKA-14194:
--------------------------------
Affects Version/s: 3.2.1

> NPE in Cluster.nodeIfOnline
> ---------------------------
>
> Key: KAFKA-14194
> URL: https://issues.apache.org/jira/browse/KAFKA-14194
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.2.1
> Reporter: Andrew Dean
> Priority: Major
>
> When utilizing rack-aware Kafka consumers and the Kafka broker cluster is restarted, an NPE can occur during transient metadata updates.
[jira] [Created] (KAFKA-14194) NPE in Cluster.nodeIfOnline
Andrew Dean created KAFKA-14194:
-----------------------------------

Summary: NPE in Cluster.nodeIfOnline
Key: KAFKA-14194
URL: https://issues.apache.org/jira/browse/KAFKA-14194
Project: Kafka
Issue Type: Bug
Reporter: Andrew Dean

When utilizing rack-aware Kafka consumers and the Kafka broker cluster is restarted, an NPE can occur during transient metadata updates.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```java
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
    }
}

/**
 * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
 */
@SuppressWarnings("unchecked")
private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
    visited.add(currentNode);
    if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
        ((StreamStreamJoinNode) currentNode).setSelfJoin();
        // Remove JoinOtherWindowed node
        final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
        GraphNode left = null, right = null;
        for (final GraphNode child: parent.children()) {
            if (child instanceof ProcessorGraphNode
                && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
                if (left == null) {
                    left = child;
                } else {
                    right = child;
                }
            }
        }
        // Sanity check
        if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
            parent.removeChild(right);
        }
    }
    for (final GraphNode child: currentNode.children()) {
        if (!visited.contains(child)) {
            rewriteSelfJoin(child, visited);
```

Review comment:
We can have n-way self-joins, but they won't get optimized; only the first one will get optimized. However, we can have topologies that have more than one self-join (not n-way, but independent pairs), and all of them can and should be optimized. I added a test case in `InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```java
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
    }
}

/**
 * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
 */
@SuppressWarnings("unchecked")
private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
    visited.add(currentNode);
    if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
        ((StreamStreamJoinNode) currentNode).setSelfJoin();
        // Remove JoinOtherWindowed node
        final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
        GraphNode left = null, right = null;
        for (final GraphNode child: parent.children()) {
            if (child instanceof ProcessorGraphNode
                && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
                if (left == null) {
                    left = child;
                } else {
                    right = child;
                }
            }
        }
        // Sanity check
        if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
            parent.removeChild(right);
        }
    }
    for (final GraphNode child: currentNode.children()) {
        if (!visited.contains(child)) {
            rewriteSelfJoin(child, visited);
```

Review comment:
We can have multiple joins, but they won't get optimized; only the first one will get optimized. However, we can have topologies that have more than one self-join (not n-way, but independent pairs), and all of them can and should be optimized. I added a test case in `InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`.
[GitHub] [kafka] cadonna commented on pull request #12519: KAFKA-10199: Handle exceptions from state updater
cadonna commented on PR #12519:
URL: https://github.com/apache/kafka/pull/12519#issuecomment-1234414581

I am sorry, one of my merged PRs has introduced conflicts here. 🙁
[GitHub] [kafka] mimaison commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)
mimaison commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1234403651

Yes, if the system tests pass, I'm in favor of merging this into 3.3.
[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…
[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599002#comment-17599002 ]

John Gray edited comment on KAFKA-14172 at 9/1/22 2:56 PM:
-----------------------------------------------------------

I know next to nothing about the internal workings of Kafka, sadly, but I am noticing that KAFKA-12486 was introduced in 3.1.0, which is the version where I started noticing problems. I see you helped out with that Jira, [~ableegoldman]; is there any possible way in your mind that it might cause weirdness with state restoration?

was (Author: gray.john):
I know next to nothing about the internal workings of Kafka, sadly, but I am noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I started noticing problems. I notice you helped out with that Jira, [~ableegoldman], is there any possible way in your mind that it might cause weirdness with state restoration?

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ---------------------------------------------------------------------
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.1
> Reporter: Martin Hørslev
> Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly-Once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
>
> The problem is reproducible, and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
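For readers unfamiliar with the settings discussed above, standby replicas and the acceptable recovery lag are ordinary Streams configs. A minimal illustrative sketch follows (the application id and bootstrap address are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyConfigExample {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Maintain one warm copy of each state store on another instance,
        // at the cost of extra disk (the trade-off mentioned above).
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

        // Allow an active task to be assigned to an instance whose copy of
        // the state is at most this many records behind (default 10000).
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10000L);
    }
}
```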
[GitHub] [kafka] cadonna commented on a diff in pull request #12583: KAFKA-10199: Separate state updater from old restore
cadonna commented on code in PR #12583:
URL: https://github.com/apache/kafka/pull/12583#discussion_r960754245

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:

```diff
@@ -280,25 +280,17 @@ public void shouldClassifyExistingTasksWithStateUpdater() {

     @Test
     public void shouldAddTasksToStateUpdater() {
-        final StreamTask task00 = statefulTask(taskId00, taskId00Partitions)
+        final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
             .withInputPartitions(taskId00Partitions)
-            .inState(State.RESTORING)
-            .build();
-        final StandbyTask task01 = standbyTask(taskId01, taskId01Partitions)
+            .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
             .withInputPartitions(taskId01Partitions)
-            .inState(State.RUNNING)
-            .build();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).anyTimes();
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+            .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTaskToInit()).thenReturn(mkSet(task00, task01));
+        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

-        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
-        taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
-        taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> { });
```

Review comment:
In this file, I did a lot of refactoring to use the utils for creating task mocks and for mocking the tasks registry. I had to touch those tests anyway, so I thought I would bring them up to date.
[GitHub] [kafka] cadonna commented on pull request #12583: KAFKA-10199: Separate state updater from old restore
cadonna commented on PR #12583:
URL: https://github.com/apache/kafka/pull/12583#issuecomment-1234388852

Call for review: @wcarlson5
[GitHub] [kafka] cadonna opened a new pull request, #12583: KAFKA-10199: Separate state updater from old restore
cadonna opened a new pull request, #12583:
URL: https://github.com/apache/kafka/pull/12583

Separates the code path for the new state updater from the code path of the old restoration. Ensures that, with the state updater, tasks are processed before all tasks are running.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…
[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599002#comment-17599002 ]

John Gray edited comment on KAFKA-14172 at 9/1/22 2:44 PM:
-----------------------------------------------------------

I know next to nothing about the internal workings of Kafka, sadly, but I am noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I started noticing problems. I notice you helped out with that Jira, [~ableegoldman], is there any possible way in your mind that it might cause weirdness with state restoration?

was (Author: gray.john):
I know next to nothing about the internal workings of Kafka, sadly, but I am noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I started noticing problems. I notice you helped out with that Jira, [~ableegoldman], is there any possible way in your mind it might cause weirdness with state restoration?

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ---------------------------------------------------------------------
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.1
> Reporter: Martin Hørslev
> Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly-Once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
>
> The problem is reproducible, and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…
[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956 ]

John Gray edited comment on KAFKA-14172 at 9/1/22 2:41 PM:
-----------------------------------------------------------

My stateful/EOS Kafka apps also seem to be struggling on 3.1.0+, with a similar theme: it appears the restore consumers are not consuming all of their messages for a full restore before processing begins. This sad situation seems to happen consistently after Strimzi rolls out an upgrade to our cluster. Once the brokers are all rolled, it seems to trigger a rebalance in our stateful apps, and then we lose data. We do not have the extra disk space for standby replicas, so the acceptable.recovery.lag and related bits to the standby replicas are not at play for us. But the restore consumers fumbling data w/ EOS seems to be a big problem for us as well.

was (Author: gray.john):
My stateful/EOS Kafka apps also seem to be struggling on 3.1.0+, with a similar theme: it appears the restore consumers are not consuming all of their messages for a full restore before processing begins. This sad situation seems to happen consistently after Strimzi rolls out an upgrade to our cluster. Once the brokers are all rolled, if our stateful apps rebalance, we lose data. We do not have the extra disk space for standby replicas, so the acceptable.recovery.lag and related bits to the standby replicas are not at play for us. But the restore consumers fumbling data w/ EOS seems to be a big problem for us as well.

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ---------------------------------------------------------------------
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.1
> Reporter: Martin Hørslev
> Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly-Once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
>
> The problem is reproducible, and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
[jira] [Commented] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…
[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599002#comment-17599002 ]

John Gray commented on KAFKA-14172:
-----------------------------------

I know next to nothing about the internal workings of Kafka, sadly, but I am noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I started noticing problems. I notice you helped out with that Jira, [~ableegoldman], is there any possible way in your mind it might cause weirdness with state restoration?

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ---------------------------------------------------------------------
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.1
> Reporter: Martin Hørslev
> Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly-Once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
>
> The problem is reproducible, and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960708055

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:

```java
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
    }
}

/**
 * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
 */
@SuppressWarnings("unchecked")
private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
    visited.add(currentNode);
    if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
        ((StreamStreamJoinNode) currentNode).setSelfJoin();
        // Remove JoinOtherWindowed node
        final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
        GraphNode left = null, right = null;
        for (final GraphNode child: parent.children()) {
            if (child instanceof ProcessorGraphNode
                && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
                if (left == null) {
                    left = child;
                } else {
                    right = child;
                }
            }
        }
        // Sanity check
        if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
            parent.removeChild(right);
        }
    }
    for (final GraphNode child: currentNode.children()) {
        if (!visited.contains(child)) {
            rewriteSelfJoin(child, visited);
        }
    }
}

/**
 * The self-join rewriting can be applied if:
 * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
 * 2. The StreamStreamJoinNode has a single parent.
 * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the
 *    StreamStreamJoinNode and have smaller build priority.
 */
```

Review comment:
This is the order in which the nodes in the graph are visited and added to the topology. It matters here because the join node can have siblings with smaller build priority, hence they will come before it in the topological order of the topology (applied before the join). The only acceptable such nodes are the JoinWindow nodes. If there are others, then it is not a self-join. This check covers cases like:
```
stream1 = builder.stream("topic1");
stream1.mapValues(v -> v);
stream2 = builder.stream("topic1"); // same topic
stream1.join(stream2)
```
[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…
[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956 ]

John Gray edited comment on KAFKA-14172 at 9/1/22 1:29 PM:
-----------------------------------------------------------

My stateful/EOS Kafka apps also seem to be struggling on 3.1.0+, with a similar theme: it appears the restore consumers are not consuming all of their messages for a full restore before processing begins. This sad situation seems to happen consistently after Strimzi rolls out an upgrade to our cluster. Once the brokers are all rolled, if our stateful apps rebalance, we lose data. We do not have the extra disk space for standby replicas, so the acceptable.recovery.lag and related bits to the standby replicas are not at play for us. But the restore consumers fumbling data w/ EOS seems to be a big problem for us as well.

was (Author: gray.john):
My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar theme: it appears the restore consumers are not consuming all of their messages for a full restore before processing begins. This sad situation seems to happen consistently after Strimzi rolls out an upgrade to our cluster. Once the brokers are all rolled, if our stateful apps rebalance, we lose data. We do not have the extra disk space for standby replicas, so the acceptable.recovery.lag and related bits to the standby replicas are not at play for us. But the restore consumers fumbling data w/ EOS seems to be a big problem for us as well.

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ---------------------------------------------------------------------
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.1
> Reporter: Martin Hørslev
> Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly-Once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
>
> The problem is reproducible, and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)
C0urante commented on PR #11783: URL: https://github.com/apache/kafka/pull/11783#issuecomment-1234282325 @showuon @mimaison @jsancio are we okay with adding this to the 3.3 branch? There's very little additional risk; the only changes to non-testing code are an additional branch [here](https://github.com/C0urante/kafka/blob/2f62b7469cc97a79f114413a71d9d18899022a61/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2407-L2411) and adding a default value for an `Optional` field [here](https://github.com/C0urante/kafka/blob/2f62b7469cc97a79f114413a71d9d18899022a61/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L169-L170). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960654203

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the KStreamJoinWindow node corresponding to the other stream.
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove the JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
No, there can be more than two nodes. That's why I loop over all the children and use `isStreamJoinWindowNode` to find the nodes that need to be removed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960647123

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the KStreamJoinWindow node corresponding to the other stream.
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove the JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the
+     *    StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {

Review Comment:
No Guava in Streams :P

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…
[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956 ]

John Gray edited comment on KAFKA-14172 at 9/1/22 1:12 PM:
---

My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar theme: it appears the restore consumers are not consuming all of their messages for a full restore before processing begins. This sad situation seems to happen consistently after Strimzi rolls out an upgrade to our cluster. Once the brokers are all rolled, if our stateful apps rebalance, we lose data. We do not have the extra disk space for standby replicas, so acceptable.recovery.lag and the related standby-replica settings are not at play for us. But the restore consumers fumbling data with EOS seems to be a big problem for us as well.

was (Author: gray.john):
My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar theme: it appears the restore consumers are not consuming all of their messages for a full restore before processing begins. This sad situation seems to happen consistently after Strimzi rolls out an upgrade to our cluster. Once the brokers are all rolled, if our stateful apps rebalance, we lose data. We do not have the extra disk space for standby replicas, so acceptable.recovery.lag and the related standby-replica settings are not at play for us. But the restore consumers fumbling data with EOS seems to be a big problem for us.

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14172
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14172
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.1
>            Reporter: Martin Hørslev
>            Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
> I have observed that state stores used in a transform step under exactly-once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
> The problem is reproducible and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…
[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956 ]

John Gray commented on KAFKA-14172:
---

My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar theme: it appears the restore consumers are not consuming all of their messages for a full restore before processing begins. This sad situation seems to happen consistently after Strimzi rolls out an upgrade to our cluster. Once the brokers are all rolled, if our stateful apps rebalance, we lose data. We do not have the extra disk space for standby replicas, so acceptable.recovery.lag and the related standby-replica settings are not at play for us. But the restore consumers fumbling data with EOS seems to be a big problem for us.

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14172
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14172
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.1
>            Reporter: Martin Hørslev
>            Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
> I have observed that state stores used in a transform step under exactly-once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
> The problem is reproducible and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vvcephei opened a new pull request, #12582: MINOR: Demystify rebalance schedule log
vvcephei opened a new pull request, #12582: URL: https://github.com/apache/kafka/pull/12582

People have a hard time understanding what's going on with probing rebalances, partly due to cryptic messages like this:
```
2022-08-31T12:03:29.197+09:00 . Requested to schedule probing rebalance for 1661915609131 ms.
2022-08-31T12:04:29.578+09:00 . Requested to schedule probing rebalance for 1661915669542 ms.
```
We should provide a little more context and also encode the timestamp as a readable date/time.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
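For illustration, a minimal sketch of the proposed improvement using plain `java.time`; the exact log wording the PR settles on may differ:

```java
import java.time.Instant;

public class ReadableRebalanceDeadline {
    public static void main(final String[] args) {
        // Epoch-millis value taken from the first log line above.
        final long nextRebalanceMs = 1661915609131L;
        // Prints the raw timestamp plus a readable UTC instant:
        // "Requested to schedule probing rebalance for 1661915609131 ms (2022-08-31T03:13:29.131Z)."
        System.out.printf(
                "Requested to schedule probing rebalance for %d ms (%s).%n",
                nextRebalanceMs,
                Instant.ofEpochMilli(nextRebalanceMs));
    }
}
```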
[GitHub] [kafka] cadonna merged pull request #12573: KAFKA-10199: Remove changelog unregister from state updater
cadonna merged PR #12573: URL: https://github.com/apache/kafka/pull/12573 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #12573: KAFKA-10199: Remove changelog unregister from state updater
cadonna commented on PR #12573: URL: https://github.com/apache/kafka/pull/12573#issuecomment-1234212740

Build failures are not related:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 17 and Scala 2.13 / kafka.api.SaslClientsWithInvalidCredentialsTest.testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure()
Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testCommitTransactionTimeout(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2]
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies[at_least_once]
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.KStreamRepartitionIntegrationTest.shouldGoThroughRebalancingCorrectly[Optimization = all]
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960474042

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(final Properties props) {
+        // Vicky: Do we need to verify props?
+        final List<String> optimizationConfigs;
+        if (props == null) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+        } else {
+            final StreamsConfig config = new StreamsConfig(props);
+            optimizationConfigs = config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG);
+        }
         mergeDuplicateSourceNodes();
-        if (optimizeTopology) {
-            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)

Review Comment:
A user can enable optimizations either by setting the config to `all` (i.e. `StreamsConfig.OPTIMIZE`), which applies every optimization rule, or by specifying a list of individual optimization rules. With your suggestion, an optimization would only be applied if `contains(OPTIMIZE)` is true, which is not correct. WDYT?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
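To make the two modes concrete, a minimal sketch using the released `StreamsConfig` constants; the rule name `"self.join"` below is a hypothetical placeholder for whatever name this PR introduces, used purely for illustration:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class OptimizationConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();

        // Mode 1: "all" (StreamsConfig.OPTIMIZE) enables every optimization rule.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

        // Mode 2: name individual rules instead of "all". The value is parsed
        // as a list, so several rules can be given comma-separated.
        // Reminder: "self.join" is a hypothetical rule name, not a released config value.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, "self.join");
    }
}
```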
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
Hey @guozhangwang! Cases 1 and 2 are optimizable and will be optimized by the algorithm; I have tests for these cases in `InternalStreamsBuilderTest`. Case 3 is not optimizable and won't be recognized. The reason is that processors like `mapValues`, `filter`, or `transform` are black boxes: there is no way to know how they change the contents of a stream, hence no way to tell whether the two sides of the join are still the same. Case 4 could be optimizable, but I did not consider it; I initially only had cases like 1 and 2 in scope. I can support it by adding a special rule to the rewriter: if the parent of the join is a merge node, then it is OK to have multiple source nodes as long as they are ancestors of the merge node.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
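A minimal sketch of the non-optimizable case 3 described above, where an intermediate `mapValues` makes the two sides of the join opaque to the optimizer. Illustrative only; the topic name and join window are assumptions:

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class NotASelfJoin {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("topic1");
        // mapValues is a black box: the optimizer cannot prove the mapped
        // stream still carries the same contents as the source, so the join
        // below must not be rewritten as a self-join.
        final KStream<String, String> mapped = left.mapValues(v -> v);
        left.join(mapped,
                (v1, v2) -> v1 + v2,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)));
        builder.build();
    }
}
```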
[GitHub] [kafka] yashmayya commented on a diff in pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration
yashmayya commented on code in PR #12478: URL: https://github.com/apache/kafka/pull/12478#discussion_r960439387

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##
@@ -199,112 +202,156 @@ private RetryWithToleranceOperator setupExecutor() {
 
     @Test
     public void testExecAndHandleRetriableErrorOnce() throws Exception {
-        execAndHandleRetriableError(1, 300, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 1, Collections.singletonList(300L), new RetriableException("Test"), true);
     }
 
     @Test
     public void testExecAndHandleRetriableErrorThrice() throws Exception {
-        execAndHandleRetriableError(3, 2100, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 3, Arrays.asList(300L, 600L, 1200L), new RetriableException("Test"), true);
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws Exception {
+        execAndHandleRetriableError(-1, 8, Arrays.asList(300L, 600L, 1200L, 2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true);
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Exception {
+        execAndHandleRetriableError(6000, 10, Arrays.asList(300L, 600L, 1200L, 2400L, 1500L), new RetriableException("Test"), false);
     }
 
     @Test
     public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
-        execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable Test"));
+        execAndHandleNonRetriableError(1, new Exception("Non Retriable Test"));
     }
 
     @Test
     public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
-        execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable Test"));
+        execAndHandleNonRetriableError(3, new Exception("Non Retriable Test"));
     }
 
-    public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+    @Test
+    public void testExitLatch() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
+        CountDownLatch exitLatch = EasyMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        EasyMock.expect(mockOperation.call()).andThrow(new RetriableException("test")).anyTimes();
+        EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(300);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(600);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(1200);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(2400);
+            retryWithToleranceOperator.triggerStop();

Review Comment:
I don't think this is ideal but I couldn't think of a better place to make this call.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
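The expected delays in these tests (300, 600, 1200, 2400, then 1500 ms against a 6000 ms retry timeout) imply exponential backoff truncated at the retry deadline. A minimal sketch of that schedule, with illustrative names only, not the actual `RetryWithToleranceOperator` internals:

```java
public class BackoffScheduleSketch {

    // nth retry delay: doubles from the initial delay, capped at the deadline.
    static long backoffDelayMs(final int attempt, final long initialDelayMs,
                               final long deadlineMs, final long nowMs) {
        final long exponential = initialDelayMs * (1L << (attempt - 1));
        return Math.min(exponential, Math.max(0L, deadlineMs - nowMs));
    }

    public static void main(final String[] args) {
        final long deadline = 6000L; // errors.retry.timeout = 6000 ms, starting at t = 0
        long now = 0L;
        for (int attempt = 1; attempt <= 5; attempt++) {
            final long delay = backoffDelayMs(attempt, 300L, deadline, now);
            System.out.printf("attempt %d -> sleep %d ms%n", attempt, delay);
            now += delay; // yields 300, 600, 1200, 2400, then 1500 (deadline cap)
        }
    }
}
```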
[GitHub] [kafka] yashmayya commented on a diff in pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration
yashmayya commented on code in PR #12478: URL: https://github.com/apache/kafka/pull/12478#discussion_r960436353 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ## @@ -175,9 +179,8 @@ protected V execAndRetry(Operation operation) throws Exception { log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass()); errorHandlingMetrics.recordFailure(); if (checkRetry(startTime)) { Review Comment: Fair enough, I do think that the relevant logic is already tested by the `testExecAndHandle...` tests (I've added an additional one to check the max retries exceeded case) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration
yashmayya commented on code in PR #12478: URL: https://github.com/apache/kafka/pull/12478#discussion_r960435913 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ## @@ -269,7 +289,7 @@ void backoff(int attempt, long deadline) { delay = deadline - time.milliseconds(); } log.debug("Sleeping for {} millis", delay); -time.sleep(delay); +return !exitLatch.await(delay, TimeUnit.MILLISECONDS); Review Comment: Ah, that's a very good point! I didn't account for this because I had (incorrectly) assumed that if a thread's interrupt status is set and a `CountDownLatch` has already been counted down to 0, a call to `CountDownLatch::await` would return immediately with `true` (i.e. the latch's count would be given "priority" over the thread's interrupt status). However, this isn't actually the case and the call to `CountDownLatch::await` will instead immediately throw an `InterruptedException` 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
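A minimal runnable sketch of the `CountDownLatch` behavior described above: `await` checks the calling thread's interrupt status before it checks the latch's count, so the `InterruptedException` wins even when the count is already zero.

```java
import java.util.concurrent.CountDownLatch;

public class LatchInterruptDemo {
    public static void main(final String[] args) {
        final CountDownLatch latch = new CountDownLatch(0); // already "counted down"
        Thread.currentThread().interrupt();                 // set the interrupt status
        try {
            latch.await(); // throws immediately despite count == 0
            System.out.println("not reached");
        } catch (final InterruptedException e) {
            System.out.println("InterruptedException takes precedence over a released latch");
        }
    }
}
```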
[GitHub] [kafka] jokeryin432 opened a new pull request, #12581: corporate.xml
jokeryin432 opened a new pull request, #12581: URL: https://github.com/apache/kafka/pull/12581

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #12574: KAFKA-13908 Rethrow ExecutionException to preserve original cause
nizhikov commented on PR #12574: URL: https://github.com/apache/kafka/pull/12574#issuecomment-1233908096

Hello @showuon. Can you please take a look at my changes?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers
[ https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598790#comment-17598790 ]

Sergey Ivanov commented on KAFKA-13077:
---

[~ivanyu], ohh, we continued investigating and found that the root cause was much simpler: there was an issue with ZooKeeper a day ago and all of the ZooKeeper data was cleaned up (by mistake), including the Kafka topic configurations. Of course that made Kafka go crazy. We found it by executing the ZK command
{code:java}
stat /brokers/topics/__consumer_offsets{code}
where we saw the znode had been created a day ago (while the Kafka cluster had been running for a few months); further investigation turned up the issue with the ZK storage cleanup. So, in our case the root cause is clear. As a solution we cleaned up the Kafka data too; in our case that wasn't critical.

> Replication failing after unclean shutdown of ZK and all brokers
> -----------------------------------------------------------------
>
>                 Key: KAFKA-13077
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13077
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.8.0
>            Reporter: Christopher Auston
>            Priority: Minor
>
> I am submitting this in the spirit of what can go wrong when an operator violates the constraints Kafka depends on. I don't know if Kafka could or should handle this more gracefully. I decided to file this issue because it was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). By "easy" I mean that I did not go out of my way to corrupt anything, I just was not careful when restarting ZK and brokers.
> I violated the constraints of keeping Zookeeper stable and at least one running in-sync replica.
> I am running the bitnami/kafka helm chart on Amazon EKS.
> {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image'
> "docker.io/bitnami/kafka:2.8.0-debian-10-r43"
> {quote}
> I started with 3 ZK instances and 3 brokers (both STS). I changed the cpu/memory requests on both STS and Kubernetes proceeded to restart ZK and Kafka instances at the same time. If I recall correctly there were some crashes and several restarts but eventually all the instances were running again. It's possible all ZK nodes and all brokers were unavailable at various points.
> The problem I noticed was that two of the brokers were just continually spitting out messages like:
> {quote}% kubectl logs kaf-kafka-0 --tail 10
> [2021-07-13 14:26:08,871] INFO [ProducerStateManager partition=__transaction_state-0] Loading producer state from snapshot file 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)' (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* (kafka.log.Log)
> [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, dir=/bitnami/kafka/data] Loading producer state till offset 2 with message format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [ProducerStateManager partition=__transaction_state-10] Loading producer state from snapshot file 'SnapshotFile(/bitnami/kafka/data/__transaction_state-10/0002.snapshot,2)' (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,877] WARN [Log partition=__transaction_state-10, dir=/bitnami/kafka/data] Non-monotonic update of high watermark from (offset=2930 segment=[0:1048717]) to (offset=2 segment=[0:338]) (kafka.log.Log)
> [2021-07-13 14:26:08,880] INFO [Log partition=__transaction_state-20, dir=/bitnami/kafka/data] Truncating to offset 1 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [Log partition=__transaction_state-20, dir=/bitnami/kafka/data] Loading producer state till offset 1 with message format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [ProducerStateManager partition=__transaction_state-20] Loading producer state from snapshot file 'SnapshotFile(/bitnami/kafka/data/__transaction_state-20/0001.snapshot,1)' (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,883] WARN [Log partition=__transaction_state-20, dir=/bitnami/kafka/data] Non-monotonic update of high watermark from (offset=2956 segment=[0:1048608]) to (offset=1 segment=[0:169]) (kafka.log.Log)
> {quote}
> If I describe that topic I can see that several partitions have a leader of 2 and the ISR is just 2 (NOTE I added two more brokers and tried to reassign the topic onto brokers 2,3,4 which you can see below). The new brokers also spit out the messages about "non-monotonic update