Repository: kafka Updated Branches: refs/heads/0.11.0 90b6d978e -> 426057094
MINOR: logging improvements on StreamThread This is a manual cherry-pick of https://github.com/apache/kafka/pull/3769 for 0.11.0 Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Damian Guy <damian....@gmail.com>, Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io> Closes #3771 from guozhangwang/KMinor-logging-improvements Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42605709 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42605709 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42605709 Branch: refs/heads/0.11.0 Commit: 426057094cc2211578b8546708c7a9b8818aeb05 Parents: 90b6d97 Author: Guozhang Wang <wangg...@gmail.com> Authored: Tue Sep 5 16:07:13 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Sep 5 16:07:13 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/AssignedTasks.java | 4 +- .../processor/internals/StreamThread.java | 42 +++++++++----------- .../processor/internals/StreamThreadTest.java | 5 +-- 3 files changed, 21 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 815d80a..1e9ec60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -62,7 +62,7 @@ class AssignedTasks<T extends AbstractTask> { } void addNewTask(final T task) { - log.trace("{} add new {} {}", logPrefix, taskTypeName, task.id()); + log.trace("{} Add newly created {} {} with assigned partitions {}", logPrefix, taskTypeName, task.id(), task.partitions()); created.put(task.id(), task); } @@ -219,7 +219,7 @@ class AssignedTasks<T extends AbstractTask> { final T task = suspended.get(taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); - log.trace("{} Resuming suspended {} {}", logPrefix, taskTypeName, taskId); + log.trace("{} Resuming suspended {} {} with assigned partitions {}", logPrefix, taskTypeName, taskId, partitions); task.resume(); transitionToRunning(task); return true; http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 40d741f..69cb328 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -189,8 +189,9 @@ public class StreamThread extends Thread { log.trace("{} pausing partitions: {}", logPrefix, partitions); consumer.pause(partitions); } catch (final Throwable t) { + log.error("{} Error caught during partition assignment, " + + "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage()); rebalanceException = t; - throw t; } finally { log.info("{} partition assignment took {} ms.\n" + "\tcurrent active tasks: {}\n" + @@ -220,8 +221,9 @@ public class StreamThread extends Thread { // suspend active tasks suspendTasksAndState(); } catch (final Throwable t) { + log.error("{} Error caught during partition revocation, " + + "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage()); rebalanceException = t; - throw t; } finally { streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata()); standbyRecords.clear(); @@ -1074,23 +1076,19 @@ public class StreamThread extends Thread { protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { streamsMetrics.taskCreatedSensor.record(); - try { - return new StreamTask( - id, - applicationId, - partitions, - builder.build(id.topicGroupId), - consumer, - storeChangelogReader, - config, - streamsMetrics, - stateDirectory, - cache, - time, - createProducer(id)); - } finally { - log.trace("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions); - } + return new StreamTask( + id, + applicationId, + partitions, + builder.build(id.topicGroupId), + consumer, + storeChangelogReader, + config, + streamsMetrics, + stateDirectory, + cache, + time, + createProducer(id)); } private Producer<byte[], byte[]> createProducer(final TaskId id) { @@ -1154,11 +1152,7 @@ public class StreamThread extends Thread { final ProcessorTopology topology = builder.build(id.topicGroupId); if (!topology.stateStores().isEmpty()) { - try { - return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory); - } finally { - log.trace("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions); - } + return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory); } else { log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions); http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 07849c3..b13613a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1347,10 +1347,7 @@ public class StreamThreadTest { thread.rebalanceListener.onPartitionsRevoked(null); clientSupplier.producers.get(0).fenceProducer(); - try { - thread.rebalanceListener.onPartitionsAssigned(task0Assignment); - fail("should have thrown " + ProducerFencedException.class.getSimpleName()); - } catch (final ProducerFencedException e) { } + thread.rebalanceListener.onPartitionsAssigned(task0Assignment); assertTrue(thread.tasks().isEmpty()); }