[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

2022-08-27 Thread GitBox


guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r956662529


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {
+for (final GraphNode sibling : parent.children()) {
+if (sibling instanceof ProcessorGraphNode) {
+if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+continue;
+}
+}
+if (sibling != streamJoinNode
+&& sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+return false;
+}
+}
+}
+return true;
+}
+
+private void countSourceNodes(
+final AtomicInteger count,
+final GraphNode currentNode,
+final Set visited) {
+
+if (currentNode instanceof StreamSourceNode) {
+count.incrementAndGet();
+}
+
+for (final GraphNode parent: currentNode.parentNodes()) {
+if (!visited.contains(parent)) {
+visited.add(parent);
+countSourceNodes(count, parent, visited);
+}
+}
+}
+
+private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+return node.processorParameters() != null

Review Comment:
   Why we need the other checks except the last one? Since this is called after 
the whole pass the logical plan should be complete and hence all of the checked 
fields should be null (otherwise it should be a bug)?



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
 public static final String TOPOLOGY_OPTIMIZATION_CONFIG = 
"topology.optimization";
 private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration 
telling Kafka Streams if it should optimize the topology, disabled by default";
 
+public static final String SELF_JOIN_OPTIMIZATION_CONFIG = 
"self.join.optimization";

Review Comment:
   I still have some concerns about the extra configs pattern here, but I will 
leave for open discuss in the KIP thread.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(fi

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12562: KAFKA-10199: Remove tasks from state updater on shutdown

2022-08-27 Thread GitBox


guozhangwang commented on code in PR #12562:
URL: https://github.com/apache/kafka/pull/12562#discussion_r956661497


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
 verify(changelogReader, times(2)).clear();
 }
 
