Repository: kafka Updated Branches: refs/heads/trunk 962c378cc -> b9f812491
MINOR: Improve log4j on stream thread and stream process Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Eno Thereska <e...@confluent.io>, Damian Guy <damian....@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #2685 from guozhangwang/KMinor-improve-log4j Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9f81249 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9f81249 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9f81249 Branch: refs/heads/trunk Commit: b9f812491f5eb06ffe5b68f5e53df4302d2f68a8 Parents: 962c378 Author: Guozhang Wang <wangg...@gmail.com> Authored: Wed Mar 15 10:46:57 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Mar 15 10:46:57 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 24 ++++++++---- .../processor/internals/StreamThread.java | 39 ++++++++------------ 2 files changed, 32 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b9f81249/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 8d8626d..b23d244 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -131,6 +131,7 @@ public class KafkaStreams { // of the co-location of stream thread's consumers. It is for internal // usage only and should not be exposed to users at all. private final UUID processId; + private final String logPrefix; private final StreamsMetadataState streamsMetadataState; private final StreamsConfig config; @@ -217,9 +218,13 @@ public class KafkaStreams { private synchronized void setState(final State newState) { final State oldState = state; if (!state.isValidTransition(newState)) { - log.warn("Unexpected state transition from {} to {}.", oldState, newState); + log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState); + } else { + log.info("{} State transition from {} to {}.", logPrefix, oldState, newState); } + state = newState; + if (stateListener != null) { stateListener.onChange(state, oldState); } @@ -310,6 +315,8 @@ public class KafkaStreams { if (clientId.length() <= 0) clientId = applicationId + "-" + processId; + this.logPrefix = String.format("stream-client [%s]", clientId); + final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); @@ -329,7 +336,7 @@ public class KafkaStreams { final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology(); if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { - log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes."); + log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix); } final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / @@ -395,7 +402,7 @@ public class KafkaStreams { try { client.close(); } catch (final IOException e) { - log.warn("Could not close StreamKafkaClient.", e); + log.warn("{} Could not close StreamKafkaClient.", logPrefix, e); } } @@ -411,7 +418,7 @@ public class KafkaStreams { * @throws StreamsException if the Kafka brokers have version 0.10.0.x */ public synchronized void start() throws IllegalStateException, StreamsException { - log.debug("Starting Kafka Stream process."); + log.debug("{} Starting Kafka Stream process.", logPrefix); if (state == State.CREATED) { checkBrokerVersionCompatibility(); @@ -425,7 +432,7 @@ public class KafkaStreams { thread.start(); } - log.info("Started Kafka Stream process"); + log.info("{} Started Kafka Stream process", logPrefix); } else { throw new IllegalStateException("Cannot start again."); } @@ -450,7 +457,7 @@ public class KafkaStreams { * before all threads stopped */ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - log.debug("Stopping Kafka Stream process."); + log.debug("{} Stopping Kafka Stream process.", logPrefix); if (state.isCreatedOrRunning()) { setState(State.PENDING_SHUTDOWN); // save the current thread so that if it is a stream thread @@ -486,7 +493,7 @@ public class KafkaStreams { } metrics.close(); - log.info("Stopped Kafka Streams process."); + log.info("{} Stopped Kafka Streams process.", logPrefix); } }, "kafka-streams-close-thread"); shutdown.setDaemon(true); @@ -556,7 +563,8 @@ public class KafkaStreams { final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG); final String localApplicationDir = stateDir + File.separator + appId; - log.debug("Removing local Kafka Streams application data in {} for application {}.", + log.debug("{} Removing local Kafka Streams application data in {} for application {}.", + logPrefix, localApplicationDir, appId); http://git-wip-us.apache.org/repos/asf/kafka/blob/b9f81249/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9a2c3fa..6a6b508 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -156,8 +156,11 @@ public class StreamThread extends Thread { private synchronized void setState(State newState) { State oldState = state; if (!state.isValidTransition(newState)) { - log.warn("Unexpected state transition from " + state + " to " + newState); + log.warn("Unexpected state transition from {} to {}.", logPrefix, oldState, newState); + } else { + log.info("{} State transition from {} to {}.", logPrefix, oldState, newState); } + state = newState; if (stateListener != null) { stateListener.onChange(this, state, oldState); @@ -296,7 +299,7 @@ public class StreamThread extends Thread { this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment this.lastCommitMs = timerStartedMs; this.rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); - setState(state.RUNNING); + setState(State.RUNNING); } @@ -366,7 +369,7 @@ public class StreamThread extends Thread { try { partitionAssignor.close(); } catch (Throwable e) { - log.error("stream-thread [{}] Failed to close KafkaStreamClient: ", this.getName(), e); + log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e); } removeStreamTasks(); @@ -393,7 +396,7 @@ public class StreamThread extends Thread { @SuppressWarnings("ThrowableNotThrown") private void shutdownTasksAndState() { - log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix, + log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet()); final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); @@ -418,7 +421,7 @@ public class StreamThread extends Thread { * soon the tasks will be assigned again */ private void suspendTasksAndState() { - log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix, + log.debug("{} suspendTasksAndState: suspending all active tasks {} and standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet()); final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); // Close all topology nodes @@ -709,7 +712,7 @@ public class StreamThread extends Thread { * Commit the states of all its tasks */ private void commitAll() { - log.trace("stream-thread [{}] Committing all its owned tasks", this.getName()); + log.trace("{} Committing all its owned tasks", logPrefix); for (StreamTask task : activeTasks.values()) { commitOne(task); } @@ -775,7 +778,7 @@ public class StreamThread extends Thread { } protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) { - log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions); + log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id, partitions); streamsMetrics.taskCreatedSensor.record(); @@ -894,7 +897,7 @@ public class StreamThread extends Thread { } StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) { - log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions); + log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions); streamsMetrics.taskCreatedSensor.record(); @@ -965,14 +968,14 @@ public class StreamThread extends Thread { } private void updateSuspendedTasks() { - log.info("{} Updating suspended tasks to contain active tasks [{}]", logPrefix, activeTasks.keySet()); + log.info("{} Updating suspended tasks to contain active tasks {}", logPrefix, activeTasks.keySet()); suspendedTasks.clear(); suspendedTasks.putAll(activeTasks); suspendedStandbyTasks.putAll(standbyTasks); } private void removeStreamTasks() { - log.info("{} Removing all active tasks [{}]", logPrefix, activeTasks.keySet()); + log.info("{} Removing all active tasks {}", logPrefix, activeTasks.keySet()); try { prevActiveTasks.clear(); @@ -987,7 +990,7 @@ public class StreamThread extends Thread { } private void removeStandbyTasks() { - log.info("{} Removing all standby tasks [{}]", logPrefix, standbyTasks.keySet()); + log.info("{} Removing all standby tasks {}", logPrefix, standbyTasks.keySet()); standbyTasks.clear(); standbyTasksByPartition.clear(); @@ -1197,12 +1200,7 @@ public class StreamThread extends Thread { public void onPartitionsAssigned(Collection<TopicPartition> assignment) { final long start = time.milliseconds(); try { - if (state == State.PENDING_SHUTDOWN) { - log.info("stream-thread [{}] New partitions [{}] assigned while shutting down.", - StreamThread.this.getName(), assignment); - } - log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.", - StreamThread.this.getName(), assignment); + log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment); storeChangelogReader = new StoreChangelogReader(restoreConsumer, time, requestTimeOut); setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS); // do this first as we may have suspended standby tasks that @@ -1226,12 +1224,7 @@ public class StreamThread extends Thread { @Override public void onPartitionsRevoked(Collection<TopicPartition> assignment) { try { - if (state == State.PENDING_SHUTDOWN) { - log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.", - StreamThread.this.getName(), assignment); - } - log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.", - StreamThread.this.getName(), assignment); + log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.", logPrefix, state, assignment); setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED); lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned // suspend active tasks