Repository: kafka Updated Branches: refs/heads/0.10.1 53474ae6c -> d9ea35e6d
KAFKA-4253: Fix Kafka Stream thread shutting down process ordering

Changed the ordering in `StreamThread.shutdown`:
1. commitAll (we need to commit so that any cached data is flushed through the topology)
2. close all tasks
3. producer.flush() - so any records produced during close are flushed and we have offsets for them
4. close all state managers
5. close producers/consumers
6. remove the tasks

Also in `onPartitionsRevoked`:
1. commitAll
2. close all tasks
3. producer.flush
4. close all state managers

Author: Damian Guy <[email protected]>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1970 from dguy/kafka-4253

(cherry picked from commit c9da62794547ee75a7e2ade1aa8bbf4a1443c49c)
Signed-off-by: Guozhang Wang <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9ea35e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9ea35e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9ea35e6

Branch: refs/heads/0.10.1
Commit: d9ea35e6d34933efb4f16eaca8551fa834d8b292
Parents: 53474ae
Author: Damian Guy <[email protected]>
Authored: Thu Oct 6 09:43:39 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu Oct 6 09:43:48 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java | 14 ++-
 .../internals/ProcessorStateManager.java | 5 +-
 .../processor/internals/StandbyTask.java | 10 ++
 .../streams/processor/internals/StreamTask.java | 13 ++-
 .../processor/internals/StreamThread.java | 112 ++++++++++++++-----
 .../streams/state/internals/RocksDBStore.java | 1 -
 .../processor/internals/AbstractTaskTest.java | 10 ++
 .../internals/ProcessorStateManagerTest.java | 6 +-
 .../processor/internals/StandbyTaskTest.java | 4 +-
 .../processor/internals/StreamThreadTest.java | 6 +
 .../state/internals/StateStoreTestUtils.java | 31 +----
 .../apache/kafka/test/NoOpRecordCollector.java | 48 ++++++++
 12 files changed, 189
insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index fe0d99c..7bda3f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -108,10 +108,15 @@ public abstract class AbstractTask { public abstract void commit(); + + public abstract void close(); + + public abstract void commitOffsets(); + /** * @throws ProcessorStateException if there is an error while closing the state manager */ - public void close() { + void closeStateManager() { try { stateMgr.close(recordCollectorOffsets()); } catch (IOException e) { @@ -168,4 +173,11 @@ public abstract class AbstractTask { sb.append("\n"); return sb.toString(); } + + /** + * Flush all state stores owned by this task + */ + public void flushState() { + stateMgr.flush((InternalProcessorContext) this.context()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 2e1e4da..9d2e63f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -326,18 +326,17 @@ public class ProcessorStateManager { } /** - * @throws IOException if any error happens when flushing or closing the state stores + * @throws IOException if any error happens when closing the state stores */ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException { try { - // attempting to flush and close the stores, just in case they + // attempting to close the stores, just in case they // are not closed by a ProcessorNode yet if (!stores.isEmpty()) { log.debug("task [{}] Closing stores.", taskId); for (Map.Entry<String, StateStore> entry : stores.entrySet()) { log.debug("task [{}} Closing storage engine {}", taskId, entry.getKey()); try { - entry.getValue().flush(); entry.getValue().close(); } catch (Exception e) { throw new ProcessorStateException(String.format("task [%s] Failed to close state store %s", taskId, entry.getKey()), e); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 40c4d9c..e57b44a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -98,6 +98,16 @@ public class StandbyTask extends AbstractTask { initializeOffsetLimits(); } + @Override + public void close() { + //no-op + } + + @Override + public void commitOffsets() { + // no-op + } + /** * Produces a string representation contain useful information about a StreamTask. * This is useful in debugging scenarios. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 14daf56..061cfeb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -269,6 +269,14 @@ public class StreamTask extends AbstractTask implements Punctuator { recordCollector.flush(); // 3) commit consumed offsets if it is dirty already + commitOffsets(); + } + + /** + * commit consumed offsets if needed + */ + @Override + public void commitOffsets() { if (commitOffsetNeeded) { Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { @@ -333,10 +341,9 @@ public class StreamTask extends AbstractTask implements Punctuator { } } - super.close(); - - if (exception != null) + if (exception != null) { throw exception; + } } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f1913b8..0667865 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -132,8 +132,8 @@ public class StreamThread extends Thread { public void 
onPartitionsRevoked(Collection<TopicPartition> assignment) { try { initialized.set(false); - commitAll(); lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned + shutdownTasksAndState(true); } catch (Throwable t) { rebalanceException = t; throw t; @@ -259,21 +259,10 @@ public class StreamThread extends Thread { private void shutdown() { log.info("stream-thread [{}] Shutting down", this.getName()); + shutdownTasksAndState(false); - // Exceptions should not prevent this call from going through all shutdown steps - try { - commitAll(); - } catch (Throwable e) { - // already logged in commitAll() - } - // Close standby tasks before closing the restore consumer since closing standby tasks uses the restore consumer. - removeStandbyTasks(); - - // We need to first close the underlying clients before closing the state - // manager, for example we need to make sure producer's record sends - // have all been acked before the state manager records - // changelog sent offsets + // close all embedded clients try { producer.close(); } catch (Throwable e) { @@ -290,11 +279,80 @@ public class StreamThread extends Thread { log.error("stream-thread [{}] Failed to close restore consumer: ", this.getName(), e); } + // remove all tasks removeStreamTasks(); + removeStandbyTasks(); log.info("stream-thread [{}] Stream thread shutdown complete", this.getName()); } + private void shutdownTasksAndState(final boolean rethrowExceptions) { + // Commit first as there may be cached records that have not been flushed yet. 
+ commitOffsets(rethrowExceptions); + // Close all processors in topology order + closeAllTasks(); + // flush state + flushAllState(rethrowExceptions); + // flush out any extra data sent during close + producer.flush(); + // Close all task state managers + closeAllStateManagers(rethrowExceptions); + } + + interface AbstractTaskAction { + void apply(final AbstractTask task); + } + + private void performOnAllTasks(final AbstractTaskAction action, + final String exceptionMessage, + final boolean throwExceptions) { + final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values()); + allTasks.addAll(standbyTasks.values()); + for (final AbstractTask task : allTasks) { + try { + action.apply(task); + } catch (KafkaException e) { + log.error(String.format("stream-thread [%s] Failed to %s for %s %s: ", + StreamThread.this.getName(), + exceptionMessage, + task.getClass().getSimpleName(), + task.id()), + e); + if (throwExceptions) { + throw e; + } + } + } + } + + private void closeAllStateManagers(final boolean throwExceptions) { + performOnAllTasks(new AbstractTaskAction() { + @Override + public void apply(final AbstractTask task) { + task.closeStateManager(); + } + }, "close state manager", throwExceptions); + } + + private void commitOffsets(final boolean throwExceptions) { + // Exceptions should not prevent this call from going through all shutdown steps + performOnAllTasks(new AbstractTaskAction() { + @Override + public void apply(final AbstractTask task) { + task.commitOffsets(); + } + }, "commit consumer offsets", throwExceptions); + } + + private void flushAllState(final boolean throwExceptions) { + performOnAllTasks(new AbstractTaskAction() { + @Override + public void apply(final AbstractTask task) { + task.flushState(); + } + }, "flush state", throwExceptions); + } + /** * Compute the latency based on the current marked timestamp, * and update the marked timestamp with the current system timestamp. 
@@ -593,9 +651,6 @@ public class StreamThread extends Thread { private void removeStreamTasks() { try { - for (StreamTask task : activeTasks.values()) { - closeOne(task); - } prevTasks.clear(); prevTasks.addAll(activeTasks.keySet()); @@ -607,16 +662,6 @@ public class StreamThread extends Thread { } } - private void closeOne(AbstractTask task) { - log.info("stream-thread [{}] Removing a task {}", this.getName(), task.id()); - try { - task.close(); - } catch (StreamsException e) { - log.error(String.format("stream-thread [%s] Failed to close a %s %s: ", this.getName(), task.getClass().getSimpleName(), task.id()), e); - } - sensors.taskDestructionSensor.record(); - } - protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) { sensors.taskCreationSensor.record(); @@ -701,12 +746,19 @@ public class StreamThread extends Thread { return sb.toString(); } + private void closeAllTasks() { + performOnAllTasks(new AbstractTaskAction() { + @Override + public void apply(final AbstractTask task) { + log.info("stream-thread [{}] Removing a task {}", StreamThread.this.getName(), task.id()); + task.close(); + sensors.taskDestructionSensor.record(); + } + }, "close", false); + } private void removeStandbyTasks() { try { - for (StandbyTask task : standbyTasks.values()) { - closeOne(task); - } standbyTasks.clear(); standbyTasksByPartition.clear(); standbyRecords.clear(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 7bd1020..e27ffd8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java 
@@ -395,7 +395,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } open = false; closeOpenIterators(); - flush(); options.close(); wOptions.close(); fOptions.close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index b069cd9..7cd0b8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -77,6 +77,16 @@ public class AbstractTaskTest { public void commit() { // do nothing } + + @Override + public void close() { + + } + + @Override + public void commitOffsets() { + // do nothing + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 7c22202..9198fa9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -31,8 +31,11 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.StateSerdes; import 
org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockStateStoreSupplier; +import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -408,7 +411,7 @@ public class ProcessorStateManagerTest { } @Test - public void testClose() throws IOException { + public void testFlushAndClose() throws IOException { final TaskId taskId = new TaskId(0, 1); File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); // write an empty checkpoint file @@ -445,6 +448,7 @@ public class ProcessorStateManagerTest { stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); } finally { // close the state manager with the ack'ed offsets + stateMgr.flush(new MockProcessorContext(StateSerdes.withBuiltinTypes("foo", String.class, String.class), new NoOpRecordCollector())); stateMgr.close(ackedOffsets); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 268697c..afd9bb6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -199,7 +199,7 @@ public class StandbyTaskTest { assertEquals(Collections.emptyList(), store1.keys); assertEquals(Utils.mkList(1, 2, 3), store2.keys); - task.close(); + task.closeStateManager(); File taskDir = stateDirectory.directoryForTask(taskId); OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, 
ProcessorStateManager.CHECKPOINT_FILE_NAME)); @@ -292,7 +292,7 @@ public class StandbyTaskTest { remaining = task.update(ktable, remaining); assertNull(remaining); - task.close(); + task.closeStateManager(); File taskDir = stateDirectory.directoryForTask(taskId); OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index c7e9daa..2f252e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -143,6 +143,12 @@ public class StreamThreadTest { } @Override + public void commitOffsets() { + super.commitOffsets(); + committed = true; + } + + @Override protected void initializeOffsetLimits() { // do nothing } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java index 1788159..d4cc99b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java @@ -16,18 +16,15 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.clients.producer.ProducerRecord; -import 
org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; import java.util.Collections; @@ -49,32 +46,6 @@ public class StateStoreTestUtils { } - static class NoOpRecordCollector extends RecordCollector { - public NoOpRecordCollector() { - super(null, "StateStoreTestUtils"); - } - - @Override - public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { - // no-op - } - - @Override - public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) { - // no-op - } - - @Override - public void flush() { - //no-op - } - - @Override - public void close() { - //no-op - } - } - static class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, StateStore { http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ea35e6/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java new file mode 100644 index 0000000..880a93b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.internals.RecordCollector; + +public class NoOpRecordCollector extends RecordCollector { + public NoOpRecordCollector() { + super(null, "NoOpRecordCollector"); + } + + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + // no-op + } + + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) { + // no-op + } + + @Override + public void flush() { + //no-op + } + + @Override + public void close() { + //no-op + } +}