+@Test
+public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws 
Exception {
+stateUpdater.shutdown(Duration.ofMinutes(1));

Review Comment:
   Why we call `shutdown` first? We should have shutdown upon tearing down each 
test.
   
   Ditto in `shouldRemoveUpdatingTasksOnShutdown`.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -100,7 +105,8 @@ public void run() {
 } catch (final RuntimeException anyOtherException) {
 handleRuntimeException(anyOtherException);
 } finally {
-clear();
+removeAddedTasksFromInputQueue();
+removeUpdatingAndPausedTasks();

Review Comment:
   We no longer clear the `restoredActiveTasks`, is that intentional?
   
   Do we assume by the time the thread is shutting down, the stream thread 
would not care about any restored tasks any longer, or are you going to merge 
the restored tasks with removed tasks soon anyways?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
 verify(changelogReader, times(2)).clear();
 }
 
+@Test
+public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws 
Exception {
+stateUpdater.shutdown(Duration.ofMinutes(1));
+final StreamTask statelessTask = 
statelessTask(TASK_0_0).inState(State.RESTORING).build();
+final StreamTask statefulTask = statefulTask(TASK_1_0, 
mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+final StandbyTask standbyTask = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+stateUpdater.pause(TASK_0_1);
+stateUpdater.add(statelessTask);
+stateUpdater.add(statefulTask);
+stateUpdater.remove(TASK_1_1);
+stateUpdater.add(standbyTask);
+stateUpdater.resume(TASK_0_1);
+verifyRemovedTasks();
+
+stateUpdater.shutdown(Duration.ofMinutes(1));
+
+verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
+}
+
+@Test
+public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
+stateUpdater.shutdown(Duration.ofMinutes(1));
+stateUpdater = new DefaultStateUpdater(new 
StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
+final StreamTask activeTask = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+final StandbyTask standbyTask = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+stateUpdater.start();
+stateUpdater.add(activeTask);
+stateUpdater.add(standbyTask);
+verifyUpdatingTasks(activeTask, standbyTask);
+verifyRemovedTasks();
+
+stateUpdater.shutdown(Duration.ofMinutes(1));
+
+verifyRemovedTasks(activeTask, standbyTask);
+verify(activeTask).maybeCheckpoint(true);
+verify(standbyTask).maybeCheckpoint(true);
+}
+
+@Test
+public void shouldRemovePausedTasksOnShutdown() throws Exception {

Review Comment:
   It's possible that the state updater thread was not started when shutdown 
was called, in that case we do not need to do the latter since it's always 
empty still.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13909) Restart Kafka in KRaft mode with ACLs ends in a RuntimeException

2022-08-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13909:
---
Fix Version/s: (was: 3.3.0)

> Restart Kafka in KRaft mode with ACLs ends in a RuntimeException
> 
>
> Key: KAFKA-13909
> URL: https://issues.apache.org/jira/browse/KAFKA-13909
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.0
> Environment: Running Kafka in a Docker container
>Reporter: Florian Blumenstein
>Assignee: Luke Chen
>Priority: Major
> Attachments: kafka.log, server.properties
>
>
> Running Kafka in KRaft mode works for the initial startup. When restarting 
> Kafka it ends in a RuntimeException:
> [2022-05-17 08:26:40,959] ERROR [BrokerServer id=1] Fatal error during broker 
> startup. Prepare to shutdown (kafka.server.BrokerServer)
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: An ACL 
> with ID toAvM0TbTfWRmS1kjknRaA already exists.
>         at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown 
> Source)
>         at java.base/java.util.concurrent.CompletableFuture.get(Unknown 
> Source)
>         at kafka.server.BrokerServer.startup(BrokerServer.scala:426)
>         at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:114)
>         at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:114)
>         at scala.Option.foreach(Option.scala:437)
>         at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:114)
>         at kafka.Kafka$.main(Kafka.scala:109)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.RuntimeException: An ACL with ID toAvM0TbTfWRmS1kjknRaA 
> already exists.
>         at 
> org.apache.kafka.metadata.authorizer.StandardAuthorizerData.addAcl(StandardAuthorizerData.java:169)
>         at 
> org.apache.kafka.metadata.authorizer.StandardAuthorizer.addAcl(StandardAuthorizer.java:83)
>         at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$19(BrokerMetadataPublisher.scala:234)
>         at java.base/java.util.LinkedHashMap$LinkedEntrySet.forEach(Unknown 
> Source)
>         at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$18(BrokerMetadataPublisher.scala:232)
>         at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$18$adapted(BrokerMetadataPublisher.scala:221)
>         at scala.Option.foreach(Option.scala:437)
>         at 
> kafka.server.metadata.BrokerMetadataPublisher.publish(BrokerMetadataPublisher.scala:221)
>         at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:258)
>         at 
> kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>         at java.base/java.lang.Thread.run(Unknown Source)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13937) StandardAuthorizer throws "ID 5t1jQ3zWSfeVLMYkN3uong not found in aclsById" exceptions into broker logs

2022-08-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13937:
---
Fix Version/s: (was: 3.3.0)

