[GitHub] [kafka] ableegoldman opened a new pull request, #12585: HOTFIX: fix PriorityQueue iteration to assign warmups in priority order

2022-09-01 Thread GitBox


ableegoldman opened a new pull request, #12585:
URL: https://github.com/apache/kafka/pull/12585

   Based on a patch submitted to the `confluentinc` fork and then abandoned. It needed some updates and minor expansion, but this PR more or less just re-applies the changes proposed in https://github.com/confluentinc/kafka/pull/697.
   
   The original PR has a _very_ detailed justification for these changes, but the tl;dr is that the PriorityQueue's iterator [does not actually guarantee](https://docs.oracle.com/javase/8/docs/api/java/util/PriorityQueue.html#iterator--) to return elements in priority order... 😒
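   
   For anyone unfamiliar with the pitfall, here is a minimal standalone sketch (not taken from the PR) contrasting iteration with draining via `poll()`:
   
   ```java
   import java.util.PriorityQueue;
   
   public class PriorityQueueIterationDemo {
       public static void main(final String[] args) {
           final PriorityQueue<Integer> queue = new PriorityQueue<>();
           queue.add(5);
           queue.add(1);
           queue.add(3);
   
           // The iterator walks the backing heap array, so the order is
           // unspecified -- it may well print 1, 5, 3.
           for (final int value : queue) {
               System.out.println("iterator: " + value);
           }
   
           // Draining with poll() always yields priority (natural) order: 1, 3, 5.
           while (!queue.isEmpty()) {
               System.out.println("poll: " + queue.poll());
           }
       }
   }
   ```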


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14196) Flaky OffsetValidationTest seems to indicate potential duplication issue during rebalance

2022-09-01 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14196:
--

 Summary: Flaky OffsetValidationTest seems to indicate potential 
duplication issue during rebalance
 Key: KAFKA-14196
 URL: https://issues.apache.org/jira/browse/KAFKA-14196
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.2.1
Reporter: Philip Nee


Several flaky tests under OffsetValidationTest indicate a potential consumer 
duplication issue when auto-commit is enabled.  The failure message looks like 
this:

 
{code:java}
Total consumed records 3366 did not match consumed position 3331 {code}
 

After investigating the log, I discovered that for those failing tests the data 
consumed between the start of a rebalance event and the async commit was lost.  
In the example below, the rebalance event kicks in at around 1662054846995 
(first record), and the async commit of offset 3739 completes at around 
1662054847015 (right before partitions_revoked).

 
{code:java}
{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
{"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
 {code}
A few things to note here:
 # This is highly flaky; I found roughly 1 in 4 runs fail the tests.
 # Manually calling commitSync in the onPartitionsRevoked callback seems to alleviate the issue (a sketch of this follows below).
 # Setting includeMetadataInTimeout to false also seems to alleviate the issue.
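
A minimal sketch of mitigation #2, assuming a plain KafkaConsumer with auto-commit enabled (the consumer config and topic name are illustrative, not taken from the system test):
{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "offset-validation-demo");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test_topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Flush the current position synchronously before giving up the
                // partitions, so an in-flight async auto-commit cannot be lost
                // across the rebalance.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                // process the record
            });
        }
    }
}
{code}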

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mghosh4 commented on pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

2022-09-01 Thread GitBox


mghosh4 commented on PR #10910:
URL: https://github.com/apache/kafka/pull/10910#issuecomment-1234907832

   Any update on when this diff can land? In case you need some help, I am happy to help; this is a problem that we have had to deal with too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee closed pull request #12580: MINOR: Fix flaky OffsetValidationTest

2022-09-01 Thread GitBox


philipnee closed pull request #12580: MINOR: Fix flaky OffsetValidationTest
URL: https://github.com/apache/kafka/pull/12580


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #12578: KAFKA-14195: Fix KRaft AlterConfig policy usage for Legacy/Full case

2022-09-01 Thread GitBox


cmccabe merged PR #12578:
URL: https://github.com/apache/kafka/pull/12578


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #12582: MINOR: Demystify rebalance schedule log

2022-09-01 Thread GitBox


vvcephei merged PR #12582:
URL: https://github.com/apache/kafka/pull/12582


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #12582: MINOR: Demystify rebalance schedule log

2022-09-01 Thread GitBox


vvcephei commented on PR #12582:
URL: https://github.com/apache/kafka/pull/12582#issuecomment-1234803264

   The java 17 build failed with this unrelated exception:
   
   ```
   10:00:43  Execution failed for task ':core:unitTest'.
   10:00:43  > Process 'Gradle Test Executor 50' finished with non-zero exit value 1
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #12583: KAFKA-10199: Separate state updater from old restore

2022-09-01 Thread GitBox


wcarlson5 commented on code in PR #12583:
URL: https://github.com/apache/kafka/pull/12583#discussion_r960982217


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -396,8 +397,7 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
 new Tasks(new LogContext(logPrefix)),
 topologyMetadata,
 adminClient,
-stateDirectory,
-maybeCreateAndStartStateUpdater(config, changelogReader, time)
+stateDirectory, maybeCreateAndStartStateUpdater(config, 
changelogReader, time)

Review Comment:
   nit: new line



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -867,37 +868,47 @@ void runOnce() {
 }
 
 private void initializeAndRestorePhase() {
-// only try to initialize the assigned tasks
-// if the state is still in PARTITION_ASSIGNED after the poll call
+final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = partitions -> resetOffsets(partitions, null);
 final State stateSnapshot = state;
-if (stateSnapshot == State.PARTITIONS_ASSIGNED
-|| stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
+if (stateUpdaterEnabled) {
+checkStateUpdater();
+} else {
+// only try to initialize the assigned tasks
+// if the state is still in PARTITION_ASSIGNED after the poll call
+if (stateSnapshot == State.PARTITIONS_ASSIGNED
+|| stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
 
-log.debug("State is {}; initializing tasks if necessary", 
stateSnapshot);
+log.debug("State is {}; initializing tasks if necessary", 
stateSnapshot);
 
-// transit to restore active is idempotent so we can call it 
multiple times
-changelogReader.enforceRestoreActive();
+if (taskManager.tryToCompleteRestoration(now, offsetResetter)) 
{
+log.info("Restoration took {} ms for all tasks {}", 
time.milliseconds() - lastPartitionAssignedMs,
+taskManager.allTasks().keySet());
+setState(State.RUNNING);
+}
 
-if (taskManager.tryToCompleteRestoration(now, partitions -> 
resetOffsets(partitions, null))) {
-changelogReader.transitToUpdateStandby();
-log.info("Restoration took {} ms for all tasks {}", 
time.milliseconds() - lastPartitionAssignedMs,
-taskManager.allTasks().keySet());
-setState(State.RUNNING);
+if (log.isDebugEnabled()) {
+log.debug("Initialization call done. State is {}", state);
+}
 }
 
 if (log.isDebugEnabled()) {
-log.debug("Initialization call done. State is {}", state);
+log.debug("Idempotently invoking restoration logic in state 
{}", state);

Review Comment:
   `Idempotently` applies only to the state here. I think this log could be confusing if you read it as saying the restore logic is idempotent, which I don't think is correct.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -770,7 +771,7 @@ void runOnce() {
 long totalCommitLatency = 0L;
 long totalProcessLatency = 0L;
 long totalPunctuateLatency = 0L;
-if (state == State.RUNNING) {
+if (state == State.RUNNING || stateUpdaterEnabled) {

Review Comment:
   This seems risky. Can you explain in which states this flag could be true?
   
   I am worried about the CREATED and ERROR states, mostly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961032133


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Discussed offline with @guozhangwang .
   
   If the `map` or other transformation is a sibling of the join node but the join node has a single parent, then the transformation is a no-op and hence the optimization can be applied. For the transformation not to be a no-op, the join node must have two or more parents, in which case the optimization won't be applicable.
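   
   As a hedged illustration (topic name, window size, and class name are invented here, not taken from the PR or its tests), a topology along these lines should still qualify as a self-join, because the `mapValues()` result is never reassigned or consumed:
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.kstream.JoinWindows;
   import org.apache.kafka.streams.kstream.KStream;
   
   public class SelfJoinWithNoOpSibling {
       public static Topology build() {
           final StreamsBuilder builder = new StreamsBuilder();
   
           final KStream<String, String> stream1 = builder.stream("topic1");
           stream1.mapValues(v -> v); // dangling sibling of the join; its result is never used
           final KStream<String, String> stream2 = builder.stream("topic1"); // same topic
   
           // Both sides read the same (merged) source, so the join node has a single
           // parent and the rewrite can drop the redundant KStreamJoinWindow node.
           stream1.join(stream2,
                        (v1, v2) -> v1 + v2,
                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
   
           return builder.build();
       }
   }
   ```
   
   Whether the rewrite actually fires of course still depends on the self-join optimization being enabled when the topology is built.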



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961012010


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   The logical plan will have only one source, since the two sources refer to the same topic and will get merged. Hence, all the nodes will be children of the source node and siblings of the join node.
   For example,
   Case 2:
   ```
   root -> source
   source -> join-window1, join-window2, stream-join
   ```
   
   and
   Case 3:
   ```
   root -> source
   source -> join-window1, map-values1, join-window2, stream-join
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961002924


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.

Review Comment:
   Just curious, in the above case, logically speaking since we are not 
reassigning the map-valued result stream back to stream1 like `stream1 = 
stream1.mapValues()`, this step is basically a no-op and should not affect 
stream1 right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960983275


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Hmm.. for case 2), the logical plan would be like:
   
   ```
   root -> source1
   root -> source2
   source1 -> join-window1
   source2 -> join-window2
   source1,source2 -> stream-join
   ```
   
   Is that right?
   
   For case 3), since we are not re-assigning the result of `mapValues` to `stream1`, it should just be a no-op, right? The logical plan would be like:
   
   ```
   root -> source1
   root -> source2
   source1 -> join-window1
   source1 -> map-values1
   source2 -> join-window2
   source1,source2 -> stream-join
   ```
   
   Should that be considered optimizable as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14195) Fix KRaft AlterConfig policy usage for Legacy/Full case

2022-09-01 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14195:
-

 Summary: Fix KRaft AlterConfig policy usage for Legacy/Full case
 Key: KAFKA-14195
 URL: https://issues.apache.org/jira/browse/KAFKA-14195
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3
Reporter: Ron Dagostino
Assignee: Ron Dagostino


The fix for https://issues.apache.org/jira/browse/KAFKA-14039 adjusted the 
invocation of the alter configs policy check in KRaft to match the behavior in 
ZooKeeper, which is to only provide the configs that were explicitly sent in 
the request. While the code was correct for the incremental alter configs case, 
the code actually included the implicit deletions for the 
legacy/non-incremental alter configs case, and those implicit deletions are not 
included in the ZooKeeper-based invocation. The implicit deletions should not 
be passed in the legacy case.
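
To make the compatibility contract concrete, here is a hedged sketch of a custom policy (the class name and the particular config key are illustrative, not part of this issue). The point is that requestMetadata.configs() should contain only the entries explicitly sent in the request, so the implicit deletions generated by the legacy API must not show up in it:
{code:java}
import java.util.Map;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.AlterConfigPolicy;

public class ExplicitConfigsOnlyPolicy implements AlterConfigPolicy {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void validate(AlterConfigPolicy.RequestMetadata requestMetadata) throws PolicyViolationException {
        // Only the configs explicitly named in the request should appear here;
        // keys the legacy AlterConfigs API implicitly deletes must not be included.
        if (requestMetadata.resource().type() == ConfigResource.Type.TOPIC
                && requestMetadata.configs().containsKey("min.insync.replicas")) {
            throw new PolicyViolationException("min.insync.replicas may not be overridden");
        }
    }

    @Override
    public void close() {
    }
}
{code}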



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino commented on a diff in pull request #12578: KAFKA-14039: Additional compatibility fix for KRaft Alter Configs

2022-09-01 Thread GitBox


rondagostino commented on code in PR #12578:
URL: https://github.com/apache/kafka/pull/12578#discussion_r960958343


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -248,33 +249,45 @@ private void 
incrementalAlterConfigResource(ConfigResource configResource,
 }
 
 private ApiError validateAlterConfig(ConfigResource configResource,
- List newRecords,
+ List 
recordsExplicitlyAltered,
+ List 
recordsImplicitlyDeleted,
  boolean newlyCreatedResource) {
 Map allConfigs = new HashMap<>();
-Map alteredConfigs = new HashMap<>();
+Map alteredConfigsForAlterConfigPolicyCheck = new 
HashMap<>();
 TimelineHashMap existingConfigs = 
configData.get(configResource);
 if (existingConfigs != null) allConfigs.putAll(existingConfigs);
-for (ApiMessageAndVersion newRecord : newRecords) {
+for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
 ConfigRecord configRecord = (ConfigRecord) newRecord.message();
 if (configRecord.value() == null) {
 allConfigs.remove(configRecord.name());
 } else {
 allConfigs.put(configRecord.name(), configRecord.value());
 }
-alteredConfigs.put(configRecord.name(), configRecord.value());
+alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), 
configRecord.value());
+}
+for (ApiMessageAndVersion recordImplicitlyDeleted : 
recordsImplicitlyDeleted) {
+ConfigRecord configRecord = (ConfigRecord) 
recordImplicitlyDeleted.message();
+allConfigs.remove(configRecord.name());
+// As per KAFKA-14039, do not include implicit deletions caused by 
using the legacy AlterConfigs API
+// in the list passed to the policy in order to maintain backwards 
compatibility

Review Comment:
   The order a few lines above is `allConfigs.remove(configRecord.name());` 
followed by `alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), 
configRecord.value());`.  If we mirror that here then the comment appears in 
the place where `alteredConfigsForAlterConfigPolicyCheck.put()` would occur.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960956896


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {
+for (final GraphNode sibling : parent.children()) {
+if (sibling instanceof ProcessorGraphNode) {
+if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+continue;
+}
+}
+if (sibling != streamJoinNode
+&& sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+return false;
+}
+}
+}
+return true;
+}
+
+private void countSourceNodes(
+final AtomicInteger count,
+final GraphNode currentNode,
+final Set visited) {
+
+if (currentNode instanceof StreamSourceNode) {
+count.incrementAndGet();
+}
+
+for (final GraphNode parent: currentNode.parentNodes()) {
+if (!visited.contains(parent)) {

Review Comment:
   Great catch, thank you! I am not sure about the latter, so I implemented equals and hashCode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #12578: KAFKA-14039: Additional compatibility fix for KRaft Alter Configs

2022-09-01 Thread GitBox


jsancio commented on code in PR #12578:
URL: https://github.com/apache/kafka/pull/12578#discussion_r960954024


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -248,33 +249,45 @@ private void 
incrementalAlterConfigResource(ConfigResource configResource,
 }
 
 private ApiError validateAlterConfig(ConfigResource configResource,
- List newRecords,
+ List 
recordsExplicitlyAltered,
+ List 
recordsImplicitlyDeleted,
  boolean newlyCreatedResource) {
 Map allConfigs = new HashMap<>();
-Map alteredConfigs = new HashMap<>();
+Map alteredConfigsForAlterConfigPolicyCheck = new 
HashMap<>();
 TimelineHashMap existingConfigs = 
configData.get(configResource);
 if (existingConfigs != null) allConfigs.putAll(existingConfigs);
-for (ApiMessageAndVersion newRecord : newRecords) {
+for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
 ConfigRecord configRecord = (ConfigRecord) newRecord.message();
 if (configRecord.value() == null) {
 allConfigs.remove(configRecord.name());
 } else {
 allConfigs.put(configRecord.name(), configRecord.value());
 }
-alteredConfigs.put(configRecord.name(), configRecord.value());
+alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), 
configRecord.value());
+}
+for (ApiMessageAndVersion recordImplicitlyDeleted : 
recordsImplicitlyDeleted) {
+ConfigRecord configRecord = (ConfigRecord) 
recordImplicitlyDeleted.message();
+allConfigs.remove(configRecord.name());
+// As per KAFKA-14039, do not include implicit deletions caused by 
using the legacy AlterConfigs API
+// in the list passed to the policy in order to maintain backwards 
compatibility

Review Comment:
   Should this documentation come before the lines 269-270?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-09-01 Thread GitBox


ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1234551652

   Thank you @jsancio for the review. I have made the changes.
   
   For the following comment -
   >It looks like this method is only used by tests. I think it is okay for this to have an "unknown reason" for now.
   
   Currently the function in question does no logging, hence I haven't added the `UnkownReason` in. Should I update the function to log it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-09-01 Thread GitBox


ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r960902453


##
core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala:
##
@@ -31,5 +32,5 @@ trait MetadataSnapshotter {
*
* @return  True if we will write out a new snapshot; 
false otherwise.
*/
-  def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): 
Boolean
+  def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, 
reason: Set[SnapshotReason]): Boolean

Review Comment:
   Added the `reason` as a `param` in the documentation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-09-01 Thread GitBox


ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r960901993


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -59,11 +60,11 @@ class BrokerMetadataSnapshotter(
   val writer = writerBuilder.build(
 image.highestOffsetAndEpoch().offset,
 image.highestOffsetAndEpoch().epoch,
-lastContainedLogTime
-  )
+lastContainedLogTime)

Review Comment:
   My bad, must have done this by mistake. Fixed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-09-01 Thread GitBox


ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r960901730


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -119,16 +120,29 @@ class BrokerMetadataListener(
   }
 
   _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-  if (shouldSnapshot()) {
-maybeStartSnapshot()
+  
+  val shouldTakeSnapshot: Set[SnapshotReason] = shouldSnapshot()
+  if (shouldTakeSnapshot.nonEmpty) {
+maybeStartSnapshot(shouldTakeSnapshot)
   }
 
   _publisher.foreach(publish)
 }
   }
 
-  private def shouldSnapshot(): Boolean = {
-(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || 
metadataVersionChanged()
+  private def shouldSnapshot(): Set[SnapshotReason] = {
+val metadataVersionHasChanged: Boolean = metadataVersionChanged()
+val maxBytesHaveExceeded: Boolean = (_bytesSinceLastSnapshot >= 
maxBytesBetweenSnapshots)

Review Comment:
   Removed the type declaration



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ndrwdn opened a new pull request, #12584: KAFKA-14194 Fix NPE in Cluster.nodeIfOnline

2022-09-01 Thread GitBox


ndrwdn opened a new pull request, #12584:
URL: https://github.com/apache/kafka/pull/12584

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14194) NPE in Cluster.nodeIfOnline

2022-09-01 Thread Andrew Dean (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Dean updated KAFKA-14194:

Component/s: clients

> NPE in Cluster.nodeIfOnline
> ---
>
> Key: KAFKA-14194
> URL: https://issues.apache.org/jira/browse/KAFKA-14194
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Andrew Dean
>Priority: Major
>
> When utilizing rack-aware Kafka consumers and the Kafka broker cluster is 
> restarted an NPE can occur during transient metadata updates.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14194) NPE in Cluster.nodeIfOnline

2022-09-01 Thread Andrew Dean (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Dean updated KAFKA-14194:

Affects Version/s: 3.2.1

> NPE in Cluster.nodeIfOnline
> ---
>
> Key: KAFKA-14194
> URL: https://issues.apache.org/jira/browse/KAFKA-14194
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.2.1
>Reporter: Andrew Dean
>Priority: Major
>
> When utilizing rack-aware Kafka consumers and the Kafka broker cluster is 
> restarted an NPE can occur during transient metadata updates.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14194) NPE in Cluster.nodeIfOnline

2022-09-01 Thread Andrew Dean (Jira)
Andrew Dean created KAFKA-14194:
---

 Summary: NPE in Cluster.nodeIfOnline
 Key: KAFKA-14194
 URL: https://issues.apache.org/jira/browse/KAFKA-14194
 Project: Kafka
  Issue Type: Bug
Reporter: Andrew Dean


When utilizing rack-aware Kafka consumers and the Kafka broker cluster is 
restarted an NPE can occur during transient metadata updates.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);

Review Comment:
   We can have n-way self-joins, but they won't get optimized; only the first one will. However, we can have topologies with more than one self-join, not n-way but independent pairs, and all of them can and should be optimized. I added a test case in `InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin` (a sketch of such a topology follows below).
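   
   A hedged sketch (topics, window size, and class name invented for illustration) of a topology with two such independent self-joins, both of which should get rewritten:
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.kstream.JoinWindows;
   import org.apache.kafka.streams.kstream.KStream;
   
   public class TwoIndependentSelfJoins {
       public static Topology build() {
           final StreamsBuilder builder = new StreamsBuilder();
           final JoinWindows windows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1));
   
           // First self-join: both sides read topicA.
           final KStream<String, String> a1 = builder.stream("topicA");
           final KStream<String, String> a2 = builder.stream("topicA");
           a1.join(a2, (v1, v2) -> v1 + v2, windows);
   
           // Second, independent self-join: both sides read topicB.
           final KStream<String, String> b1 = builder.stream("topicB");
           final KStream<String, String> b2 = builder.stream("topicB");
           b1.join(b2, (v1, v2) -> v1 + v2, windows);
   
           return builder.build();
       }
   }
   ```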



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);

Review Comment:
   We can have multiple joins, but they won't get optimized; only the first one will. However, we can have topologies with more than one self-join, not n-way but independent pairs, and all of them can and should be optimized. I added a test case in `InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12519: KAFKA-10199: Handle exceptions from state updater

2022-09-01 Thread GitBox


cadonna commented on PR #12519:
URL: https://github.com/apache/kafka/pull/12519#issuecomment-1234414581

   I am sorry, one of my merged PRs has introduced conflicts here. 🙁 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)

2022-09-01 Thread GitBox


mimaison commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1234403651

   Yes if the system tests pass, I'm in favor of merging this in 3.3. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2022-09-01 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599002#comment-17599002
 ] 

John Gray edited comment on KAFKA-14172 at 9/1/22 2:56 PM:
---

I know next to nothing about the internal workings of Kafka, sadly, but I am 
noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I 
started noticing problems. I see you helped out with that Jira, [~ableegoldman] 
, is there any possible way in your mind that it might cause weirdness with 
state restoration? 


was (Author: gray.john):
I know next to nothing about the internal workings of Kafka, sadly, but I am 
noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I 
started noticing problems. I notice you helped out with that Jira, 
[~ableegoldman] , is there any possible way in your mind that it might cause 
weirdness with state restoration? 

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under a Exactly 
> Once semantics ends up losing state after a rebalancing event that includes 
> reassignment of tasks to previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproduceable and an integration test have been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12583: KAFKA-10199: Separate state updater from old restore

2022-09-01 Thread GitBox


cadonna commented on code in PR #12583:
URL: https://github.com/apache/kafka/pull/12583#discussion_r960754245


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -280,25 +280,17 @@ public void shouldClassifyExistingTasksWithStateUpdater() 
{
 
 @Test
 public void shouldAddTasksToStateUpdater() {
-final StreamTask task00 = statefulTask(taskId00, taskId00Partitions)
+final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
 .withInputPartitions(taskId00Partitions)
-.inState(State.RESTORING)
-.build();
-final StandbyTask task01 = standbyTask(taskId01, taskId01Partitions)
+.inState(State.RESTORING).build();
+final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
 .withInputPartitions(taskId01Partitions)
-.inState(State.RUNNING)
-.build();
-
expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).anyTimes();
-expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-consumer.resume(anyObject());
-expectLastCall().anyTimes();
-expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
-replay(activeTaskCreator, standbyTaskCreator, consumer, 
changeLogReader);
+.inState(State.RUNNING).build();
+final TasksRegistry tasks = mock(TasksRegistry.class);
+when(tasks.drainPendingTaskToInit()).thenReturn(mkSet(task00, task01));
+taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
-taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
-taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
-taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter 
-> { });
+taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> { 
});
 

Review Comment:
   In this file, I did a lot of refactoring to use the utils to create task mocks and to mock the tasks registry. I had to touch those tests anyway, so I thought I would bring them up to date.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12583: KAFKA-10199: Separate state updater from old restore

2022-09-01 Thread GitBox


cadonna commented on PR #12583:
URL: https://github.com/apache/kafka/pull/12583#issuecomment-1234388852

   Call for review: @wcarlson5 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #12583: KAFKA-10199: Separate state updater from old restore

2022-09-01 Thread GitBox


cadonna opened a new pull request, #12583:
URL: https://github.com/apache/kafka/pull/12583

   Separates the code path for the new state updater from
   the code path of the old restoration.
   
   Ensures that, with the state updater, tasks are processed
   before all tasks are running.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2022-09-01 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599002#comment-17599002
 ] 

John Gray edited comment on KAFKA-14172 at 9/1/22 2:44 PM:
---

I know next to nothing about the internal workings of Kafka, sadly, but I am 
noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I 
started noticing problems. I notice you helped out with that Jira, 
[~ableegoldman] , is there any possible way in your mind that it might cause 
weirdness with state restoration? 


was (Author: gray.john):
I know next to nothing about the internal workings of Kafka, sadly, but I am 
noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I 
started noticing problems. I notice you helped out with that Jira, 
[~ableegoldman] , is there any possible way in your mind it might cause 
weirdness with state restoration? 

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under a Exactly 
> Once semantics ends up losing state after a rebalancing event that includes 
> reassignment of tasks to previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproduceable and an integration test have been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2022-09-01 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956
 ] 

John Gray edited comment on KAFKA-14172 at 9/1/22 2:41 PM:
---

My stateful/EOS Kafka apps also seem to be struggling on 3.1.0+, with a similar 
theme: it appears the restore consumers are not consuming all of their messages 
for a full restore before processing begins. This sad situation seems to happen 
consistently after Strimzi rolls out an upgrade to our cluster. Once the 
brokers are all rolled, it seems to trigger a rebalance in our stateful apps, 
and then we lose data. We do not have the extra disk space for standby 
replicas, so the acceptable.recovery.lag and related bits to the standby 
replicas are not at play for us. But the restore consumers fumbling data w/ EOS 
seems to be a big problem for us as well. 


was (Author: gray.john):
My stateful/EOS Kafka apps also seem to be struggling on 3.1.0+, with a similar 
theme: it appears the restore consumers are not consuming all of their messages 
for a full restore before processing begins. This sad situation seems to happen 
consistently after Strimzi rolls out an upgrade to our cluster. Once the 
brokers are all rolled, if our stateful apps rebalance, we lose data. We do not 
have the extra disk space for standby replicas, so the acceptable.recovery.lag 
and related bits to the standby replicas are not at play for us. But the 
restore consumers fumbling data w/ EOS seems to be a big problem for us as 
well. 

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under a Exactly 
> Once semantics ends up losing state after a rebalancing event that includes 
> reassignment of tasks to previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproduceable and an integration test have been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2022-09-01 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599002#comment-17599002
 ] 

John Gray commented on KAFKA-14172:
---

I know next to nothing about the internal workings of Kafka, sadly, but I am 
noticing that KAFKA-12486 was introduced in 3.1.0, which is the version I 
started noticing problems. I notice you helped out with that Jira, 
[~ableegoldman] , is there any possible way in your mind it might cause 
weirdness with state restoration? 

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under a Exactly 
> Once semantics ends up losing state after a rebalancing event that includes 
> reassignment of tasks to previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproduceable and an integration test have been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960708055


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.

Review Comment:
   This is the order in which the nodes in the graph are visited and added to the topology. It matters here because the join node can have siblings with smaller build priority, which will therefore come before it in the topological order (i.e. be applied before the join). The only acceptable such nodes are the JoinWindow nodes. If there are others, then it is not a self-join.
   
   This check covers cases like:
   ```
   stream1 = builder.stream("topic1");
   stream1.mapValues(v -> v);
   stream2 = builder.stream("topic1"); // same topic
   stream1.join(stream2)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2022-09-01 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956
 ] 

John Gray edited comment on KAFKA-14172 at 9/1/22 1:29 PM:
---

My stateful/EOS Kafka apps also seem to be struggling on 3.1.0+, with a similar 
theme: it appears the restore consumers are not consuming all of their messages 
for a full restore before processing begins. This sad situation seems to happen 
consistently after Strimzi rolls out an upgrade to our cluster. Once the 
brokers are all rolled, if our stateful apps rebalance, we lose data. We do not 
have the extra disk space for standby replicas, so the acceptable.recovery.lag 
and related bits to the standby replicas are not at play for us. But the 
restore consumers fumbling data w/ EOS seems to be a big problem for us as 
well. 


was (Author: gray.john):
My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar 
theme: it appears the restore consumers are not consuming all of their messages 
for a full restore before processing begins. This sad situation seems to happen 
consistently after Strimzi rolls out an upgrade to our cluster. Once the 
brokers are all rolled, if our stateful apps rebalance, we lose data. We do not 
have the extra disk space for standby replicas, so the acceptable.recovery.lag 
and related bits to the standby replicas are not at play for us. But the 
restore consumers fumbling data w/ EOS seems to be a big problem for us as 
well. 

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly 
> Once semantics end up losing state after a rebalancing event that includes 
> reassignment of tasks to a previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproducible and an integration test has been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)

2022-09-01 Thread GitBox


C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1234282325

   @showuon @mimaison @jsancio are we okay with adding this to the 3.3 branch? 
There's very little additional risk; the only changes to non-testing code are 
an additional branch 
[here](https://github.com/C0urante/kafka/blob/2f62b7469cc97a79f114413a71d9d18899022a61/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2407-L2411)
 and adding a default value for an `Optional` field 
[here](https://github.com/C0urante/kafka/blob/2f62b7469cc97a79f114413a71d9d18899022a61/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L169-L170).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960654203


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {

Review Comment:
   No, there can be more than two nodes. That's why I iterate over all children 
with a for loop and use `isStreamJoinWindowNode` to find the nodes that need to 
be removed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960647123


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {

Review Comment:
   No Guava in Streams :P



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2022-09-01 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956
 ] 

John Gray edited comment on KAFKA-14172 at 9/1/22 1:12 PM:
---

My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar 
theme: it appears the restore consumers are not consuming all of their messages 
for a full restore before processing begins. This sad situation seems to happen 
consistently after Strimzi rolls out an upgrade to our cluster. Once the 
brokers are all rolled, if our stateful apps rebalance, we lose data. We do not 
have the extra disk space for standby replicas, so the acceptable.recovery.lag 
and related bits to the standby replicas are not at play for us. But the 
restore consumers fumbling data w/ EOS seems to be a big problem for us as 
well. 


was (Author: gray.john):
My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar 
theme: it appears the restore consumers are not consuming all of their messages 
for a full restore before processing begins. This sad situation seems to happen 
consistently after Strimzi rolls out an upgrade to our cluster. Once the 
brokers are all rolled, if our stateful apps rebalance, we lose data. We do not 
have the extra disk space for standby replicas, so the acceptable.recovery.lag 
and related bits to the standby replicas are not at play for us. But the 
restore consumers fumbling data w/ EOS seems to be a big problem for us. 

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly 
> Once semantics end up losing state after a rebalancing event that includes 
> reassignment of tasks to a previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproducible and an integration test has been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14172) bug: State stores lose state when tasks are reassigned under EOS wit…

2022-09-01 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598956#comment-17598956
 ] 

John Gray commented on KAFKA-14172:
---

My stateful/EOS Kafka apps also seem to be struggling on 3.0.0+, with a similar 
theme: it appears the restore consumers are not consuming all of their messages 
for a full restore before processing begins. This sad situation seems to happen 
consistently after Strimzi rolls out an upgrade to our cluster. Once the 
brokers are all rolled, if our stateful apps rebalance, we lose data. We do not 
have the extra disk space for standby replicas, so the acceptable.recovery.lag 
and related bits to the standby replicas are not at play for us. But the 
restore consumers fumbling data w/ EOS seems to be a big problem for us. 

> bug: State stores lose state when tasks are reassigned under EOS wit…
> -
>
> Key: KAFKA-14172
> URL: https://issues.apache.org/jira/browse/KAFKA-14172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Martin Hørslev
>Priority: Major
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under Exactly 
> Once semantics end up losing state after a rebalancing event that includes 
> reassignment of tasks to a previous standby task within the acceptable standby 
> lag.
>  
> The problem is reproducible and an integration test has been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540]. 
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported to StackOverflow for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vvcephei opened a new pull request, #12582: MINOR: Demystify rebalance schedule log

2022-09-01 Thread GitBox


vvcephei opened a new pull request, #12582:
URL: https://github.com/apache/kafka/pull/12582

   People have a hard time understanding what's going on with probing 
rebalances, partly due to cryptic messages like this:
   
   ```
   2022-08-31T12:03:29.197+09:00 . Requested to schedule probing rebalance 
for 1661915609131 ms.
   2022-08-31T12:04:29.578+09:00 . Requested to schedule probing rebalance 
for 1661915669542 ms.
   ```
   
   We should provide a little more context and also encode the timestamp as a 
readable date/time.
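   
   As a rough illustration (plain JDK time API; this is not the actual patch), 
the raw epoch-ms value can be rendered next to a readable instant like so:
   ```java
   import java.time.Instant;
   
   public class ProbingRebalanceLogExample {
       public static void main(String[] args) {
           // Value as it appears in the old log line above.
           long nextProbingRebalanceMs = 1661915609131L;
           // Instant.toString() yields an ISO-8601 UTC timestamp,
           // e.g. 2022-08-31T03:13:29.131Z for the value above.
           String readable = Instant.ofEpochMilli(nextProbingRebalanceMs).toString();
           System.out.printf("Requested to schedule next probing rebalance for %d ms (%s).%n",
                   nextProbingRebalanceMs, readable);
       }
   }
   ```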
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12573: KAFKA-10199: Remove changelog unregister from state updater

2022-09-01 Thread GitBox


cadonna merged PR #12573:
URL: https://github.com/apache/kafka/pull/12573


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12573: KAFKA-10199: Remove changelog unregister from state updater

2022-09-01 Thread GitBox


cadonna commented on PR #12573:
URL: https://github.com/apache/kafka/pull/12573#issuecomment-1234212740

   Build failures are not related:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 17 and Scala 2.13 / 
kafka.api.SaslClientsWithInvalidCredentialsTest.testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure()
   Build / JDK 17 and Scala 2.13 / 
kafka.api.TransactionsTest.testCommitTransactionTimeout(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2]
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies[at_least_once]
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.KStreamRepartitionIntegrationTest.shouldGoThroughRebalancingCorrectly[Optimization
 = all]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960474042


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(null);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(final Properties props) {
+// Vicky: Do we need to verify props?
+final List optimizationConfigs;
+if (props == null) {
+optimizationConfigs = new ArrayList<>();
+optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+} else {
+final StreamsConfig config = new StreamsConfig(props);
+optimizationConfigs = 
config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG);
+}
 
 mergeDuplicateSourceNodes();
-if (optimizeTopology) {
-LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)

Review Comment:
   A user can enable optimizations either by providing the config `all` (i.e. 
`StreamsConfig.OPTIMIZE`), which applies all optimization rules, or by 
specifying a list of individual optimization rules. With your suggestion, an 
optimization would be applied only if `contains(OPTIMIZE)` is true, which is not 
correct. WDYT?
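   
   To make the intent concrete, here is a small standalone sketch; only 
`all`/`StreamsConfig.OPTIMIZE` is taken from the code above, and the per-rule 
value `"self.join"` is purely hypothetical:
   ```java
   import java.util.List;
   
   public class OptimizationConfigCheck {
       static final String OPTIMIZE = "all";        // mirrors StreamsConfig.OPTIMIZE
       static final String SELF_JOIN = "self.join"; // hypothetical per-rule config value
   
       // A rule fires when the user asked for all optimizations
       // or listed that specific rule explicitly.
       static boolean shouldApply(List<String> optimizationConfigs, String rule) {
           return optimizationConfigs.contains(OPTIMIZE) || optimizationConfigs.contains(rule);
       }
   
       public static void main(String[] args) {
           System.out.println(shouldApply(List.of(OPTIMIZE), SELF_JOIN));  // true
           System.out.println(shouldApply(List.of(SELF_JOIN), SELF_JOIN)); // true
           System.out.println(shouldApply(List.of("none"), SELF_JOIN));    // false
       }
   }
   ```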



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang ! 
   Cases 1 and 2 are optimizable and will be optimized by the algorithm. I have 
tests for these cases in `InternalStreamsBuilderTest`.  
   Case 3 is not optimizable and won't be recognized. The reason is that 
processors like `mapValues` or `filter` or `transform` are black boxes: there is 
no way to know how they change the contents of a stream, hence no way to figure 
out whether the two sides of the join are still the same (see the sketch below). 
   Case 4 could be optimizable, but I did not consider it; I initially only had 
cases like 1 and 2 in scope. I can add it via a special rule in the rewriter: if 
the parent of the join is a merge node, it is ok to have multiple source nodes 
as long as they are ancestors of the merge node. 
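   
   For illustration, a minimal sketch (my own example, not taken from the PR) of 
a shape that must not be rewritten, because the `mapValues` is a black box and 
the two join inputs can no longer be proven identical:
   ```java
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.JoinWindows;
   import org.apache.kafka.streams.kstream.KStream;
   
   import java.time.Duration;
   
   public class NotASelfJoin {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           KStream<String, String> left = builder.stream("topic1");
           // The mapped stream may differ arbitrarily from the original stream,
           // so the two sides of the join are not provably the same.
           KStream<String, String> right = left.mapValues(v -> v.toUpperCase());
           left.join(right, (v1, v2) -> v1 + v2,
                   JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)));
           System.out.println(builder.build().describe());
       }
   }
   ```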



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang ! 
   Cases 1 and 2 are optimizable and will be optimized by the algorithm. I have 
tests for these cases in `InternalStreamsBuilderTest`.  
   Case 3 is not optimizable and won't be recognized. The reason is that 
processors like `mapValues` or `filter` or `transform` are black-boxes. There 
is no way to know how they change the contents of a stream hence there is no 
way to figure out if the two sides of the join are still the same. 
   Case 4 could be optimizable but I did not consider it. I initially only had 
in-scope cases like 1 and 2. I can add it by adding a special rule to the 
rewriter that would check if the parent of the join is a merge, then it's ok to 
have multiple source nodes as long as they are ancestors of the merge node. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration

2022-09-01 Thread GitBox


yashmayya commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r960439387


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##
@@ -199,112 +202,156 @@ private RetryWithToleranceOperator setupExecutor() {
 
 @Test
 public void testExecAndHandleRetriableErrorOnce() throws Exception {
-execAndHandleRetriableError(1, 300, new RetriableException("Test"));
+execAndHandleRetriableError(6000, 1, Collections.singletonList(300L), 
new RetriableException("Test"), true);
 }
 
 @Test
 public void testExecAndHandleRetriableErrorThrice() throws Exception {
-execAndHandleRetriableError(3, 2100, new RetriableException("Test"));
+execAndHandleRetriableError(6000, 3, Arrays.asList(300L, 600L, 1200L), 
new RetriableException("Test"), true);
+}
+
+@Test
+public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws 
Exception {
+execAndHandleRetriableError(-1, 8, Arrays.asList(300L, 600L, 1200L, 
2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true);
+}
+
+@Test
+public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws 
Exception {
+execAndHandleRetriableError(6000, 10, Arrays.asList(300L, 600L, 1200L, 
2400L, 1500L), new RetriableException("Test"), false);
 }
 
 @Test
 public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
-execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable 
Test"));
+execAndHandleNonRetriableError(1, new Exception("Non Retriable Test"));
 }
 
 @Test
 public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
-execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable 
Test"));
+execAndHandleNonRetriableError(3, new Exception("Non Retriable Test"));
 }
 
-public void execAndHandleRetriableError(int numRetriableExceptionsThrown, 
long expectedWait, Exception e) throws Exception {
+@Test
+public void testExitLatch() throws Exception {
 MockTime time = new MockTime(0, 0, 0);
-RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
+CountDownLatch exitLatch = 
EasyMock.createStrictMock(CountDownLatch.class);
+RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new 
ProcessingContext(), exitLatch);
 retryWithToleranceOperator.metrics(errorHandlingMetrics);
+EasyMock.expect(mockOperation.call()).andThrow(new 
RetriableException("test")).anyTimes();
+EasyMock.expect(exitLatch.await(300, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+time.sleep(300);
+return false;
+});
+EasyMock.expect(exitLatch.await(600, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+time.sleep(600);
+return false;
+});
+EasyMock.expect(exitLatch.await(1200, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+time.sleep(1200);
+return false;
+});
+EasyMock.expect(exitLatch.await(2400, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+time.sleep(2400);
+retryWithToleranceOperator.triggerStop();

Review Comment:
   I don't think this is ideal but I couldn't think of a better place to make 
this call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration

2022-09-01 Thread GitBox


yashmayya commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r960436353


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -175,9 +179,8 @@ protected  V execAndRetry(Operation operation) throws 
Exception {
 log.trace("Caught a retriable exception while executing {} 
operation with {}", context.stage(), context.executingClass());
 errorHandlingMetrics.recordFailure();
 if (checkRetry(startTime)) {

Review Comment:
   Fair enough, I do think that the relevant logic is already tested by the 
`testExecAndHandle...` tests (I've added an additional one to check the max 
retries exceeded case)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration

2022-09-01 Thread GitBox


yashmayya commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r960435913


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -269,7 +289,7 @@ void backoff(int attempt, long deadline) {
 delay = deadline - time.milliseconds();
 }
 log.debug("Sleeping for {} millis", delay);
-time.sleep(delay);
+return !exitLatch.await(delay, TimeUnit.MILLISECONDS);

Review Comment:
   Ah, that's a very good point! I didn't account for this because I had 
(incorrectly) assumed that if a thread's interrupt status is set and a 
`CountDownLatch` has already been counted down to 0, a call to 
`CountDownLatch::await` would return immediately with `true` (i.e. the latch's 
count would be given "priority" over the thread's interrupt status). However, 
this isn't actually the case and the call to `CountDownLatch::await` will 
instead immediately throw an `InterruptedException` 😄
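   
   A tiny standalone demo of that JDK behavior (not Connect code), showing that 
a pending interrupt wins even when the latch has already reached zero:
   ```java
   import java.util.concurrent.CountDownLatch;
   
   public class LatchInterruptDemo {
       public static void main(String[] args) {
           CountDownLatch latch = new CountDownLatch(1);
           latch.countDown();                    // count is now 0
           Thread.currentThread().interrupt();   // interrupt flag set before awaiting
           try {
               latch.await();                    // still throws, despite the open latch
               System.out.println("returned normally");
           } catch (InterruptedException e) {
               System.out.println("InterruptedException thrown even though count == 0");
           }
       }
   }
   ```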



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jokeryin432 opened a new pull request, #12581: corporate.xml

2022-09-01 Thread GitBox


jokeryin432 opened a new pull request, #12581:
URL: https://github.com/apache/kafka/pull/12581

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #12574: KAFKA-13908 Rethrow ExecutionException to preserve original cause

2022-09-01 Thread GitBox


nizhikov commented on PR #12574:
URL: https://github.com/apache/kafka/pull/12574#issuecomment-1233908096

   Hello @showuon.
   
   Can you please take a look at my changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers

2022-09-01 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598790#comment-17598790
 ] 

Sergey Ivanov commented on KAFKA-13077:
---

[~ivanyu], ohh, we continued the investigation and found that the root cause was 
much simpler: there was an issue with ZooKeeper a day ago and all ZooKeeper data 
was cleaned (by mistake), including the Kafka topic configurations. Of course 
that made Kafka go crazy.

We found it by executing the ZK command
{code:java}
stat /broker/topics/__consumer_offsets{code}
where we saw the znode had been created a day ago (while the Kafka cluster has 
been running for a few months); further investigation uncovered the issue with 
the ZK storage cleaning.

So, in our case the root cause is clear. As a solution we cleaned up the Kafka 
data too; in our case that wasn't critical.

> Replication failing after unclean shutdown of ZK and all brokers
> 
>
> Key: KAFKA-13077
> URL: https://issues.apache.org/jira/browse/KAFKA-13077
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Christopher Auston
>Priority: Minor
>
> I am submitting this in the spirit of what can go wrong when an operator 
> violates the constraints Kafka depends on. I don't know if Kafka could or 
> should handle this more gracefully. I decided to file this issue because it 
> was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). 
> By "easy" I mean that I did not go out of my way to corrupt anything, I just 
> was not careful when restarting ZK and brokers.
> I violated the constraints of keeping Zookeeper stable and at least one 
> running in-sync replica. 
> I am running the bitnami/kafka helm chart on Amazon EKS.
> {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image'
> "docker.io/bitnami/kafka:2.8.0-debian-10-r43"
> {quote}
> I started with 3 ZK instances and 3 brokers (both STS). I changed the 
> cpu/memory requests on both STS and kubernetes proceeded to restart ZK and 
> kafka instances at the same time. If I recall correctly there were some 
> crashes and several restarts but eventually all the instances were running 
> again. It's possible all ZK nodes and all brokers were unavailable at various 
> points.
> The problem I noticed was that two of the brokers were just continually 
> spitting out messages like:
> {quote}% kubectl logs kaf-kafka-0 --tail 10
> [2021-07-13 14:26:08,871] INFO [ProducerStateManager 
> partition=__transaction_state-0] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, 
> dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from 
> (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* 
> (kafka.log.Log)
> [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Loading producer state till offset 2 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [ProducerStateManager 
> partition=__transaction_state-10] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-10/0002.snapshot,2)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,877] WARN [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2930 segment=[0:1048717]) to (offset=2 segment=[0:338]) 
> (kafka.log.Log)
> [2021-07-13 14:26:08,880] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Truncating to offset 1 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Loading producer state till offset 1 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [ProducerStateManager 
> partition=__transaction_state-20] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-20/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,883] WARN [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2956 segment=[0:1048608]) to (offset=1 segment=[0:169]) 
> (kafka.log.Log)
> {quote}
> If I describe that topic I can see that several partitions have a leader of 2 
> and the ISR is just 2 (NOTE I added two more brokers and tried to reassign 
> the topic onto brokers 2,3,4 which you can see below). The new brokers also 
> spit out the messages about "non-monotonic update