cadonna commented on code in PR #14193:
URL: https://github.com/apache/kafka/pull/14193#discussion_r1345894518


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -387,7 +391,9 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
             threadId,
             processId,
             log,
-            stateUpdaterEnabled);
+            stateUpdaterEnabled,
+            proceessingThreadsEnabled
+            );

Review Comment:
   nit:
   ```suggestion
           );
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -452,6 +468,31 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         return 
streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
     }
 
+    private static DefaultTaskManager maybeCreateSchedulingTaskManager(final 
boolean processingThreadsEnabled,
+                                                                       final 
boolean stateUpdaterEnabled,
+                                                                       final 
TopologyMetadata topologyMetadata,
+                                                                       final 
Time time,
+                                                                       final 
String threadId,
+                                                                       final 
Tasks tasks) {
+        if (processingThreadsEnabled) {
+            if (!stateUpdaterEnabled) {
+                throw new IllegalStateException("Processing threads require 
the state updater to be enabled");
+            }
+
+            final DefaultTaskManager defaultTaskManager = new 
DefaultTaskManager(
+                time,
+                threadId,

Review Comment:
   This specifies the names of the executor threads. Is it intended that the 
executors will have names like `<clientId>-StreamThread-<stream thread 
index>-TaskExecutor-<executor thread index>`?
   Can we have a shorter name like  <clientId>--TaskExecutor-<executor thread 
index>`? Or do we -- at the moment -- have the issue that we can have multiple 
executor threads per stream thread and multiple stream threads? So polling 
threads and processing threads cannot be changed independently.



##########
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java:
##########
@@ -361,7 +369,7 @@ private long nextOffset(TopicPartition tp) {
     }
 
     public synchronized void flush() {
-        verifyProducerState();
+        verifyNotClosed();

Review Comment:
   Why do you only verify that the producer is not closed and not also that the 
producer is not fenced like in the other cases?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+class SynchronizedPartitionGroup extends AbstractPartitionGroup {
+
+    private AbstractPartitionGroup wrapped;

Review Comment:
   Shouldn't this be `final`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+abstract class AbstractPartitionGroup {

Review Comment:
   Why is this not an interface instead of an abstract class?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], 
byte[]> records) {
         }
     }
 
+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to 
be locked in order to be committed.

Review Comment:
   I do not understand this comment. Tasks owned by the state updater should 
not need to be committed. The state updater owns restoring active tasks and 
running standby tasks. None of them has offsets to commit. They do not 
participate in any transaction. And they are checkpointed in the state updater.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+class SynchronizedPartitionGroup extends AbstractPartitionGroup {
+
+    private AbstractPartitionGroup wrapped;
+
+    public SynchronizedPartitionGroup(final AbstractPartitionGroup wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    synchronized boolean readyToProcess(final long wallClockTime) {

Review Comment:
   Did you put everywhere a `synchronized` to get started? I suppose not all of 
them are strictly needed, right?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], 
byte[]> records) {
         }
     }
 
+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to 
be locked in order to be committed.
+            if (log.isDebugEnabled()) {
+                log.debug("Locking tasks {}", 
ids.stream().map(TaskId::toString).collect(Collectors.joining(", ")));
+            }
+            boolean locked = false;
+            while (!locked) {
+                try {
+                    schedulingTaskManager.lockTasks(ids).get();
+                    locked = true;
+                } catch (final InterruptedException e) {
+                    log.warn("Interrupted while waiting for tasks {} to be 
locked",
+                        
ids.stream().map(TaskId::toString).collect(Collectors.joining(",")));
+                } catch (final ExecutionException e) {
+                    log.info("Failed to lock tasks");
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private void maybeUnlockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to 
be locked in order to be committed.

Review Comment:
   See my comment above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to