This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new da6063b MINOR: Add unit tests to verify setting of serdes in timestamped key-value store (#6825) da6063b is described below commit da6063be4ff2499ea2c65fec2a7975e0cf48e7b7 Author: cadonna <br...@confluent.io> AuthorDate: Wed May 29 15:25:07 2019 +0200 MINOR: Add unit tests to verify setting of serdes in timestamped key-value store (#6825) Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../MeteredTimestampedKeyValueStoreTest.java | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 0f60d24..606428a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -21,10 +21,12 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; @@ -57,6 +59,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class MeteredTimestampedKeyValueStoreTest { @@ -254,4 +257,53 @@ public class MeteredTimestampedKeyValueStoreTest { return this.metrics.metric(metricName); } + @Test + @SuppressWarnings("unchecked") + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() { + expect(context.keySerde()).andStubReturn((Serde) Serdes.String()); + expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long()); + final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>( + inner, + "scope", + new MockTime(), + null, + null + ); + replay(inner, context); + store.init(context, inner); + + try { + store.put("key", ValueAndTimestamp.make(42L, 60000)); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + fail("Serdes are not correctly set from processor context."); + } + throw exception; + } + } + + @Test + @SuppressWarnings("unchecked") + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() { + expect(context.keySerde()).andStubReturn((Serde) Serdes.String()); + expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long()); + final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>( + inner, + "scope", + new MockTime(), + Serdes.String(), + new ValueAndTimestampSerde<>(Serdes.Long()) + ); + replay(inner, context); + store.init(context, inner); + + try { + store.put("key", ValueAndTimestamp.make(42L, 60000)); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + fail("Serdes are not correctly set from constructor parameters."); + } + throw exception; + } + } } \ No newline at end of file