Repository: kafka
Updated Branches:
  refs/heads/1.0 aa1e4c235 -> 12c77b4b0


http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index e75ef5b..619ee96 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -143,6 +143,20 @@ public class SessionWindowedKStreamImplTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void shouldMaterializeWithoutSpecifyingSerdes() {
+        stream.count(Materialized.<String, Long, SessionStore<Bytes, 
byte[]>>as("count-store"));
+
+        processData();
+        final SessionStore<String, Long> store = (SessionStore<String, Long>) 
driver.allStateStores().get("count-store");
+        final List<KeyValue<Windowed<String>, Long>> data = 
StreamsTestUtils.toList(store.fetch("1", "2"));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 
2L),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 
600)), 1L),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 
600)), 1L))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void shouldMaterializeReduced() {
         stream.reduce(MockReducer.STRING_ADDER,
                       Materialized.<String, String, SessionStore<Bytes, 
byte[]>>as("reduced")
@@ -249,7 +263,7 @@ public class SessionWindowedKStreamImplTest {
     }
 
     private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), 
Serdes.String(), 0);
+        driver.setUp(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
         driver.setTime(15);

http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 93bcf33..286a823 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -227,7 +227,7 @@ public class TimeWindowedKStreamImplTest {
     }
 
     private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), 
Serdes.String(), 0);
+        driver.setUp(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
         driver.setTime(15);

http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 21a5d57..9ba86ac 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
 import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
@@ -32,20 +33,30 @@ import 
org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
 import org.hamcrest.CoreMatchers;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
 
-
+@RunWith(EasyMockRunner.class)
 public class KeyValueStoreMaterializerTest {
 
+    private final String storePrefix = "prefix";
+    @Mock(type = MockType.NICE)
+    private InternalNameProvider nameProvider;
+
     @Test
     public void 
shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store"));
+                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store"),
+                                             nameProvider,
+                                             storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -60,7 +71,7 @@ public class KeyValueStoreMaterializerTest {
     public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
                 = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withCachingDisabled());
+                                                     .withCachingDisabled(), 
nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -72,7 +83,7 @@ public class KeyValueStoreMaterializerTest {
     public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
                 = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withLoggingDisabled());
+                                                     .withLoggingDisabled(), 
nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -86,7 +97,7 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
                 = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store")
                                                      .withCachingDisabled()
-                                                     .withLoggingDisabled());
+                                                     .withLoggingDisabled(), 
nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -104,7 +115,7 @@ public class KeyValueStoreMaterializerTest {
         EasyMock.replay(supplier);
 
         final MaterializedInternal<String, Integer, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, 
Integer>as(supplier));
+                = new MaterializedInternal<>(Materialized.<String, 
Integer>as(supplier), nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, Integer> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, Integer>> builder = 
materializer.materialize();
         final KeyValueStore<String, Integer> built = builder.build();

Reply via email to