Repository: kafka Updated Branches: refs/heads/trunk 212bce6e3 -> 3f155eaa2
MINOR: Log encountered exception during rebalance Some other minor changes: 1. Do not throw the exception from callback as it would only be swallowed by consumer coordinator; remembering it and re-throwing it in the next loop is good enough. 2. Change Creating to Defining in Stores to avoid the confusion that the stores have already been successfully created at that time. 3. Do not need unAssignChangeLogPartitions as the restore consumer will be unassigned already inside changelog reader. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Damian Guy <damian....@gmail.com> Closes #3769 from guozhangwang/KMinor-logging-before-throwing Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f155eaa Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f155eaa Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f155eaa Branch: refs/heads/trunk Commit: 3f155eaa23c6081e4afa3b49f0f8a65a16b8e05c Parents: 212bce6 Author: Guozhang Wang <wangg...@gmail.com> Authored: Wed Sep 6 18:38:08 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Sep 6 18:38:08 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 21 ++-------- .../org/apache/kafka/streams/state/Stores.java | 4 +- .../processor/internals/StreamThreadTest.java | 41 ++++++-------------- 3 files changed, 17 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 818992e..6a45ef7 
100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -256,14 +256,11 @@ public class StreamThread extends Thread implements ThreadDataProvider { return; } taskManager.createTasks(assignment); - final RuntimeException exception = streamThread.unAssignChangeLogPartitions(); - if (exception != null) { - throw exception; - } streamThread.refreshMetadataState(); } catch (final Throwable t) { + log.error("{} Error caught during partition assignment, " + + "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage()); streamThread.setRebalanceException(t); - throw t; } finally { log.info("{} partition assignment took {} ms.\n" + "\tcurrent active tasks: {}\n" + @@ -294,8 +291,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { // suspend active tasks taskManager.suspendTasksAndState(); } catch (final Throwable t) { + log.error("{} Error caught during partition revocation, " + + "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage()); streamThread.setRebalanceException(t); - throw t; } finally { streamThread.refreshMetadataState(); streamThread.clearStandbyRecords(); @@ -1163,17 +1161,6 @@ public class StreamThread extends Thread implements ThreadDataProvider { log.info("{} Shutdown complete", logPrefix); } - private RuntimeException unAssignChangeLogPartitions() { - try { - // un-assign the change log partitions - restoreConsumer.assign(Collections.<TopicPartition>emptyList()); - } catch (final RuntimeException e) { - log.error("{} Failed to un-assign change log partitions due to the following error:", logPrefix, e); - return e; - } - return null; - } - private void clearStandbyRecords() { standbyRecords.clear(); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index fef4ade..3cf22c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -90,7 +90,7 @@ public class Stores { @Override public StateStoreSupplier build() { - log.trace("Creating InMemory Store name={} capacity={} logged={}", name, capacity, logged); + log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged); if (capacity < Integer.MAX_VALUE) { return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig); } @@ -154,7 +154,7 @@ public class Stores { @Override public StateStoreSupplier build() { - log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged); + log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged); if (sessionWindows) { return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled); } else if (numSegments > 0) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 51e6568..b82c1ac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; @@ -72,7 +71,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class StreamThreadTest { @@ -105,25 +103,11 @@ public class StreamThreadTest { private final TopicPartition t3p1 = new TopicPartition("topic3", 1); private final TopicPartition t3p2 = new TopicPartition("topic3", 2); - private final List<PartitionInfo> infos = Arrays.asList( - new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]) - ); - - // task0 is unused private final TaskId task1 = new TaskId(0, 1); private final TaskId task2 = new TaskId(0, 2); - private final TaskId task3 = new TaskId(0, 3); - private final TaskId task4 = new TaskId(1, 1); - private final TaskId task5 = new TaskId(1, 2); + private final TaskId task3 = new TaskId(1, 1); 
+ private final TaskId task4 = new TaskId(1, 2); private Properties configProps(final boolean enableEos) { return new Properties() { @@ -281,10 +265,10 @@ public class StreamThreadTest { rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(-1); + assertTrue(thread.tasks().containsKey(task3)); assertTrue(thread.tasks().containsKey(task4)); - assertTrue(thread.tasks().containsKey(task5)); - assertEquals(expectedGroup1, thread.tasks().get(task4).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(task5).partitions()); + assertEquals(expectedGroup1, thread.tasks().get(task3).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(task4).partitions()); assertEquals(2, thread.tasks().size()); // revoke four partitions and assign three partitions of both subtopologies @@ -300,9 +284,9 @@ public class StreamThreadTest { thread.runOnce(-1); assertTrue(thread.tasks().containsKey(task1)); - assertTrue(thread.tasks().containsKey(task4)); + assertTrue(thread.tasks().containsKey(task3)); assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(task4).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(task3).partitions()); assertEquals(2, thread.tasks().size()); // revoke all three partitons and reassign the same three partitions (from different subtopologies) @@ -315,9 +299,9 @@ public class StreamThreadTest { thread.runOnce(-1); assertTrue(thread.tasks().containsKey(task1)); - assertTrue(thread.tasks().containsKey(task4)); + assertTrue(thread.tasks().containsKey(task3)); assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(task4).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(task3).partitions()); assertEquals(2, thread.tasks().size()); // revoke all partitions and assign nothing @@ -905,11 +889,8 @@ public class StreamThreadTest { 
thread.rebalanceListener.onPartitionsRevoked(null); clientSupplier.producers.get(0).fenceProducer(); - try { - thread.rebalanceListener.onPartitionsAssigned(task0Assignment); - thread.runOnce(-1); - fail("should have thrown " + ProducerFencedException.class.getSimpleName()); - } catch (final ProducerFencedException e) { } + thread.rebalanceListener.onPartitionsAssigned(task0Assignment); + thread.runOnce(-1); assertTrue(thread.tasks().isEmpty()); }