Repository: kafka
Updated Branches:
  refs/heads/0.10.2 65c8f680e -> 2dfeff064


MINOR: Improve log4j on stream thread and stream process

Cherry pick from https://github.com/apache/kafka/pull/2685

Author: Eno Thereska <eno.there...@gmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #2817 from enothereska/KMinor-improve-log4j-0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2dfeff06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2dfeff06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2dfeff06

Branch: refs/heads/0.10.2
Commit: 2dfeff064a5f0a5adfb5adb29015e0fdcc85c45a
Parents: 65c8f68
Author: Eno Thereska <eno.there...@gmail.com>
Authored: Thu Apr 6 12:28:40 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu Apr 6 12:28:40 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 24 ++++++++----
 .../processor/internals/StreamThread.java       | 40 ++++++++------------
 2 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2dfeff06/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 68157b7..cd0e861 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -132,6 +132,7 @@ public class KafkaStreams {
     // of the co-location of stream thread's consumers. It is for internal
     // usage only and should not be exposed to users at all.
     private final UUID processId;
+    private final String logPrefix;
     private final StreamsMetadataState streamsMetadataState;
 
     private final StreamsConfig config;
@@ -218,9 +219,13 @@ public class KafkaStreams {
     private synchronized void setState(final State newState) {
         final State oldState = state;
         if (!state.isValidTransition(newState)) {
-            log.warn("Unexpected state transition from {} to {}.", oldState, 
newState);
+            log.warn("{} Unexpected state transition from {} to {}.", 
logPrefix, oldState, newState);
+        } else {
+            log.info("{} State transition from {} to {}.", logPrefix, 
oldState, newState);
         }
+
         state = newState;
+
         if (stateListener != null) {
             stateListener.onChange(state, oldState);
         }
@@ -311,6 +316,8 @@ public class KafkaStreams {
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + processId;
 
+        this.logPrefix = String.format("stream-client [%s]", clientId);
+
         final List<MetricsReporter> reporters = 
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
             MetricsReporter.class);
         reporters.add(new JmxReporter(JMX_PREFIX));
@@ -330,7 +337,7 @@ public class KafkaStreams {
         final ProcessorTopology globalTaskTopology = 
builder.buildGlobalStateTopology();
 
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 
0) {
-            log.warn("Negative cache size passed in. Reverting to cache size 
of 0 bytes.");
+            log.warn("{} Negative cache size passed in. Reverting to cache 
size of 0 bytes.", logPrefix);
         }
 
         final long cacheSizeBytes = Math.max(0, 
config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
@@ -396,7 +403,7 @@ public class KafkaStreams {
         try {
             client.close();
         } catch (final IOException e) {
-            log.warn("Could not close StreamKafkaClient.", e);
+            log.warn("{} Could not close StreamKafkaClient.", logPrefix, e);
         }
 
     }
@@ -412,7 +419,7 @@ public class KafkaStreams {
      * @throws StreamsException if the Kafka brokers have version 0.10.0.x
      */
     public synchronized void start() throws IllegalStateException, 
StreamsException {
-        log.debug("Starting Kafka Stream process.");
+        log.debug("{} Starting Kafka Stream process.", logPrefix);
 
         if (state == State.CREATED) {
             checkBrokerVersionCompatibility();
@@ -426,7 +433,7 @@ public class KafkaStreams {
                 thread.start();
             }
 
-            log.info("Started Kafka Stream process");
+            log.info("{} Started Kafka Stream process", logPrefix);
         } else {
             throw new IllegalStateException("Cannot start again.");
         }
@@ -451,7 +458,7 @@ public class KafkaStreams {
      * before all threads stopped
      */
     public synchronized boolean close(final long timeout, final TimeUnit 
timeUnit) {
-        log.debug("Stopping Kafka Stream process.");
+        log.debug("{} Stopping Kafka Stream process.", logPrefix);
         if (state.isCreatedOrRunning()) {
             setState(State.PENDING_SHUTDOWN);
             // save the current thread so that if it is a stream thread
@@ -487,7 +494,7 @@ public class KafkaStreams {
                     }
 
                     metrics.close();
-                    log.info("Stopped Kafka Streams process.");
+                    log.info("{} Stopped Kafka Streams process.", logPrefix);
                 }
             }, "kafka-streams-close-thread");
             shutdown.setDaemon(true);
@@ -557,7 +564,8 @@ public class KafkaStreams {
         final String stateDir = 
config.getString(StreamsConfig.STATE_DIR_CONFIG);
 
         final String localApplicationDir = stateDir + File.separator + appId;
-        log.debug("Removing local Kafka Streams application data in {} for 
application {}.",
+        log.debug("{} Removing local Kafka Streams application data in {} for 
application {}.",
+            logPrefix,
             localApplicationDir,
             appId);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2dfeff06/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d053431..5e64515 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -158,8 +158,11 @@ public class StreamThread extends Thread {
     private synchronized void setState(State newState) {
         State oldState = state;
         if (!state.isValidTransition(newState)) {
-            log.warn("Unexpected state transition from " + state + " to " + 
newState);
+            log.warn("{} Unexpected state transition from {} to {}.", 
logPrefix, oldState, newState);
+        } else {
+            log.info("{} State transition from {} to {}.", logPrefix, 
oldState, newState);
         }
+
         state = newState;
         if (stateListener != null) {
             stateListener.onChange(this, state, oldState);
@@ -222,13 +225,7 @@ public class StreamThread extends Thread {
         public void onPartitionsAssigned(Collection<TopicPartition> 
assignment) {
 
             try {
-                if (state == State.PENDING_SHUTDOWN) {
-                    log.info("stream-thread [{}] New partitions [{}] assigned 
while shutting down.",
-                        StreamThread.this.getName(), assignment);
-                }
-                log.info("stream-thread [{}] New partitions [{}] assigned at 
the end of consumer rebalance.",
-                    StreamThread.this.getName(), assignment);
-
+                log.info("{} at state {}: new partitions {} assigned at the 
end of consumer rebalance.", logPrefix, state, assignment);
                 setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
                 // do this first as we may have suspended standby tasks that
                 // will become active or vice versa
@@ -248,12 +245,7 @@ public class StreamThread extends Thread {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) 
{
             try {
-                if (state == State.PENDING_SHUTDOWN) {
-                    log.info("stream-thread [{}] New partitions [{}] revoked 
while shutting down.",
-                             StreamThread.this.getName(), assignment);
-                }
-                log.info("stream-thread [{}] partitions [{}] revoked at the 
beginning of consumer rebalance.",
-                         StreamThread.this.getName(), assignment);
+                log.info("{} at state {}: partitions {} revoked at the 
beginning of consumer rebalance.", logPrefix, state, assignment);
                 setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until 
partitions are assigned
                 // suspend active tasks
@@ -348,7 +340,7 @@ public class StreamThread extends Thread {
         this.timerStartedMs = time.milliseconds();
         this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start 
until partition assignment
         this.lastCommitMs = timerStartedMs;
-        setState(state.RUNNING);
+        setState(State.RUNNING);
     }
 
     public void partitionAssignor(StreamPartitionAssignor partitionAssignor) {
@@ -417,7 +409,7 @@ public class StreamThread extends Thread {
         try {
             partitionAssignor.close();
         } catch (Throwable e) {
-            log.error("stream-thread [{}] Failed to close KafkaStreamClient: 
", this.getName(), e);
+            log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e);
         }
 
         removeStreamTasks();
@@ -444,7 +436,7 @@ public class StreamThread extends Thread {
 
     @SuppressWarnings("ThrowableNotThrown")
     private void shutdownTasksAndState() {
-        log.debug("{} shutdownTasksAndState: shutting down all active tasks 
[{}] and standby tasks [{}]", logPrefix,
+        log.debug("{} shutdownTasksAndState: shutting down all active tasks {} 
and standby tasks {}", logPrefix,
             activeTasks.keySet(), standbyTasks.keySet());
 
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
@@ -469,7 +461,7 @@ public class StreamThread extends Thread {
      * soon the tasks will be assigned again
      */
     private void suspendTasksAndState()  {
-        log.debug("{} suspendTasksAndState: suspending all active tasks [{}] 
and standby tasks [{}]", logPrefix,
+        log.debug("{} suspendTasksAndState: suspending all active tasks {} and 
standby tasks {}", logPrefix,
             activeTasks.keySet(), standbyTasks.keySet());
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
         // Close all topology nodes
@@ -762,7 +754,7 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
-        log.trace("stream-thread [{}] Committing all its owned tasks", 
this.getName());
+        log.trace("{} Committing all its owned tasks", logPrefix);
         for (StreamTask task : activeTasks.values()) {
             commitOne(task);
         }
@@ -828,7 +820,7 @@ public class StreamThread extends Thread {
     }
 
     protected StreamTask createStreamTask(TaskId id, 
Collection<TopicPartition> partitions) {
-        log.info("{} Creating active task {} with assigned partitions [{}]", 
logPrefix, id, partitions);
+        log.info("{} Creating active task {} with assigned partitions {}", 
logPrefix, id, partitions);
 
         streamsMetrics.taskCreatedSensor.record();
 
@@ -941,7 +933,7 @@ public class StreamThread extends Thread {
     }
 
     StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> 
partitions) {
-        log.info("{} Creating new standby task {} with assigned partitions 
[{}]", logPrefix, id, partitions);
+        log.info("{} Creating new standby task {} with assigned partitions 
{}", logPrefix, id, partitions);
 
         streamsMetrics.taskCreatedSensor.record();
 
@@ -1012,14 +1004,14 @@ public class StreamThread extends Thread {
     }
 
     private void updateSuspendedTasks() {
-        log.info("{} Updating suspended tasks to contain active tasks [{}]", 
logPrefix, activeTasks.keySet());
+        log.info("{} Updating suspended tasks to contain active tasks {}", 
logPrefix, activeTasks.keySet());
         suspendedTasks.clear();
         suspendedTasks.putAll(activeTasks);
         suspendedStandbyTasks.putAll(standbyTasks);
     }
 
     private void removeStreamTasks() {
-        log.info("{} Removing all active tasks [{}]", logPrefix, 
activeTasks.keySet());
+        log.info("{} Removing all active tasks {}", logPrefix, 
activeTasks.keySet());
 
         try {
             prevTasks.clear();
@@ -1034,7 +1026,7 @@ public class StreamThread extends Thread {
     }
 
     private void removeStandbyTasks() {
-        log.info("{} Removing all standby tasks [{}]", logPrefix, 
standbyTasks.keySet());
+        log.info("{} Removing all standby tasks {}", logPrefix, 
standbyTasks.keySet());
 
         standbyTasks.clear();
         standbyTasksByPartition.clear();

Reply via email to