Repository: kafka Updated Branches: refs/heads/0.11.0 7e0ec9c76 -> 452e2eedb
HOTFIX: handle commit failed exception on stream thread's suspend task 1. Capture `CommitFailedException` in `StreamThread#suspendTasksAndState`. 2. Remove `Cache` from AbstractTask as it is no longer needed; remove unused cleanup-related variables from StreamThread (cc dguy to double check). 3. Also fix log4j outputs for error and warn, such that for WARN we do not print the stack trace, and for ERROR we remove the dangling colon since the exception stack trace will start on a new line. 4. Update one log4j entry to always print as WARN for errors closing a zombie task (cc mjsax). Author: Guozhang Wang <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Damian Guy <[email protected]> Closes #3574 from guozhangwang/KHotfix-handle-commit-failed-exception-in-suspend (cherry picked from commit 228a4fdb6dc15d1b8615615e00825e7ce53a41fa) Signed-off-by: Guozhang Wang <[email protected]> fix unit test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/452e2eed Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/452e2eed Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/452e2eed Branch: refs/heads/0.11.0 Commit: 452e2eedb84b77c6f16085cf5b59d9cadb609149 Parents: 7e0ec9c Author: Guozhang Wang <[email protected]> Authored: Tue Aug 1 12:28:24 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Aug 1 14:51:52 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/AbstractTask.java | 15 ++--- .../processor/internals/StandbyTask.java | 2 +- .../streams/processor/internals/StreamTask.java | 39 ++++++------ .../processor/internals/StreamThread.java | 67 +++++++++++--------- .../processor/internals/AbstractTaskTest.java | 3 - .../processor/internals/StreamThreadTest.java | 56 +++++++++++++++- 6 files changed, 115 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 02129ab..8427e11 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,14 +43,14 @@ public abstract class AbstractTask { final TaskId id; final String applicationId; final ProcessorTopology topology; - final Consumer consumer; final ProcessorStateManager stateMgr; final Set<TopicPartition> partitions; - InternalProcessorContext processorContext; - private final ThreadCache cache; + final Consumer consumer; final String logPrefix; final boolean eosEnabled; + InternalProcessorContext processorContext; + /** * @throws ProcessorStateException if the state manager cannot be created */ @@ -63,15 +62,13 @@ public abstract class AbstractTask { final ChangelogReader changelogReader, final boolean isStandby, final StateDirectory stateDirectory, - final ThreadCache cache, final StreamsConfig config) { this.id = id; this.applicationId = applicationId; this.partitions = new HashSet<>(partitions); this.topology = topology; this.consumer = consumer; - this.cache = cache; - eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); + 
this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id()); @@ -116,10 +113,6 @@ public abstract class AbstractTask { return processorContext; } - public final ThreadCache cache() { - return cache; - } - public StateStore getStore(final String name) { return stateMgr.getStore(name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 8d518ae..df3bea4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -59,7 +59,7 @@ public class StandbyTask extends AbstractTask { final StreamsConfig config, final StreamsMetrics metrics, final StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null, config); + super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, config); // initialize the topology with its own context processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 71c9e69..c337911 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -108,7 +108,7 @@ public class StreamTask extends AbstractTask implements Punctuator { final ThreadCache cache, final Time time, final Producer<byte[], byte[]> producer) { - super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache, config); + super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, config); punctuationQueue = new PunctuationQueue(); maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); this.metrics = new TaskMetrics(metrics); @@ -254,7 +254,6 @@ public class StreamTask extends AbstractTask implements Punctuator { @Override public void commit() { commit(true); - } // visible for testing @@ -312,7 +311,7 @@ public class StreamTask extends AbstractTask implements Punctuator { try { consumer.commitSync(consumedOffsetsAndMetadata); } catch (final CommitFailedException e) { - log.warn("{} Failed offset commits {}: ", logPrefix, consumedOffsetsAndMetadata, e); + log.warn("{} Failed offset commits {} due to CommitFailedException", logPrefix, consumedOffsetsAndMetadata); throw e; } } @@ -401,7 +400,7 @@ public class StreamTask extends AbstractTask implements Punctuator { if (firstException == null) { firstException = e; } - log.error("{} Could not close state manager: ", logPrefix, e); + log.error("{} Could not close state manager due to the following error:", logPrefix, e); } try { @@ -420,7 +419,7 @@ public class StreamTask extends AbstractTask implements Punctuator { try { recordCollector.close(); } catch (final Throwable e) { - log.error("{} Failed to close producer: ", logPrefix, e); + log.error("{} Failed to close producer due to the following error:", logPrefix, e); } } } @@ -430,20 +429,20 @@ public class StreamTask extends AbstractTask implements Punctuator { 
} } - /** - * <pre> - * - {@link #suspend(boolean) suspend(clean)} - * - close topology - * - if (clean) {@link #commit()} - * - flush state and producer - * - commit offsets - * - close state - * - if (clean) write checkpoint - * - if (eos) close producer - * </pre> - * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} -- - * otherwise, just close open resources - */ + /** + * <pre> + * - {@link #suspend(boolean) suspend(clean)} + * - close topology + * - if (clean) {@link #commit()} + * - flush state and producer + * - commit offsets + * - close state + * - if (clean) write checkpoint + * - if (eos) close producer + * </pre> + * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} -- + * otherwise, just close open resources + */ @Override public void close(boolean clean) { log.debug("{} Closing", logPrefix); @@ -454,7 +453,7 @@ public class StreamTask extends AbstractTask implements Punctuator { } catch (final RuntimeException e) { clean = false; firstException = e; - log.error("{} Could not close task: ", logPrefix, e); + log.error("{} Could not close task due to the following error:", logPrefix, e); } closeSuspended(clean, firstException); http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index db42061..9774627 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -257,7 +257,7 @@ public class StreamThread extends Thread { } catch (final LockException e) { // ignore and retry if (!retryingTasks.contains(taskId)) { - log.warn("{} Could 
not create task {}. Will retry: ", logPrefix, taskId, e); + log.warn("{} Could not create task {} due to {}; will retry", logPrefix, taskId, e); retryingTasks.add(taskId); } } @@ -455,7 +455,7 @@ public class StreamThread extends Thread { streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId, Collections.singletonMap("client-id", threadClientId)); if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { - log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes.", logPrefix); + log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes", logPrefix); } cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics); eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); @@ -501,7 +501,6 @@ public class StreamThread extends Thread { lastCommitMs = timerStartedMs; rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); setState(State.RUNNING); - } /** @@ -524,7 +523,7 @@ public class StreamThread extends Thread { } catch (final Exception e) { // we have caught all Kafka related exceptions, and other runtime exceptions // should be due to user application errors - log.error("{} Streams application error during processing: ", logPrefix, e); + log.error("{} Encountered the following error during processing:", logPrefix, e); throw e; } finally { shutdown(cleanRun); @@ -668,6 +667,9 @@ public class StreamThread extends Thread { try { // we processed one record, // if more are buffered waiting for the next round + + // TODO: We should check for stream time punctuation right after each process call + // of the task instead of only calling it after all records being processed if (task.process()) { totalProcessedEachRound++; totalProcessedSinceLastMaybeCommit++; @@ -714,6 +716,7 @@ public class StreamThread extends Thread { } } }); + if (e != null) { 
throw e; } @@ -729,7 +732,7 @@ public class StreamThread extends Thread { streamsMetrics.punctuateTimeSensor.record(computeLatency(), timerStartedMs); } } catch (final KafkaException e) { - log.error("{} Failed to punctuate active task {}: ", logPrefix, task.id(), e); + log.error("{} Failed to punctuate active task {} due to the following error:", logPrefix, task.id(), e); throw e; } } @@ -819,11 +822,11 @@ public class StreamThread extends Thread { try { task.commit(); } catch (final CommitFailedException e) { - // commit failed. Just log it. - log.warn("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(), task.id(), e); + // commit failed. This is already logged inside the task as WARN and we can just log it again here. + log.warn("{} Failed to commit {} {} state due to CommitFailedException; this task may be no longer owned by the thread", logPrefix, task.getClass().getSimpleName(), task.id()); } catch (final KafkaException e) { // commit failed due to an unexpected exception. Log it and rethrow the exception. 
- log.error("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(), task.id(), e); + log.error("{} Failed to commit {} {} state due to the following error:", logPrefix, task.getClass().getSimpleName(), task.id(), e); throw e; } @@ -1052,23 +1055,23 @@ public class StreamThread extends Thread { try { threadProducer.close(); } catch (final Throwable e) { - log.error("{} Failed to close producer: ", logPrefix, e); + log.error("{} Failed to close producer due to the following error:", logPrefix, e); } } try { consumer.close(); } catch (final Throwable e) { - log.error("{} Failed to close consumer: ", logPrefix, e); + log.error("{} Failed to close consumer due to the following error:", logPrefix, e); } try { restoreConsumer.close(); } catch (final Throwable e) { - log.error("{} Failed to close restore consumer: ", logPrefix, e); + log.error("{} Failed to close restore consumer due to the following error:", logPrefix, e); } try { partitionAssignor.close(); } catch (final Throwable e) { - log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e); + log.error("{} Failed to close KafkaStreamClient due to the following error:", logPrefix, e); } removeStreamTasks(); @@ -1091,7 +1094,7 @@ public class StreamThread extends Thread { try { task.close(cleanRun); } catch (final RuntimeException e) { - log.error("{} Failed while closing {} {}: ", + log.error("{} Failed while closing {} {} due to the following error:", logPrefix, task.getClass().getSimpleName(), task.id(), @@ -1123,11 +1126,15 @@ public class StreamThread extends Thread { public void apply(final StreamTask task) { try { task.suspend(); + } catch (final CommitFailedException e) { + // commit failed during suspension. Just log it. 
+ log.warn("{} Failed to commit task {} state when suspending due to CommitFailedException", logPrefix, task.id); } catch (final Exception e) { + log.error("{} Suspending task {} failed due to the following error:", logPrefix, task.id, e); try { task.close(false); } catch (final Exception f) { - log.error("{} Closing task {} failed: ", logPrefix, task.id, f); + log.error("{} After suspending failed, closing the same task {} failed again due to the following error:", logPrefix, task.id, f); } throw e; } @@ -1139,10 +1146,11 @@ public class StreamThread extends Thread { try { task.suspend(); } catch (final Exception e) { + log.error("{} Suspending standby task {} failed due to the following error:", logPrefix, task.id, e); try { task.close(false); } catch (final Exception f) { - log.error("{} Closing standby task {} failed: ", logPrefix, task.id, f); + log.error("{} After suspending failed, closing the same standby task {} failed again due to the following error:", logPrefix, task.id, f); } throw e; } @@ -1166,7 +1174,7 @@ public class StreamThread extends Thread { // un-assign the change log partitions restoreConsumer.assign(Collections.<TopicPartition>emptyList()); } catch (final RuntimeException e) { - log.error("{} Failed to un-assign change log partitions: ", logPrefix, e); + log.error("{} Failed to un-assign change log partitions due to the following error:", logPrefix, e); return e; } return null; @@ -1193,7 +1201,7 @@ public class StreamThread extends Thread { private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) { if (suspendedTasks.containsKey(taskId)) { final StreamTask task = suspendedTasks.get(taskId); - if (task.partitions.equals(partitions)) { + if (task.partitions().equals(partitions)) { return task; } } @@ -1203,7 +1211,7 @@ public class StreamThread extends Thread { private StandbyTask findMatchingSuspendedStandbyTask(final TaskId taskId, final Set<TopicPartition> partitions) { if 
(suspendedStandbyTasks.containsKey(taskId)) { final StandbyTask task = suspendedStandbyTasks.get(taskId); - if (task.partitions.equals(partitions)) { + if (task.partitions().equals(partitions)) { return task; } } @@ -1218,11 +1226,11 @@ public class StreamThread extends Thread { final StreamTask task = next.getValue(); final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey()); if (!task.partitions().equals(assignedPartitionsForTask)) { - log.debug("{} Closing suspended non-assigned active task {}", logPrefix, task.id()); + log.debug("{} Closing suspended and not re-assigned task {}", logPrefix, task.id()); try { task.closeSuspended(true, null); } catch (final Exception e) { - log.error("{} Failed to remove suspended task {}: ", logPrefix, next.getKey(), e); + log.error("{} Failed to close suspended task {} due to the following error:", logPrefix, next.getKey(), e); } finally { suspendedTaskIterator.remove(); } @@ -1237,11 +1245,11 @@ public class StreamThread extends Thread { final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next(); if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) { final StandbyTask task = suspendedTask.getValue(); - log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, task.id()); + log.debug("{} Closing suspended and not re-assigned standby task {}", logPrefix, task.id()); try { task.close(true); } catch (final Exception e) { - log.error("{} Failed to remove suspended standby task {}: ", logPrefix, task.id(), e); + log.error("{} Failed to remove suspended standby task {} due to the following error:", logPrefix, task.id(), e); } finally { standByTaskIterator.remove(); } @@ -1321,7 +1329,7 @@ public class StreamThread extends Thread { newTasks.put(taskId, partitions); } } catch (final StreamsException e) { - log.error("{} Failed to create an active task {}: ", logPrefix, taskId, e); + log.error("{} Failed to create an active task {} due to the following 
error:", logPrefix, taskId, e); throw e; } } else { @@ -1434,7 +1442,7 @@ public class StreamThread extends Thread { activeTasks.clear(); activeTasksByPartition.clear(); } catch (final Exception e) { - log.error("{} Failed to remove stream tasks: ", logPrefix, e); + log.error("{} Failed to remove stream tasks due to the following error:", logPrefix, e); } } @@ -1447,14 +1455,11 @@ public class StreamThread extends Thread { } private void closeZombieTask(final StreamTask task) { - log.warn("{} Producer of task {} fenced; closing zombie task.", logPrefix, task.id); + log.warn("{} Producer of task {} fenced; closing zombie task", logPrefix, task.id); try { task.close(false); - } catch (final Exception f) { - if (!log.isDebugEnabled() && !log.isTraceEnabled()) { - log.warn("{} Failed to close zombie task: {}", logPrefix, f.getMessage()); - } - log.debug("{} Failed to close zombie task: ", logPrefix, f); + } catch (final Exception e) { + log.warn("{} Failed to close zombie task due to {}, ignore and proceed", logPrefix, e); } activeTasks.remove(task.id); } @@ -1471,7 +1476,7 @@ public class StreamThread extends Thread { closeZombieTask(task); it.remove(); } catch (final RuntimeException t) { - log.error("{} Failed to {} stream task {}: ", + log.error("{} Failed to {} stream task {} due to the following error:", logPrefix, action.name(), task.id(), http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index ba3230a..123cbf0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ 
-24,14 +24,12 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -80,7 +78,6 @@ public class AbstractTaskTest { new StoreChangelogReader(consumer, Time.SYSTEM, 5000), false, new StateDirectory("app", TestUtils.tempDirectory().getPath(), time), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())), config) { @Override public void resume() {} http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index a0882cf..d8d2e4f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -93,7 +94,6 @@ public class 
StreamThreadTest { private final StreamsConfig config = new StreamsConfig(configProps(false)); - @Before public void setUp() throws Exception { processId = UUID.randomUUID(); @@ -1533,6 +1533,60 @@ public class StreamThreadTest { } @Test + public void shouldCaptureCommitFailedExceptionOnTaskSuspension() throws Exception { + builder.stream("t1"); + + final TestStreamTask testStreamTask = new TestStreamTask( + new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) { + + @Override + public void suspend() { + throw new CommitFailedException(); + } + }; + + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { + + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { + return testStreamTask; + } + }; + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id(), testStreamTask.partitions); + + thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + + assertFalse(testStreamTask.committed); + } + + + @Test public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); builder.setApplicationId(applicationId);
