Repository: kafka Updated Branches: refs/heads/trunk e5e88f636 -> 89faed8d3
MINOR: enforce setting listeners in CREATE state. Author: Bill Bejeck <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>, Damian Guy <[email protected]> Closes #3569 from bbejeck/MINOR_enforce_adding_listeners_only_created_state Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89faed8d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89faed8d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89faed8d Branch: refs/heads/trunk Commit: 89faed8d30eb441fac7f1564edd40115006f5051 Parents: e5e88f6 Author: Bill Bejeck <[email protected]> Authored: Wed Jul 26 09:08:14 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Wed Jul 26 09:08:14 2017 +0100 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 26 ++++++++++++++----- .../apache/kafka/streams/KafkaStreamsTest.java | 27 ++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/89faed8d/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index c7c67d5..f06a7e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -74,6 +74,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; +import static org.apache.kafka.streams.KafkaStreams.State.CREATED; import static org.apache.kafka.streams.KafkaStreams.State.ERROR; import static org.apache.kafka.streams.KafkaStreams.State.NOT_RUNNING; import static org.apache.kafka.streams.KafkaStreams.State.PENDING_SHUTDOWN; @@ -228,10 +229,17 @@ public class KafkaStreams { /** * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes. + * * @param listener a new state listener */ public void setStateListener(final KafkaStreams.StateListener listener) { - stateListener = listener; + synchronized (stateLock) { + if (state == CREATED) { + stateListener = listener; + } else { + throw new IllegalStateException("Can only set StateListener in CREATED state."); + } + } } /** @@ -737,12 +745,18 @@ public class KafkaStreams { * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler */ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { - for (final StreamThread thread : threads) { - thread.setUncaughtExceptionHandler(eh); - } + synchronized (stateLock) { + if (state == CREATED) { + for (final StreamThread thread : threads) { + thread.setUncaughtExceptionHandler(eh); + } - if (globalStreamThread != null) { - globalStreamThread.setUncaughtExceptionHandler(eh); + if (globalStreamThread != null) { + globalStreamThread.setUncaughtExceptionHandler(eh); + } + } else { + throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state."); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/89faed8d/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 467f8b8..e3443fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @Category({IntegrationTest.class}) public class KafkaStreamsTest { @@ -255,6 +256,32 @@ public class KafkaStreamsTest { } @Test + public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() { + streams.start(); + try { + streams.setUncaughtExceptionHandler(null); + fail("Should throw IllegalStateException"); + } catch (final IllegalStateException e) { + Assert.assertEquals("Can only set UncaughtExceptionHandler in CREATED state.", e.getMessage()); + } finally { + streams.close(); + } + } + + @Test + public void shouldThrowExceptionSettingStateListenerNotInCreateState() { + streams.start(); + try { + streams.setStateListener(null); + fail("Should throw IllegalStateException"); + } catch (final IllegalStateException e) { + Assert.assertEquals("Can only set StateListener in CREATED state.", e.getMessage()); + } finally { + streams.close(); + } + } + + @Test public void testNumberDefaultMetrics() { final KafkaStreams streams = createKafkaStreams(); final Map<MetricName, ? extends Metric> metrics = streams.metrics();
