Repository: kafka
Updated Branches:
  refs/heads/trunk 46e4d4013 -> ff7b0f5b4
HOTFIX: make sure to go through all shutdown steps

Author: Yasuhiro Matsuda <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #928 from ymatsuda/shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff7b0f5b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff7b0f5b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff7b0f5b

Branch: refs/heads/trunk
Commit: ff7b0f5b467bdf553584fb253b00f460dfbe8943
Parents: 46e4d40
Author: Yasuhiro Matsuda <[email protected]>
Authored: Mon Feb 22 13:16:06 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Mon Feb 22 13:16:06 2016 -0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        | 70 ++++++++++----------
 .../streams/processor/internals/StreamTask.java |  4 +-
 .../processor/internals/StreamThread.java       | 60 +++++++++--------
 3 files changed, 71 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c3bd82a..d449d04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -298,45 +298,47 @@ public class ProcessorStateManager {
     }
 
     public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
-        if (!stores.isEmpty()) {
-            log.debug("Closing stores.");
-            for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
-                log.debug("Closing storage engine {}", entry.getKey());
-                entry.getValue().flush();
-                entry.getValue().close();
-            }
+        try {
+            if (!stores.isEmpty()) {
+                log.debug("Closing stores.");
+                for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
+                    log.debug("Closing storage engine {}", entry.getKey());
+                    entry.getValue().flush();
+                    entry.getValue().close();
+                }
 
-            Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
-            for (String storeName : stores.keySet()) {
-                TopicPartition part;
-                if (loggingEnabled.contains(storeName))
-                    part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
-                else
-                    part = new TopicPartition(storeName, getPartition(storeName));
-
-                // only checkpoint the offset to the offsets file if it is persistent;
-                if (stores.get(storeName).persistent()) {
-                    Long offset = ackedOffsets.get(part);
-
-                    if (offset != null) {
-                        // store the last offset + 1 (the log position after restoration)
-                        checkpointOffsets.put(part, offset + 1);
-                    } else {
-                        // if no record was produced. we need to check the restored offset.
-                        offset = restoredOffsets.get(part);
-                        if (offset != null)
-                            checkpointOffsets.put(part, offset);
+                Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
+                for (String storeName : stores.keySet()) {
+                    TopicPartition part;
+                    if (loggingEnabled.contains(storeName))
+                        part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+                    else
+                        part = new TopicPartition(storeName, getPartition(storeName));
+
+                    // only checkpoint the offset to the offsets file if it is persistent;
+                    if (stores.get(storeName).persistent()) {
+                        Long offset = ackedOffsets.get(part);
+
+                        if (offset != null) {
+                            // store the last offset + 1 (the log position after restoration)
+                            checkpointOffsets.put(part, offset + 1);
+                        } else {
+                            // if no record was produced. we need to check the restored offset.
+                            offset = restoredOffsets.get(part);
+                            if (offset != null)
+                                checkpointOffsets.put(part, offset);
+                        }
                     }
                 }
-            }
 
-            // write the checkpoint file before closing, to indicate clean shutdown
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
-            checkpoint.write(checkpointOffsets);
+                // write the checkpoint file before closing, to indicate clean shutdown
+                OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+                checkpoint.write(checkpointOffsets);
+            }
+        } finally {
+            // release the state directory directoryLock
+            directoryLock.release();
         }
-
-        // release the state directory directoryLock
-        directoryLock.release();
     }
 
     private int getPartition(String topic) {


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ec3d011..4d66324 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -300,10 +300,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
             }
         }
 
+        super.close();
+
         if (exception != null)
             throw exception;
-
-        super.close();
     }
 
     @Override


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3fc9407..70e24d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -138,14 +138,15 @@ public class StreamThread extends Thread {
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
                 commitAll();
-                // TODO: right now upon partition revocation, we always remove all the tasks;
-                // this behavior can be optimized to only remove affected tasks in the future
-                removeStreamTasks();
-                removeStandbyTasks();
                 lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
+            } finally {
+                // TODO: right now upon partition revocation, we always remove all the tasks;
+                // this behavior can be optimized to only remove affected tasks in the future
+                removeStreamTasks();
+                removeStandbyTasks();
             }
         }
     };
@@ -273,6 +274,8 @@ public class StreamThread extends Thread {
     private void shutdown() {
         log.info("Shutting down stream thread [" + this.getName() + "]");
 
+        // Exceptions should not prevent this call from going through all shutdown steps
+
         try {
             commitAll();
         } catch (Throwable e) {
@@ -299,13 +302,8 @@ public class StreamThread extends Thread {
             log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
         }
 
-        // Exceptions should not prevent this call from going through all shutdown steps
-        try {
-            removeStreamTasks();
-            removeStandbyTasks();
-        } catch (Throwable e) {
-            // already logged in removeStreamTasks() and removeStandbyTasks()
-        }
+        removeStreamTasks();
+        removeStandbyTasks();
 
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
     }
@@ -627,15 +625,19 @@ public class StreamThread extends Thread {
     }
 
     private void removeStreamTasks() {
-        for (StreamTask task : activeTasks.values()) {
-            closeOne(task);
-        }
+        try {
+            for (StreamTask task : activeTasks.values()) {
+                closeOne(task);
+            }
+            prevTasks.clear();
+            prevTasks.addAll(activeTasks.keySet());
 
-        prevTasks.clear();
-        prevTasks.addAll(activeTasks.keySet());
+            activeTasks.clear();
+            activeTasksByPartition.clear();
 
-        activeTasks.clear();
-        activeTasksByPartition.clear();
+        } catch (Exception e) {
+            log.error("Failed to remove stream tasks in thread [" + this.getName() + "]: ", e);
+        }
     }
 
     private void closeOne(AbstractTask task) {
@@ -644,7 +646,6 @@ public class StreamThread extends Thread {
             task.close();
         } catch (StreamsException e) {
             log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-            throw e;
         }
         sensors.taskDestructionSensor.record();
     }
@@ -701,15 +702,20 @@ public class StreamThread extends Thread {
 
 
     private void removeStandbyTasks() {
-        for (StandbyTask task : standbyTasks.values()) {
-            closeOne(task);
-        }
-        // un-assign the change log partitions
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        try {
+            for (StandbyTask task : standbyTasks.values()) {
+                closeOne(task);
+            }
+            standbyTasks.clear();
+            standbyTasksByPartition.clear();
+            standbyRecords.clear();
 
-        standbyTasks.clear();
-        standbyTasksByPartition.clear();
-        standbyRecords.clear();
+            // un-assign the change log partitions
+            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+
+        } catch (Exception e) {
+            log.error("Failed to remove standby tasks in thread [" + this.getName() + "]: ", e);
+        }
     }
 
     private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {
