cadonna commented on code in PR #14193:
URL: https://github.com/apache/kafka/pull/14193#discussion_r1345894518

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -387,7 +391,9 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
             threadId,
             processId,
             log,
-            stateUpdaterEnabled);
+            stateUpdaterEnabled,
+            proceessingThreadsEnabled
+        );

Review Comment:
   nit:
   ```suggestion
           );
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -452,6 +468,31 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
         return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
     }

+    private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled,
+                                                                       final boolean stateUpdaterEnabled,
+                                                                       final TopologyMetadata topologyMetadata,
+                                                                       final Time time,
+                                                                       final String threadId,
+                                                                       final Tasks tasks) {
+        if (processingThreadsEnabled) {
+            if (!stateUpdaterEnabled) {
+                throw new IllegalStateException("Processing threads require the state updater to be enabled");
+            }
+
+            final DefaultTaskManager defaultTaskManager = new DefaultTaskManager(
+                time,
+                threadId,

Review Comment:
   This specifies the names of the executor threads. Is it intended that the executors have names like `<clientId>-StreamThread-<stream thread index>-TaskExecutor-<executor thread index>`? Can we have a shorter name like `<clientId>-TaskExecutor-<executor thread index>`? Or do we, at the moment, have the issue that there can be multiple executor threads per stream thread as well as multiple stream threads, so that polling threads and processing threads cannot be changed independently?

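To make the naming question above concrete, here is a minimal sketch of the two formats. `ExecutorThreadNames`, `nested`, and `flat` are hypothetical helpers written only for illustration; they are not part of the PR or of Kafka, and the formats simply mirror the two patterns quoted in the comment.

```java
// Hypothetical helper, not Kafka code: builds executor thread names in the
// two formats discussed in the review comment.
final class ExecutorThreadNames {

    private ExecutorThreadNames() { }

    // Long form: the executor index is scoped by its owning stream thread.
    static String nested(final String clientId, final int streamThreadIdx, final int executorIdx) {
        return clientId + "-StreamThread-" + streamThreadIdx + "-TaskExecutor-" + executorIdx;
    }

    // Short form: the executor index is scoped by the client only.
    static String flat(final String clientId, final int executorIdx) {
        return clientId + "-TaskExecutor-" + executorIdx;
    }
}
```

Under the assumption that every stream thread numbers its own executors starting from the same index, the short form would give executors of different stream threads the same name, whereas the long form keeps them distinct. The short form would therefore seem to need executor indices that are unique per client.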
##########
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java:
##########
@@ -361,7 +369,7 @@ private long nextOffset(TopicPartition tp) {
     }

     public synchronized void flush() {
-        verifyProducerState();
+        verifyNotClosed();

Review Comment:
   Why do you only verify that the producer is not closed, and not also that the producer is not fenced as in the other cases?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+class SynchronizedPartitionGroup extends AbstractPartitionGroup {
+
+    private AbstractPartitionGroup wrapped;

Review Comment:
   Shouldn't this be `final`?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+abstract class AbstractPartitionGroup {

Review Comment:
   Why is this an abstract class rather than an interface?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }

+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.

Review Comment:
   I do not understand this comment. Tasks owned by the state updater should not need to be committed. The state updater owns restoring active tasks and running standby tasks. None of them has offsets to commit. They do not participate in any transaction. And they are checkpointed in the state updater.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+class SynchronizedPartitionGroup extends AbstractPartitionGroup {
+
+    private AbstractPartitionGroup wrapped;
+
+    public SynchronizedPartitionGroup(final AbstractPartitionGroup wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    synchronized boolean readyToProcess(final long wallClockTime) {

Review Comment:
   Did you put `synchronized` everywhere to get started? I suppose not all of them are strictly needed, right?

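For reference, a minimal sketch of the wrapper pattern these comments are about: a delegate held in a `final` field, an interface as the common type, and every delegating method marked `synchronized`. The names `Group`, `PlainGroup`, and `SynchronizedGroup` are hypothetical stand-ins chosen for illustration; they are not the PR's classes and do not reflect the real `AbstractPartitionGroup` API.

```java
// Hypothetical stand-in for the common type; an interface suffices here
// because no state or behavior is shared between implementations.
interface Group {
    void add(long value);

    long total();
}

// Plain implementation, not thread-safe on its own.
final class PlainGroup implements Group {
    private long total;

    @Override
    public void add(final long value) {
        total += value;
    }

    @Override
    public long total() {
        return total;
    }
}

// Wrapper that serializes all access to the delegate on its own monitor.
final class SynchronizedGroup implements Group {
    // final: the delegate is assigned once in the constructor and never replaced.
    private final Group wrapped;

    SynchronizedGroup(final Group wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public synchronized void add(final long value) {
        wrapped.add(value);
    }

    @Override
    public synchronized long total() {
        return wrapped.total();
    }
}
```

Synchronizing every method is the conservative starting point; whether an individual method can drop the keyword depends on which threads call it and on the state it touches, which this sketch does not model.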
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }

+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.
+            if (log.isDebugEnabled()) {
+                log.debug("Locking tasks {}", ids.stream().map(TaskId::toString).collect(Collectors.joining(", ")));
+            }
+            boolean locked = false;
+            while (!locked) {
+                try {
+                    schedulingTaskManager.lockTasks(ids).get();
+                    locked = true;
+                } catch (final InterruptedException e) {
+                    log.warn("Interrupted while waiting for tasks {} to be locked",
+                        ids.stream().map(TaskId::toString).collect(Collectors.joining(",")));
+                } catch (final ExecutionException e) {
+                    log.info("Failed to lock tasks");
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private void maybeUnlockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.

Review Comment:
   See my comment above.