Repository: kafka Updated Branches: refs/heads/1.0 aa1e4c235 -> 12c77b4b0
http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index e75ef5b..619ee96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -143,6 +143,20 @@ public class SessionWindowedKStreamImplTest { @SuppressWarnings("unchecked") @Test + public void shouldMaterializeWithoutSpecifyingSerdes() { + stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store")); + + processData(); + final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store"); + final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2")); + assertThat(data, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L), + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L), + KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L)))); + } + + @SuppressWarnings("unchecked") + @Test public void shouldMaterializeReduced() { stream.reduce(MockReducer.STRING_ADDER, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced") @@ -249,7 +263,7 @@ public class SessionWindowedKStreamImplTest { } private void processData() { - driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0); + driver.setUp(builder, TestUtils.tempDirectory(), 0); driver.setTime(10); driver.process(TOPIC, "1", "1"); driver.setTime(15); http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java index 93bcf33..286a823 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java @@ -227,7 +227,7 @@ public class TimeWindowedKStreamImplTest { } private void processData() { - driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0); + driver.setUp(builder, TestUtils.tempDirectory(), 0); driver.setTime(10); driver.process(TOPIC, "1", "1"); driver.setTime(15); http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java index 21a5d57..9ba86ac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.internals.InternalNameProvider; import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.StateStore; @@ -32,20 +33,30 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; import org.hamcrest.CoreMatchers; import org.junit.Test; +import org.junit.runner.RunWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; - +@RunWith(EasyMockRunner.class) public class KeyValueStoreMaterializerTest { + private final String storePrefix = "prefix"; + @Mock(type = MockType.NICE) + private InternalNameProvider nameProvider; + @Test public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")); + = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"), + nameProvider, + storePrefix); final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -60,7 +71,7 @@ public class KeyValueStoreMaterializerTest { public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") - .withCachingDisabled()); + .withCachingDisabled(), nameProvider, storePrefix); final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -72,7 +83,7 @@ public class KeyValueStoreMaterializerTest { public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") - .withLoggingDisabled()); + .withLoggingDisabled(), nameProvider, storePrefix); final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -86,7 +97,7 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") .withCachingDisabled() - .withLoggingDisabled()); + .withLoggingDisabled(), nameProvider, storePrefix); final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); @@ -104,7 +115,7 @@ public class KeyValueStoreMaterializerTest { EasyMock.replay(supplier); final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier)); + = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier), nameProvider, storePrefix); final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize(); final KeyValueStore<String, Integer> built = builder.build();