Repository: kafka Updated Branches: refs/heads/trunk f4d1305bf -> f8498ec9e
MINOR: updated configs to use one try/catch for serdes. Removed `try/catch` from `keySerde` and `valueSerde` methods so only the `try/catch` blocks in `defaultKeySerde` and `defaultValueSerde` perform error handling, resulting in the correct error message. Author: Bill Bejeck <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]> Closes #3568 from bbejeck/MINOR_ensure_correct_error_messages_for_configs Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8498ec9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8498ec9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8498ec9 Branch: refs/heads/trunk Commit: f8498ec9e27ca0f08e3791d7a19ad8c6a97e210f Parents: f4d1305 Author: Bill Bejeck <[email protected]> Authored: Wed Jul 26 12:58:33 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Jul 26 12:58:33 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/StreamsConfig.java | 20 +++---- .../apache/kafka/streams/StreamsConfigTest.java | 55 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f8498ec9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 94ad87b..e4869ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -746,11 +746,7 @@ public class StreamsConfig extends AbstractConfig { */ @Deprecated public Serde keySerde() { - try { - return defaultKeySerde(); - } catch (final Exception e) { - throw 
new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e); - } + return defaultKeySerde(); } /** @@ -760,16 +756,18 @@ public class StreamsConfig extends AbstractConfig { * @return an configured instance of key Serde class */ public Serde defaultKeySerde() { + Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG); try { Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class); if (serde == null) { + keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class); } serde.configure(originals(), true); return serde; } catch (final Exception e) { throw new StreamsException( - String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e); + String.format("Failed to configure key serde %s", keySerdeConfigSetting), e); } } @@ -781,11 +779,7 @@ public class StreamsConfig extends AbstractConfig { */ @Deprecated public Serde valueSerde() { - try { - return defaultValueSerde(); - } catch (final Exception e) { - throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e); - } + return defaultValueSerde(); } /** @@ -795,9 +789,11 @@ public class StreamsConfig extends AbstractConfig { * @return an configured instance of value Serde class */ public Serde defaultValueSerde() { + Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG); try { Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class); if (serde == null) { + valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG); serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class); } serde.configure(originals(), false); @@ -805,7 +801,7 @@ public class StreamsConfig extends AbstractConfig { return serde; } catch (final Exception e) { throw new StreamsException( - String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), 
e); + String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f8498ec9/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 9f0f67a..3bbd69e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -50,6 +50,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class StreamsConfigTest { @@ -428,6 +429,60 @@ public class StreamsConfigTest { assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp); } + @Test + public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() { + final Properties props = minimalStreamsConfig(); + props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); + final StreamsConfig config = new StreamsConfig(props); + try { + config.keySerde(); + fail("Test should throw a StreamsException"); + } catch (StreamsException e) { + assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); + } + } + + @Test + public void shouldSpecifyCorrectKeySerdeClassOnError() { + final Properties props = minimalStreamsConfig(); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); + final StreamsConfig config = new StreamsConfig(props); + try { + config.keySerde(); + fail("Test should throw a StreamsException"); + } catch (StreamsException e) { + assertEquals("Failed to configure key serde class 
org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); + } + } + + @Test + public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() { + final Properties props = minimalStreamsConfig(); + props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); + final StreamsConfig config = new StreamsConfig(props); + try { + config.valueSerde(); + fail("Test should throw a StreamsException"); + } catch (StreamsException e) { + assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); + } + } + + @Test + public void shouldSpecifyCorrectValueSerdeClassOnError() { + final Properties props = minimalStreamsConfig(); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); + final StreamsConfig config = new StreamsConfig(props); + try { + config.valueSerde(); + fail("Test should throw a StreamsException"); + } catch (StreamsException e) { + assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); + } + } + + + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) {