> StandardAuthorizer throws "ID 5t1jQ3zWSfeVLMYkN3uong not found in aclsById" 
> exceptions into broker logs
> ---
>
> Key: KAFKA-13937
> URL: https://issues.apache.org/jira/browse/KAFKA-13937
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.2.0
>Reporter: Jakub Scholz
>Assignee: Luke Chen
>Priority: Major
>
> I'm trying to use the new {{StandardAuthorizer}} in a Kafka cluster running 
> in KRaft mode. When managing the ACLs using the Admin API, the authorizer 
> seems to throw a lot of runtime exceptions in the log. For example ...
> When creating an ACL rule, it seems to create it just fine. But it throws the 
> following exception:
> {code:java}
> 2022-05-25 11:09:18,074 ERROR [StandardAuthorizer 0] addAcl error 
> (org.apache.kafka.metadata.authorizer.StandardAuthorizerData) [EventHandler]
> java.lang.RuntimeException: An ACL with ID 5t1jQ3zWSfeVLMYkN3uong already 
> exists.
>     at 
> org.apache.kafka.metadata.authorizer.StandardAuthorizerData.addAcl(StandardAuthorizerData.java:169)
>     at 
> org.apache.kafka.metadata.authorizer.StandardAuthorizer.addAcl(StandardAuthorizer.java:83)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$19(BrokerMetadataPublisher.scala:234)
>     at 
> java.base/java.util.LinkedHashMap$LinkedEntrySet.forEach(LinkedHashMap.java:671)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$18(BrokerMetadataPublisher.scala:232)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$18$adapted(BrokerMetadataPublisher.scala:221)
>     at scala.Option.foreach(Option.scala:437)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.publish(BrokerMetadataPublisher.scala:221)
>     at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:258)
>     at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2(BrokerMetadataListener.scala:119)
>     at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2$adapted(BrokerMetadataListener.scala:119)
>     at scala.Option.foreach(Option.scala:437)
>     at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.run(BrokerMetadataListener.scala:119)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> 2022-05-25 11:09:18,076 ERROR [BrokerMetadataPublisher id=0] Error publishing 
> broker metadata at OffsetAndEpoch(offset=3, epoch=1) 
> (kafka.server.metadata.BrokerMetadataPublisher) [EventHandler]
> java.lang.RuntimeException: An ACL with ID 5t1jQ3zWSfeVLMYkN3uong already 
> exists.
>     at 
> org.apache.kafka.metadata.authorizer.StandardAuthorizerData.addAcl(StandardAuthorizerData.java:169)
>     at 
> org.apache.kafka.metadata.authorizer.StandardAuthorizer.addAcl(StandardAuthorizer.java:83)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$19(BrokerMetadataPublisher.scala:234)
>     at 
> java.base/java.util.LinkedHashMap$LinkedEntrySet.forEach(LinkedHashMap.java:671)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$18(BrokerMetadataPublisher.scala:232)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$18$adapted(BrokerMetadataPublisher.scala:221)
>     at scala.Option.foreach(Option.scala:437)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.publish(BrokerMetadataPublisher.scala:221)
>     at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:258)
>     at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2(BrokerMetadataListener.scala:119)
>     at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2$adapted(BrokerMetadataListener.scala:119)
>     at scala.Option.foreach(Option.scala:437)
>     at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.run(BrokerMetadataListener.scala:119)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(Kafk

[jira] [Updated] (KAFKA-13897) Add 3.1.1 to system tests and streams upgrade tests

2022-08-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13897:
---
Fix Version/s: (was: 3.3.0)
   (was: 3.1.2)

> Add 3.1.1 to system tests and streams upgrade tests
> ---
>
> Key: KAFKA-13897
> URL: https://issues.apache.org/jira/browse/KAFKA-13897
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Tom Bentley
>Priority: Blocker
>
> Per the penultimate bullet on the [release 
> checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses],
>  Kafka v3.1.1 is released. We should add this version to the system tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14142) Improve information returned about the cluster metadata partition

2022-08-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-14142:
---
Fix Version/s: (was: 3.3.0)

> Improve information returned about the cluster metadata partition
> -
>
> Key: KAFKA-14142
> URL: https://issues.apache.org/jira/browse/KAFKA-14142
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jason Gustafson
>Priority: Blocker
>
> The Apacke Kafka operator needs to know when it is safe to format and start a 
> KRaft Controller that had a disk failure of the metadata log dir.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-08-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-14050:
---
Fix Version/s: (was: 3.3.0)

