Repository: kafka
Updated Branches:
  refs/heads/0.10.2 65c8f680e -> 2dfeff064
MINOR: Improve log4j on stream thread and stream process Cherry pick from https://github.com/apache/kafka/pull/2685 Author: Eno Thereska <eno.there...@gmail.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #2817 from enothereska/KMinor-improve-log4j-0.10.2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2dfeff06 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2dfeff06 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2dfeff06 Branch: refs/heads/0.10.2 Commit: 2dfeff064a5f0a5adfb5adb29015e0fdcc85c45a Parents: 65c8f68 Author: Eno Thereska <eno.there...@gmail.com> Authored: Thu Apr 6 12:28:40 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu Apr 6 12:28:40 2017 +0100 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 24 ++++++++---- .../processor/internals/StreamThread.java | 40 ++++++++------------ 2 files changed, 32 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2dfeff06/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 68157b7..cd0e861 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -132,6 +132,7 @@ public class KafkaStreams { // of the co-location of stream thread's consumers. It is for internal // usage only and should not be exposed to users at all. 
private final UUID processId; + private final String logPrefix; private final StreamsMetadataState streamsMetadataState; private final StreamsConfig config; @@ -218,9 +219,13 @@ public class KafkaStreams { private synchronized void setState(final State newState) { final State oldState = state; if (!state.isValidTransition(newState)) { - log.warn("Unexpected state transition from {} to {}.", oldState, newState); + log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState); + } else { + log.info("{} State transition from {} to {}.", logPrefix, oldState, newState); } + state = newState; + if (stateListener != null) { stateListener.onChange(state, oldState); } @@ -311,6 +316,8 @@ public class KafkaStreams { if (clientId.length() <= 0) clientId = applicationId + "-" + processId; + this.logPrefix = String.format("stream-client [%s]", clientId); + final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); @@ -330,7 +337,7 @@ public class KafkaStreams { final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology(); if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { - log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes."); + log.warn("{} Negative cache size passed in. 
Reverting to cache size of 0 bytes.", logPrefix); } final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / @@ -396,7 +403,7 @@ public class KafkaStreams { try { client.close(); } catch (final IOException e) { - log.warn("Could not close StreamKafkaClient.", e); + log.warn("{} Could not close StreamKafkaClient.", logPrefix, e); } } @@ -412,7 +419,7 @@ public class KafkaStreams { * @throws StreamsException if the Kafka brokers have version 0.10.0.x */ public synchronized void start() throws IllegalStateException, StreamsException { - log.debug("Starting Kafka Stream process."); + log.debug("{} Starting Kafka Stream process.", logPrefix); if (state == State.CREATED) { checkBrokerVersionCompatibility(); @@ -426,7 +433,7 @@ public class KafkaStreams { thread.start(); } - log.info("Started Kafka Stream process"); + log.info("{} Started Kafka Stream process", logPrefix); } else { throw new IllegalStateException("Cannot start again."); } @@ -451,7 +458,7 @@ public class KafkaStreams { * before all threads stopped */ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - log.debug("Stopping Kafka Stream process."); + log.debug("{} Stopping Kafka Stream process.", logPrefix); if (state.isCreatedOrRunning()) { setState(State.PENDING_SHUTDOWN); // save the current thread so that if it is a stream thread @@ -487,7 +494,7 @@ public class KafkaStreams { } metrics.close(); - log.info("Stopped Kafka Streams process."); + log.info("{} Stopped Kafka Streams process.", logPrefix); } }, "kafka-streams-close-thread"); shutdown.setDaemon(true); @@ -557,7 +564,8 @@ public class KafkaStreams { final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG); final String localApplicationDir = stateDir + File.separator + appId; - log.debug("Removing local Kafka Streams application data in {} for application {}.", + log.debug("{} Removing local Kafka Streams application data in {} for application {}.", + 
logPrefix, localApplicationDir, appId); http://git-wip-us.apache.org/repos/asf/kafka/blob/2dfeff06/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d053431..5e64515 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -158,8 +158,11 @@ public class StreamThread extends Thread { private synchronized void setState(State newState) { State oldState = state; if (!state.isValidTransition(newState)) { - log.warn("Unexpected state transition from " + state + " to " + newState); + log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState); + } else { + log.info("{} State transition from {} to {}.", logPrefix, oldState, newState); } + state = newState; if (stateListener != null) { stateListener.onChange(this, state, oldState); @@ -222,13 +225,7 @@ public class StreamThread extends Thread { public void onPartitionsAssigned(Collection<TopicPartition> assignment) { try { - if (state == State.PENDING_SHUTDOWN) { - log.info("stream-thread [{}] New partitions [{}] assigned while shutting down.", - StreamThread.this.getName(), assignment); - } - log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.", - StreamThread.this.getName(), assignment); - + log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment); setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS); // do this first as we may have suspended standby tasks that // will become active or vice versa @@ -248,12 +245,7 @@ public class StreamThread extends Thread { 
@Override public void onPartitionsRevoked(Collection<TopicPartition> assignment) { try { - if (state == State.PENDING_SHUTDOWN) { - log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.", - StreamThread.this.getName(), assignment); - } - log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.", - StreamThread.this.getName(), assignment); + log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.", logPrefix, state, assignment); setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED); lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned // suspend active tasks @@ -348,7 +340,7 @@ public class StreamThread extends Thread { this.timerStartedMs = time.milliseconds(); this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment this.lastCommitMs = timerStartedMs; - setState(state.RUNNING); + setState(State.RUNNING); } public void partitionAssignor(StreamPartitionAssignor partitionAssignor) { @@ -417,7 +409,7 @@ public class StreamThread extends Thread { try { partitionAssignor.close(); } catch (Throwable e) { - log.error("stream-thread [{}] Failed to close KafkaStreamClient: ", this.getName(), e); + log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e); } removeStreamTasks(); @@ -444,7 +436,7 @@ public class StreamThread extends Thread { @SuppressWarnings("ThrowableNotThrown") private void shutdownTasksAndState() { - log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix, + log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet()); final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); @@ -469,7 +461,7 @@ public class StreamThread extends Thread { * soon the tasks will be assigned again */ private void suspendTasksAndState() 
{ - log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix, + log.debug("{} suspendTasksAndState: suspending all active tasks {} and standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet()); final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); // Close all topology nodes @@ -762,7 +754,7 @@ public class StreamThread extends Thread { * Commit the states of all its tasks */ private void commitAll() { - log.trace("stream-thread [{}] Committing all its owned tasks", this.getName()); + log.trace("{} Committing all its owned tasks", logPrefix); for (StreamTask task : activeTasks.values()) { commitOne(task); } @@ -828,7 +820,7 @@ public class StreamThread extends Thread { } protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) { - log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions); + log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id, partitions); streamsMetrics.taskCreatedSensor.record(); @@ -941,7 +933,7 @@ public class StreamThread extends Thread { } StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) { - log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions); + log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions); streamsMetrics.taskCreatedSensor.record(); @@ -1012,14 +1004,14 @@ public class StreamThread extends Thread { } private void updateSuspendedTasks() { - log.info("{} Updating suspended tasks to contain active tasks [{}]", logPrefix, activeTasks.keySet()); + log.info("{} Updating suspended tasks to contain active tasks {}", logPrefix, activeTasks.keySet()); suspendedTasks.clear(); suspendedTasks.putAll(activeTasks); suspendedStandbyTasks.putAll(standbyTasks); } private void removeStreamTasks() { - log.info("{} Removing all active tasks 
[{}]", logPrefix, activeTasks.keySet()); + log.info("{} Removing all active tasks {}", logPrefix, activeTasks.keySet()); try { prevTasks.clear(); @@ -1034,7 +1026,7 @@ public class StreamThread extends Thread { } private void removeStandbyTasks() { - log.info("{} Removing all standby tasks [{}]", logPrefix, standbyTasks.keySet()); + log.info("{} Removing all standby tasks {}", logPrefix, standbyTasks.keySet()); standbyTasks.clear(); standbyTasksByPartition.clear();