Repository: kafka
Updated Branches:
  refs/heads/trunk 212bce6e3 -> 3f155eaa2


MINOR: Log encountered exception during rebalance

Some other minor changes:

1. Do not throw the exception form callback as it would only be swallowed by 
consumer coordinator; remembering it and re-throw in the next loop is good 
enough.
2. Change Creating to Defining in Stores to avoid confusions that the stores 
have already been successfully created at that time.
3. Do not need unAssignChangeLogPartitions as the restore consumer will be 
unassigned already inside changelog reader.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Matthias J. Sax <matth...@confluent.io>, Damian Guy 
<damian....@gmail.com>

Closes #3769 from guozhangwang/KMinor-logging-before-throwing


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f155eaa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f155eaa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f155eaa

Branch: refs/heads/trunk
Commit: 3f155eaa23c6081e4afa3b49f0f8a65a16b8e05c
Parents: 212bce6
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Wed Sep 6 18:38:08 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Sep 6 18:38:08 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 21 ++--------
 .../org/apache/kafka/streams/state/Stores.java  |  4 +-
 .../processor/internals/StreamThreadTest.java   | 41 ++++++--------------
 3 files changed, 17 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 818992e..6a45ef7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -256,14 +256,11 @@ public class StreamThread extends Thread implements 
ThreadDataProvider {
                     return;
                 }
                 taskManager.createTasks(assignment);
-                final RuntimeException exception = 
streamThread.unAssignChangeLogPartitions();
-                if (exception != null) {
-                    throw exception;
-                }
                 streamThread.refreshMetadataState();
             } catch (final Throwable t) {
+                log.error("{} Error caught during partition assignment, " +
+                        "will abort the current process and re-throw at the 
end of rebalance: {}", logPrefix, t.getMessage());
                 streamThread.setRebalanceException(t);
-                throw t;
             } finally {
                 log.info("{} partition assignment took {} ms.\n" +
                                  "\tcurrent active tasks: {}\n" +
@@ -294,8 +291,9 @@ public class StreamThread extends Thread implements 
ThreadDataProvider {
                     // suspend active tasks
                     taskManager.suspendTasksAndState();
                 } catch (final Throwable t) {
+                    log.error("{} Error caught during partition revocation, " +
+                            "will abort the current process and re-throw at 
the end of rebalance: {}", logPrefix, t.getMessage());
                     streamThread.setRebalanceException(t);
-                    throw t;
                 } finally {
                     streamThread.refreshMetadataState();
                     streamThread.clearStandbyRecords();
@@ -1163,17 +1161,6 @@ public class StreamThread extends Thread implements 
ThreadDataProvider {
         log.info("{} Shutdown complete", logPrefix);
     }
 
-    private RuntimeException unAssignChangeLogPartitions() {
-        try {
-            // un-assign the change log partitions
-            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-        } catch (final RuntimeException e) {
-            log.error("{} Failed to un-assign change log partitions due to the 
following error:", logPrefix, e);
-            return e;
-        }
-        return null;
-    }
-
     private void clearStandbyRecords() {
         standbyRecords.clear();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index fef4ade..3cf22c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -90,7 +90,7 @@ public class Stores {
 
                                     @Override
                                     public StateStoreSupplier build() {
-                                        log.trace("Creating InMemory Store 
name={} capacity={} logged={}", name, capacity, logged);
+                                        log.trace("Defining InMemory Store 
name={} capacity={} logged={}", name, capacity, logged);
                                         if (capacity < Integer.MAX_VALUE) {
                                             return new 
InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, 
logConfig);
                                         }
@@ -154,7 +154,7 @@ public class Stores {
 
                                     @Override
                                     public StateStoreSupplier build() {
-                                        log.trace("Creating RocksDb Store 
name={} numSegments={} logged={}", name, numSegments, logged);
+                                        log.trace("Defining RocksDb Store 
name={} numSegments={} logged={}", name, numSegments, logged);
                                         if (sessionWindows) {
                                             return new 
RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, 
logged, logConfig, cachingEnabled);
                                         } else if (numSegments > 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 51e6568..b82c1ac 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -72,7 +71,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class StreamThreadTest {
 
@@ -105,25 +103,11 @@ public class StreamThreadTest {
     private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
 
-    private final List<PartitionInfo> infos = Arrays.asList(
-        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new 
Node[0]),
-        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
-    );
-
-
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(0, 3);
-    private final TaskId task4 = new TaskId(1, 1);
-    private final TaskId task5 = new TaskId(1, 2);
+    private final TaskId task3 = new TaskId(1, 1);
+    private final TaskId task4 = new TaskId(1, 2);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -281,10 +265,10 @@ public class StreamThreadTest {
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
 
+        assertTrue(thread.tasks().containsKey(task3));
         assertTrue(thread.tasks().containsKey(task4));
-        assertTrue(thread.tasks().containsKey(task5));
-        assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
+        assertEquals(expectedGroup1, thread.tasks().get(task3).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke four partitions and assign three partitions of both 
subtopologies
@@ -300,9 +284,9 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all three partitons and reassign the same three partitions 
(from different subtopologies)
@@ -315,9 +299,9 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all partitions and assign nothing
@@ -905,11 +889,8 @@ public class StreamThreadTest {
 
         thread.rebalanceListener.onPartitionsRevoked(null);
         clientSupplier.producers.get(0).fenceProducer();
-        try {
-            thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
-            thread.runOnce(-1);
-            fail("should have thrown " + 
ProducerFencedException.class.getSimpleName());
-        } catch (final ProducerFencedException e) { }
+        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread.runOnce(-1);
 
         assertTrue(thread.tasks().isEmpty());
     }

Reply via email to