> Older clients cannot deserialize ApiVersions response with finalized feature 
> epoch
> --
>
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>
> When testing kraft locally, we encountered this exception from an older 
> client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | 
> adminclient-1394] org.apache.kafka.common.utils.KafkaThread 
> lambda$configureThread$0 - Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1394':
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'api_keys': Error reading array of size 1207959552, only 579 bytes available
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
> at 
> org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
> at 
> org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
> at 
> org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
> at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
> at java.base/java.lang.Thread.run(Thread.java:832) {code}
> The cause appears to be from a change to the type of the 
> `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
> int64: 
> [https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58.]
> Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this 
> by creating a new field. We will have to leave the existing tag in the 
> protocol spec and consider it dead.
> Credit for this find goes to [~dajac] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14183) Kraft bootstrap metadata file should use snapshot header/footer

2022-08-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-14183.

Resolution: Fixed

> Kraft bootstrap metadata file should use snapshot header/footer
> ---
>
> Key: KAFKA-14183
> URL: https://issues.apache.org/jira/browse/KAFKA-14183
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
> Fix For: 3.3.0
>
>
> The bootstrap checkpoint file that we use in kraft is intended to follow the 
> usual snapshot format, but currently it does not include the header/footer 
> control records. The main purpose of these at the moment is to set a version 
> for the checkpoint file itself.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12622) Automate LICENSE file validation

2022-08-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-12622:
---
Fix Version/s: 3.4.0
   (was: 3.3.0)

> Automate LICENSE file validation
> 
>
> Key: KAFKA-12622
> URL: https://issues.apache.org/jira/browse/KAFKA-12622
> Project: Kafka
>  Issue Type: Task
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.4.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed 
> a correct license file for 2.8.0. This file will certainly become wrong again 
> in later releases, so we need to write some kind of script to automate a 
> check.
> It crossed my mind to automate the generation of the file, but it seems to be 
> an intractable problem, considering that each dependency may change licenses, 
> may package license files, link to them from their poms, link to them from 
> their repos, etc. I've also found multiple URLs listed with various 
> delimiters, broken links that I have to chase down, etc.
> Therefore, it seems like the solution to aim for is simply: list all the jars 
> that we package, and print out a report of each jar that's extra or missing 
> vs. the ones in our `LICENSE-binary` file.
> The check should be part of the release script at least, if not part of the 
> regular build (so we keep it up to date as dependencies change).
>  
> Here's how I do this manually right now:
> {code:java}
> // build the binary artifacts
> $ ./gradlewAll releaseTarGz
> // unpack the binary artifact 
> $ tar xf core/build/distributions/kafka_2.13-X.Y.Z.tgz
> $ cd xf kafka_2.13-X.Y.Z
> // list the packaged jars 
> // (you can ignore the jars for our own modules, like kafka, kafka-clients, 
> etc.)
> $ ls libs/
> // cross check the jars with the packaged LICENSE
> // make sure all dependencies are listed with the right versions
> $ cat LICENSE
> // also double check all the mentioned license files are present
> $ ls licenses {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio merged pull request #12565: KAFKA-14183; Cluster metadata bootstrap file should use header/footer

2022-08-27 Thread GitBox


jsancio merged PR #12565:
URL: https://github.com/apache/kafka/pull/12565


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12565: KAFKA-14183; Cluster metadata bootstrap file should use header/footer

2022-08-27 Thread GitBox


jsancio commented on PR #12565:
URL: https://github.com/apache/kafka/pull/12565#issuecomment-1229357606

   Unrelated test failure: 
   
   >Build / JDK 11 and Scala 2.13 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12565: KAFKA-14183; Cluster metadata bootstrap file should use header/footer

2022-08-27 Thread GitBox


jsancio commented on PR #12565:
URL: https://github.com/apache/kafka/pull/12565#issuecomment-1229357539

   > @jsancio thanks for the patch! A few minor non-blocking comments inline. I 
filed a follow-up to add unit tests for BatchFileWrite since I realize I never 
included any (not needed for this PR). 
https://issues.apache.org/jira/browse/KAFKA-14186
   
   @mumrah, Yes, we could use more tests for this type. I added 
BatchFileWriterReaderTest in this commit to test basic header and footer 
support.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-08-27 Thread GitBox


ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1229214713

   Hi @jsancio, bumping this PR up in case it got missed. Could please have a 
look at this? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12565: KAFKA-14183; Cluster metadata bootstrap file should use header/footer

2022-08-27 Thread GitBox


mumrah commented on code in PR #12565:
URL: https://github.com/apache/kafka/pull/12565#discussion_r956585846


##
metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java:
##
@@ -36,16 +39,24 @@
 
 
 /**
- * Write an arbitrary set of metadata records into a Kafka metadata log batch 
format. This is similar to the binary
- * format used for metadata snapshot files, but the log epoch and initial 
offset are set to zero.
+ * Write an arbitrary set of metadata records into a Kafka metadata log batch 
format.
+ *
+ * This is similar to the binary format used for metadata snapshot files, but 
the log epoch
+ * and initial offset are set to zero.

Review Comment:
   We should document that this class adds the two control records automatically



##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -260,18 +260,18 @@ private void appendControlMessage(Function valueCreat
 /**
  * Append a {@link LeaderChangeMessage} record to the batch
  *
- * @param @LeaderChangeMessage The message to append
- * @param @currentTimeMs The timestamp of message generation
+ * @param LeaderChangeMessage The message to append
+ * @param currentTimestamp The current time

Review Comment:
   nit "current time in millis" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14186) Add unit tests for BatchFileWriter

2022-08-27 Thread David Arthur (Jira)
David Arthur created KAFKA-14186:


 Summary: Add unit tests for BatchFileWriter
 Key: KAFKA-14186
 URL: https://issues.apache.org/jira/browse/KAFKA-14186
 Project: Kafka
  Issue Type: Test
Reporter: David Arthur
 Fix For: 3.4.0


We have integration tests that cover this class, but no direct unit tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14015) ConfigProvider with ttl fails to restart tasks

2022-08-27 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585390#comment-17585390
 ] 

Yash Mayya edited comment on KAFKA-14015 at 8/27/22 8:51 AM:
-

In essence, this should work as expected when Connect is being run in 
distributed mode. In standalone mode, it looks like there is a similar flow 
[here|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L392-L407]
 (i.e. get latest connector configs -> ask connector for task configs -> check 
if there's a change in the task configs -> if so, stop old set of tasks and 
start new set of tasks). However, that seems to only be called if i) configs 
are updated directly via PUT /config endpoint; ii) connector explicitly 
requests task reconfiguration via its context; iii) connector is resumed via 
the REST API. So we might need to make some changes to ensure that task 
reconfiguration is requested when 
[restartConnector|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287]
 is called in the StandaloneHerder (might be as simple as adding a call to 
[updateConnectorTasks|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L392-L407]
 in startConnector)


was (Author: yash.mayya):
In essence, this should work as expected when Connect is being run in 
distributed mode. In standalone mode, it looks like there is a similar flow 
[here|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L392-L407]
 (i.e. get latest connector configs -> ask connector for task configs -> check 
if there's a change in the task configs -> if so, stop old set of tasks and 
start new set of tasks). However, that seems to only be called if i) configs 
are updated directly via PUT /config endpoint; ii) connector explicitly 
requests task reconfiguration via its context; iii) connector is resumed via 
the REST API. So we might need to make some changes to ensure that task 
reconfiguration is requested when 
[restartConnector|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287]
 is called in the StandaloneHerder (might be as simple as adding a call to 
[updateConnectorTasks|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L392-L407]
 in restartConnector)

> ConfigProvider with ttl fails to restart tasks
> --
>
> Key: KAFKA-14015
> URL: https://issues.apache.org/jira/browse/KAFKA-14015
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ross Lawley
>Assignee: Sagar Rao
>Priority: Major
>
> According to the 
> [KIP-297|https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations#KIP297:ExternalizingSecretsforConnectConfigurations-SecretRotation]:
> {quote} * When the Herder receives the onChange() call, it will check a new 
> connector configuration property config.reload.action which can be one of the 
> following:
>  ** The value restart, which means to schedule a restart of the Connector and 
> all its Tasks. This will be the default.
>  ** The value none, which means to do nothing.{quote}
> However, the 
> [restartConnector|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287-L294]
>  method only restarts the connector and does not restart any tasks.  Suggest 
> calling {{restartConnectorAndTasks}} instead.
> The result is changed configurations provided by the ConfigProvider are not 
> picked up and existing tasks continue to use outdated configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14015) ConfigProvider with ttl fails to restart tasks

2022-08-27 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585390#comment-17585390
 ] 

Yash Mayya edited comment on KAFKA-14015 at 8/27/22 8:50 AM:
-

In essence, this should work as expected when Connect is being run in 
distributed mode. In standalone mode, it looks like there is a similar flow 
[here|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L392-L407]
 (i.e. get latest connector configs -> ask connector for task configs -> check 
if there's a change in the task configs -> if so, stop old set of tasks and 
start new set of tasks). However, that seems to only be called if i) configs 
are updated directly via PUT /config endpoint; ii) connector explicitly 
requests task reconfiguration via its context; iii) connector is resumed via 
the REST API. So we might need to make some changes to ensure that task 
reconfiguration is requested when 
[restartConnector|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287]
 is called in the StandaloneHerder (might be as simple as adding a call to 
[updateConnectorTasks|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L392-L407]
 in restartConnector)


was (Author: yash.mayya):
In essence, this should work as expected when Connect is being run in 
distributed mode. In standalone mode, it looks like there is a similar flow 
[here|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L392-L407]
 (i.e. get latest connector configs -> ask connector for task configs -> check 
if there's a change in the task configs -> if so, stop old set of tasks and 
start new set of tasks). However, that seems to only be called if i) configs 
are updated directly via PUT /config endpoint; ii) connector explicitly 
requests task reconfiguration via its context; iii) connector is resumed via 
the REST API. So we might need to make some changes to ensure that task 
reconfiguration is requested when 
[restartConnector|https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287]
 is called in the StandaloneHerder

> ConfigProvider with ttl fails to restart tasks
> --
>
> Key: KAFKA-14015
> URL: https://issues.apache.org/jira/browse/KAFKA-14015
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ross Lawley
>Assignee: Sagar Rao
>Priority: Major
>
> According to the 
> [KIP-297|https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations#KIP297:ExternalizingSecretsforConnectConfigurations-SecretRotation]:
> {quote} * When the Herder receives the onChange() call, it will check a new 
> connector configuration property config.reload.action which can be one of the 
> following:
>  ** The value restart, which means to schedule a restart of the Connector and 
> all its Tasks. This will be the default.
>  ** The value none, which means to do nothing.{quote}
> However, the 
> [restartConnector|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287-L294]
>  method only restarts the connector and does not restart any tasks.  Suggest 
> calling {{restartConnectorAndTasks}} instead.
> The result is changed configurations provided by the ConfigProvider are not 
> picked up and existing tasks continue to use outdated configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #12229: MINOR: Include the inner exception stack trace when re-throwing an exception

2022-08-27 Thread GitBox


showuon commented on code in PR #12229:
URL: https://github.com/apache/kafka/pull/12229#discussion_r956553207


##
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java:
##
@@ -326,7 +326,7 @@ static String formatRequestBody(String scope) throws 
IOException {
 return requestParameters.toString();
 } catch (UnsupportedEncodingException e) {
 // The world has gone crazy!
-throw new IOException(String.format("Encoding %s not supported", 
StandardCharsets.UTF_8.name()));
+throw new IOException(String.format("Encoding %s not supported", 
StandardCharsets.UTF_8.name()), e);

Review Comment:
   Thanks for the explanation @divijvaidya , yes, I checked and confirmed it 
only output the charset name. So it's safe. @ijuma , any concern/comments here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12230: MINOR: Catch InvocationTargetException explicitly and propagate underlying cause

2022-08-27 Thread GitBox


showuon commented on code in PR #12230:
URL: https://github.com/apache/kafka/pull/12230#discussion_r956552230


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -463,8 +463,7 @@ public static  T newParameterizedInstance(String 
className, Object... params)
 throw new ClassNotFoundException(String.format("Unable to access " 
+
 "constructor of %s", className), e);
 } catch (InvocationTargetException e) {
-throw new ClassNotFoundException(String.format("Unable to invoke " 
+
-"constructor of %s", className), e);
+throw new KafkaException(String.format("The constructor of %s 
threw an exception", className), e.getCause());

Review Comment:
   @ijuma , thanks for the explanation. Make sense to me. 
   @divijvaidya I think it's better we revert it. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #12564: MINOR: Cleanups from KAFKA-14097

2022-08-27 Thread GitBox


showuon merged PR #12564:
URL: https://github.com/apache/kafka/pull/12564


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12564: MINOR: Cleanups from KAFKA-14097

2022-08-27 Thread GitBox


showuon commented on PR #12564:
URL: https://github.com/apache/kafka/pull/12564#issuecomment-1229143307

   Failed tests are unrelated.
   ```
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
   Build / JDK 11 and Scala 2.13 / 
kafka.controller.ControllerIntegrationTest.testAlterPartitionErrorsAfterUncleanElection()
   Build / JDK 11 and Scala 2.13 / 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org