KAFKA-3595: window stores use compact,delete config for changelogs

Changelogs of window stores are now configured with cleanup.policy=compact,delete,
with retention.ms set to the window's maintainMs plus
StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG.
StoreChangeLogger now produces messages stamped with context.timestamp().
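
A minimal sketch of raising this margin via the new setting (the application id is
hypothetical; when the property is unset, the default additional retention is one day):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.StreamsConfig;

    public class WindowRetentionConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // hypothetical id
            // retention.ms of a window store changelog becomes maintainMs + this value
            props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
                      TimeUnit.DAYS.toMillis(2)); // allow two days of clock drift
        }
    }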

Author: Damian Guy <damian....@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1792 from dguy/kafka-3595


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69ebf6f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69ebf6f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69ebf6f7

Branch: refs/heads/trunk
Commit: 69ebf6f7be2fc0e471ebd5b7a166468017ff2651
Parents: eba0ede
Author: Damian Guy <damian....@gmail.com>
Authored: Wed Sep 7 18:02:24 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Sep 7 18:02:24 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |  12 +-
 .../streams/kstream/internals/KTableImpl.java   |  11 +-
 .../kstream/internals/KTableStoreSupplier.java  |  58 ----
 .../streams/processor/StateStoreSupplier.java   |  16 +
 .../streams/processor/TimestampExtractor.java   |   8 +-
 .../streams/processor/TopologyBuilder.java      |  71 +++--
 .../internals/InternalTopicConfig.java          | 110 +++++++
 .../internals/InternalTopicManager.java         |  65 ++--
 .../internals/ProcessorContextImpl.java         |  56 ++--
 .../internals/StreamPartitionAssignor.java      | 106 ++++---
 .../streams/processor/internals/StreamTask.java |   7 +-
 .../org/apache/kafka/streams/state/Stores.java  |  87 +++++-
 .../state/internals/AbstractStoreSupplier.java  |  53 ++++
 .../InMemoryKeyValueStoreSupplier.java          |  24 +-
 .../InMemoryLRUCacheStoreSupplier.java          |  29 +-
 .../internals/RocksDBKeyValueStoreSupplier.java |  29 +-
 .../internals/RocksDBWindowStoreSupplier.java   |  29 +-
 .../state/internals/StoreChangeLogger.java      |   2 +-
 .../InternalTopicIntegrationTest.java           | 110 +++++--
 .../streams/processor/TopologyBuilderTest.java  | 101 +++++-
 .../internals/InternalTopicConfigTest.java      | 122 ++++++++
 .../internals/StreamPartitionAssignorTest.java  |   8 +-
 .../apache/kafka/streams/state/StoresTest.java  |  84 +++++
 .../internals/AbstractKeyValueStoreTest.java    | 312 +++++++++----------
 .../internals/InMemoryLRUCacheStoreTest.java    |  82 +++--
 .../state/internals/RocksDBWindowStoreTest.java |  10 +-
 .../state/internals/StateStoreTestUtils.java    |   8 +-
 .../state/internals/StoreChangeLoggerTest.java  |   4 +-
 .../kafka/test/MockStateStoreSupplier.java      |  12 +
 29 files changed, 1094 insertions(+), 532 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 41498cf..e972887 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -128,9 +128,13 @@ public class StreamsConfig extends AbstractConfig {
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
+    /** <code>rocksdb.config.setter</code> */
     public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
     public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A RocksDB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
 
+    /** <code>windowstore.changelog.additional.retention.ms</code> */
+    public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
+    public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
 
 
     static {
@@ -238,7 +242,12 @@ public class StreamsConfig extends AbstractConfig {
                                         Type.CLASS,
                                         null,
                                         Importance.LOW,
-                                        ROCKSDB_CONFIG_SETTER_CLASS_DOC);
+                                        ROCKSDB_CONFIG_SETTER_CLASS_DOC)
+                                .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
+                                        Type.LONG,
+                                        24 * 60 * 60 * 1000,
+                                        Importance.MEDIUM,
+                                        WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC);
     }
 
     // this is the list of configs for underlying clients
@@ -325,6 +334,7 @@ public class StreamsConfig extends AbstractConfig {
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+        props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
         if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
             props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 2f36183..6c73b11 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -33,10 +33,12 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
@@ -416,10 +418,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public void materialize(KTableSource<K, ?> source) {
         synchronized (source) {
             if (!source.isMaterialized()) {
-                StateStoreSupplier storeSupplier =
-                        new KTableStoreSupplier<>(source.storeName, keySerde, valSerde, null);
+                StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(source.storeName,
+                                                                                      keySerde,
+                                                                                      valSerde,
+                                                                                      false,
+                                                                                      Collections.<String, String>emptyMap());
                 // mark this state as non internal hence it is read directly from a user topic
-                topology.addStateStore(storeSupplier, false, name);
+                topology.addStateStore(storeSupplier, name);
                 source.materialize();
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
deleted file mode 100644
index ff118da..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
-import org.apache.kafka.streams.state.internals.RocksDBStore;
-
-/**
- * A KTable storage. It stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private final Time time;
-
-    protected KTableStoreSupplier(String name,
-                                  Serde<K> keySerde,
-                                  Serde<V> valueSerde,
-                                  Time time) {
-        this.name = name;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, 
valueSerde), "rocksdb-state", time);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index f2ae020..d3b0a1b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.processor;
 
+import java.util.Map;
+
 /**
  * A state store supplier which can create one or more {@link StateStore} 
instances.
  */
@@ -35,4 +37,18 @@ public interface StateStoreSupplier {
      * @return  a new {@link StateStore} instance
      */
     StateStore get();
+
+    /**
+     * Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}.
+     *
+     * Note: any unrecognized configs will be ignored by the Kafka brokers.
+     * @return Map containing any log configs to be used when creating the changelog for the {@link StateStore};
+     *         if {@code loggingEnabled} returns false, this function will always return an empty map
+     */
+    Map<String, String> logConfig();
+
+    /**
+     * @return true if the {@link StateStore} should have logging enabled
+     */
+    boolean loggingEnabled();
 }
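
As a usage sketch, a custom supplier against the extended interface might look like
this (MyStoreSupplier, MyStore and the store name are hypothetical; the log config
key is a standard topic-level config):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreSupplier;

    public class MyStoreSupplier implements StateStoreSupplier {
        @Override
        public String name() {
            return "my-store"; // hypothetical store name
        }

        @Override
        public StateStore get() {
            return new MyStore(name()); // MyStore stands in for any StateStore implementation
        }

        @Override
        public boolean loggingEnabled() {
            return true; // a changelog topic will be created for this store
        }

        @Override
        public Map<String, String> logConfig() {
            // extra changelog topic configs; unrecognized keys are ignored by the brokers
            return Collections.singletonMap("min.cleanable.dirty.ratio", "0.3");
        }
    }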

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index c872fa1..c55518b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.kstream.KTable;
 
 /**
  * An interface that allows the Kafka Streams framework to extract a timestamp 
from an instance of {@link ConsumerRecord}.
@@ -28,7 +29,12 @@ public interface TimestampExtractor {
     /**
      * Extracts a timestamp from a record.
      * <p>
-     * Typically, the timestamp represents the milliseconds since midnight, January 1, 1970 UTC.
+     * The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC.
+     *
+     * It is important to note that this timestamp may become the message timestamp for any messages sent to changelogs updated by {@link KTable}s
+     * and joins. The message timestamp is used for log retention and log rolling, so using nonsensical values may result in
+     * excessive log rolling and therefore broker performance degradation.
+     *
      *
      * @param record  a data record
      * @return        the timestamp of the record
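
A sketch of an extractor honoring this contract (the Order type is hypothetical,
standing in for any value that carries its own event time in epoch milliseconds):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class OrderTimestampExtractor implements TimestampExtractor {
        // hypothetical domain type carrying its own event time
        interface Order {
            long timestampMs();
        }

        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            final Object value = record.value();
            if (value instanceof Order) {
                return ((Order) value).timestampMs();
            }
            // fall back to wall-clock time rather than returning a nonsensical value
            return System.currentTimeMillis();
        }
    }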

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index bcdb54a..ee61e73 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -28,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.QuickUnion;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,7 +55,6 @@ import java.util.regex.Pattern;
  * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
  */
 public class TopologyBuilder {
-
     // node factories in a topological order
     private final LinkedHashMap<String, NodeFactory> nodeFactories = new 
LinkedHashMap<>();
 
@@ -103,11 +104,9 @@ public class TopologyBuilder {
     private static class StateStoreFactory {
         public final Set<String> users;
 
-        public final boolean isInternal;
         public final StateStoreSupplier supplier;
 
-        StateStoreFactory(boolean isInternal, StateStoreSupplier supplier) {
-            this.isInternal = isInternal;
+        StateStoreFactory(StateStoreSupplier supplier) {
             this.supplier = supplier;
             this.users = new HashSet<>();
         }
@@ -224,10 +223,10 @@ public class TopologyBuilder {
     public static class TopicsInfo {
         public Set<String> sinkTopics;
         public Set<String> sourceTopics;
-        public Set<String> interSourceTopics;
-        public Set<String> stateChangelogTopics;
+        public Map<String, InternalTopicConfig> interSourceTopics;
+        public Map<String, InternalTopicConfig> stateChangelogTopics;
 
-        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) {
+        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> interSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
             this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
             this.interSourceTopics = interSourceTopics;
@@ -249,6 +248,16 @@ public class TopologyBuilder {
             long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
             return (int) (n % 0xFFFFFFFFL);
         }
+
+        @Override
+        public String toString() {
+            return "TopicsInfo{" +
+                    "sinkTopics=" + sinkTopics +
+                    ", sourceTopics=" + sourceTopics +
+                    ", interSourceTopics=" + interSourceTopics +
+                    ", stateChangelogTopics=" + stateChangelogTopics +
+                    '}';
+        }
     }
 
     /**
@@ -544,13 +553,13 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if state store supplier is already added
      */
-    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
+    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
         if (stateFactories.containsKey(supplier.name())) {
             throw new TopologyBuilderException("StateStore " + supplier.name() 
+ " is already added.");
         }
 
-        stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, 
supplier));
+        stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
 
         if (processorNames != null) {
             for (String processorName : processorNames) {
@@ -561,15 +570,6 @@ public class TopologyBuilder {
         return this;
     }
 
-    /**
-     * Adds a state store
-     *
-     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
-        return this.addStateStore(supplier, true, processorNames);
-    }
 
     /**
      * Connects the processor and the state stores
@@ -594,7 +594,6 @@ public class TopologyBuilder {
             throw new TopologyBuilderException("Source store " + 
sourceStoreName + " is already added.");
         }
         sourceStoreToSourceTopic.put(sourceStoreName, topic);
-
         return this;
     }
 
@@ -842,8 +841,8 @@ public class TopologyBuilder {
         for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
             Set<String> sinkTopics = new HashSet<>();
             Set<String> sourceTopics = new HashSet<>();
-            Set<String> internalSourceTopics = new HashSet<>();
-            Set<String> stateChangelogTopics = new HashSet<>();
+            Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
+            Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
             for (String node : entry.getValue()) {
                 // if the node is a source node, add to the source topics
                 String[] topics = nodeToSourceTopics.get(node);
@@ -853,7 +852,9 @@ public class TopologyBuilder {
                         if (this.internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id
                             String internalTopic = decorateTopic(topic);
-                            internalSourceTopics.add(internalTopic);
+                            internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
+                                                                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                                            Collections.<String, String>emptyMap()));
                             sourceTopics.add(internalTopic);
                         } else {
                             sourceTopics.add(topic);
@@ -874,22 +875,38 @@ public class TopologyBuilder {
 
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
-                    if (stateFactory.isInternal && stateFactory.users.contains(node)) {
-                        // prefix the change log topic name with the application id
-                        stateChangelogTopics.add(ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.supplier.name()));
+                    final StateStoreSupplier supplier = stateFactory.supplier;
+                    if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
+                        final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
+                        final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
+                        stateChangelogTopics.put(name, internalTopicConfig);
                     }
                 }
             }
             topicGroups.put(entry.getKey(), new TopicsInfo(
                     Collections.unmodifiableSet(sinkTopics),
                     Collections.unmodifiableSet(sourceTopics),
-                    Collections.unmodifiableSet(internalSourceTopics),
-                    Collections.unmodifiableSet(stateChangelogTopics)));
+                    Collections.unmodifiableMap(internalSourceTopics),
+                    Collections.unmodifiableMap(stateChangelogTopics)));
         }
 
         return Collections.unmodifiableMap(topicGroups);
     }
 
+    private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) {
+        if (!(supplier instanceof RocksDBWindowStoreSupplier)) {
+            return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
+        }
+
+        final RocksDBWindowStoreSupplier windowStoreSupplier = (RocksDBWindowStoreSupplier) supplier;
+        final InternalTopicConfig config = new InternalTopicConfig(name,
+                                                                   Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
+                                                                               InternalTopicConfig.CleanupPolicy.delete),
+                                                                   supplier.logConfig());
+        config.setRetentionMs(windowStoreSupplier.retentionPeriod());
+        return config;
+    }
+
 
     /**
      * Get the names of topics that are to be consumed by the source nodes created by this builder.
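
To make the effect concrete: with the one-day default additional retention, a window
store whose maintainMs is one hour (3600000 ms) ends up with changelog topic configs
along these lines (illustrative values; the addition happens in toProperties() at
topic-creation time):

    cleanup.policy=compact,delete
    retention.ms=90000000   # maintainMs (3600000) + additional retention (86400000)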

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
new file mode 100644
index 0000000..45016c8
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * InternalTopicConfig captures the properties required for configuring
+ * the internal topics we create for change-logs and repartitioning etc.
+ */
+public class InternalTopicConfig {
+    public enum CleanupPolicy { compact, delete }
+
+    private final String name;
+    private final Map<String, String> logConfig;
+    private Long retentionMs;
+    private final Set<CleanupPolicy> cleanupPolicies;
+
+    public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies, final Map<String, String> logConfig) {
+        Objects.requireNonNull(name, "name can't be null");
+        if (defaultCleanupPolicies.isEmpty()) {
+            throw new IllegalArgumentException("Must provide at least one 
cleanup policy");
+        }
+        this.name = name;
+        this.cleanupPolicies = defaultCleanupPolicies;
+        this.logConfig = logConfig;
+    }
+
+    /* for test use only */
+    boolean isCompacted() {
+        return cleanupPolicies.contains(CleanupPolicy.compact);
+    }
+
+    private boolean isCompactDelete() {
+        return cleanupPolicies.contains(CleanupPolicy.compact) && cleanupPolicies.contains(CleanupPolicy.delete);
+    }
+
+    /**
+     * Get the configured properties for this topic. If retentionMs is set then
+     * we add additionalRetentionMs to work out the desired retention when cleanup.policy=compact,delete
+     *
+     * @param additionalRetentionMs - added to retention to allow for clock drift etc
+     * @return Properties to be used when creating the topic
+     */
+    public Properties toProperties(final long additionalRetentionMs) {
+        final Properties result = new Properties();
+        for (Map.Entry<String, String> configEntry : logConfig.entrySet()) {
+            result.put(configEntry.getKey(), configEntry.getValue());
+        }
+        if (retentionMs != null && isCompactDelete()) {
+            result.put(InternalTopicManager.RETENTION_MS, String.valueOf(retentionMs + additionalRetentionMs));
+        }
+
+        if (!logConfig.containsKey(InternalTopicManager.CLEANUP_POLICY_PROP)) {
+            final StringBuilder builder = new StringBuilder();
+            for (CleanupPolicy cleanupPolicy : cleanupPolicies) {
+                builder.append(cleanupPolicy.name()).append(",");
+            }
+            builder.deleteCharAt(builder.length() - 1);
+
+            result.put(InternalTopicManager.CLEANUP_POLICY_PROP, builder.toString());
+        }
+
+
+        return result;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public void setRetentionMs(final long retentionMs) {
+        if (!logConfig.containsKey(InternalTopicManager.RETENTION_MS)) {
+            this.retentionMs = retentionMs;
+        }
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final InternalTopicConfig that = (InternalTopicConfig) o;
+        return Objects.equals(name, that.name) &&
+                Objects.equals(logConfig, that.logConfig) &&
+                Objects.equals(retentionMs, that.retentionMs) &&
+                Objects.equals(cleanupPolicies, that.cleanupPolicies);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, logConfig, retentionMs, cleanupPolicies);
+    }
+}
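
A usage sketch of the new class (the changelog topic name is hypothetical; the
retention arithmetic mirrors what the partition assignor does for window stores):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.streams.processor.internals.InternalTopicConfig;

    public class InternalTopicConfigExample {
        public static void main(String[] args) {
            final InternalTopicConfig config = new InternalTopicConfig(
                    "my-app-my-store-changelog", // hypothetical changelog topic name
                    Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
                                InternalTopicConfig.CleanupPolicy.delete),
                    Collections.<String, String>emptyMap());
            config.setRetentionMs(60 * 60 * 1000L);                              // window maintainMs: 1 hour
            final Properties props = config.toProperties(24 * 60 * 60 * 1000L);  // + 1 day for clock drift
            // props now holds retention.ms=90000000 plus a cleanup.policy naming both compact and delete
        }
    }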

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 4477fb7..44de757 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -24,6 +24,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
@@ -35,8 +36,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class InternalTopicManager {
 
@@ -48,11 +51,27 @@ public class InternalTopicManager {
     private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
     private static final String ZK_ENTITY_CONFIG_PATH = "/config/topics";
     // TODO: the following LogConfig dependency should be removed after KIP-4
-    private static final String CLEANUP_POLICY_PROP = "cleanup.policy";
-    private static final String COMPACT = "compact";
+    public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
+    private static final Set<String> CLEANUP_POLICIES = Utils.mkSet("compact", "delete");
+    public static final String RETENTION_MS = "retention.ms";
+    public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 
     private final ZkClient zkClient;
     private final int replicationFactor;
+    private final long windowChangeLogAdditionalRetention;
+
+    public static boolean isValidCleanupPolicy(final String cleanupPolicy) {
+        if (cleanupPolicy == null) {
+            return false;
+        }
+        final String[] policies = cleanupPolicy.toLowerCase(Locale.ROOT).split(",");
+        for (String policy : policies) {
+            if (!CLEANUP_POLICIES.contains(policy.trim())) {
+                return false;
+            }
+        }
+        return true;
+    }
 
     private class ZKStringSerializer implements ZkSerializer {
 
@@ -87,22 +106,24 @@ public class InternalTopicManager {
     public InternalTopicManager() {
         this.zkClient = null;
         this.replicationFactor = 0;
+        this.windowChangeLogAdditionalRetention = WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
     }
 
-    public InternalTopicManager(String zkConnect, int replicationFactor) {
+    public InternalTopicManager(String zkConnect, final int replicationFactor, long windowChangeLogAdditionalRetention) {
         this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
         this.replicationFactor = replicationFactor;
+        this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
     }
 
-    public void makeReady(String topic, int numPartitions, boolean compactTopic) {
+    public void makeReady(InternalTopicConfig topic, int numPartitions) {
         boolean topicNotReady = true;
 
         while (topicNotReady) {
-            Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+            Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic.name());
 
             if (topicMetadata == null) {
                 try {
-                    createTopic(topic, numPartitions, replicationFactor, compactTopic);
+                    createTopic(topic, numPartitions, replicationFactor);
                 } catch (ZkNodeExistsException e) {
                     // ignore and continue
                 }
@@ -110,14 +131,14 @@ public class InternalTopicManager {
                if (topicMetadata.size() > numPartitions) {
                     // else if topic exists with more #.partitions than needed, delete in order to re-create it
                     try {
-                        deleteTopic(topic);
+                        deleteTopic(topic.name());
                     } catch (ZkNodeExistsException e) {
                         // ignore and continue
                     }
                 } else if (topicMetadata.size() < numPartitions) {
                     // else if topic exists with less #.partitions than needed, add partitions
                     try {
-                        addPartitions(topic, numPartitions - topicMetadata.size(), replicationFactor, topicMetadata);
+                        addPartitions(topic.name(), numPartitions - topicMetadata.size(), replicationFactor, topicMetadata);
                     } catch (ZkNoNodeException e) {
                         // ignore and continue
                     }
@@ -163,9 +184,8 @@ public class InternalTopicManager {
         }
     }
 
-    private void createTopic(String topic, int numPartitions, int replicationFactor, boolean compactTopic) throws ZkNodeExistsException {
-        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
-        Properties prop = new Properties();
+    private void createTopic(InternalTopicConfig topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
+        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic.name(), numPartitions);
         ObjectMapper mapper = new ObjectMapper();
         List<Integer> brokers = getBrokers();
         int numBrokers = brokers.size();
@@ -185,17 +205,14 @@ public class InternalTopicManager {
             assignment.put(i, brokerList);
         }
         // write out config first just like in AdminUtils.scala createOrUpdateTopicPartitionAssignmentPathInZK()
-        if (compactTopic) {
-            prop.put(CLEANUP_POLICY_PROP, COMPACT);
-            try {
-                Map<String, Object> dataMap = new HashMap<>();
-                dataMap.put("version", 1);
-                dataMap.put("config", prop);
-                String data = mapper.writeValueAsString(dataMap);
-                zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-            } catch (JsonProcessingException e) {
-                throw new StreamsException("Error while creating topic config in ZK for internal topic " + topic, e);
-            }
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("config", topic.toProperties(windowChangeLogAdditionalRetention));
+            String data = mapper.writeValueAsString(dataMap);
+            zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+        } catch (JsonProcessingException e) {
+            throw new StreamsException("Error while creating topic config in ZK for internal topic " + topic, e);
         }
 
         // try to write to ZK with open ACL
@@ -205,7 +222,7 @@ public class InternalTopicManager {
             dataMap.put("partitions", assignment);
             String data = mapper.writeValueAsString(dataMap);
 
-            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
         } catch (JsonProcessingException e) {
             throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
         }
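
The new validation helper accepts any comma-separated combination of the two
policies; a quick sketch of its behaviour (class name is hypothetical):

    import org.apache.kafka.streams.processor.internals.InternalTopicManager;

    public class CleanupPolicyCheck {
        public static void main(String[] args) {
            // case-insensitive and whitespace-tolerant
            System.out.println(InternalTopicManager.isValidCleanupPolicy("compact,delete")); // true
            System.out.println(InternalTopicManager.isValidCleanupPolicy("Compact, Delete")); // true
            System.out.println(InternalTopicManager.isValidCleanupPolicy("compress"));        // false
            System.out.println(InternalTopicManager.isValidCleanupPolicy(null));              // false
        }
    }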

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 00ffb20..a38839f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -44,6 +44,10 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
     private final Serde<?> valSerde;
 
     private boolean initialized;
+    private Long timestamp;
+    private String topic;
+    private Long offset;
+    private Integer partition;
 
     @SuppressWarnings("unchecked")
     public ProcessorContextImpl(TaskId id,
@@ -136,53 +140,46 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
         return stateMgr.getStore(name);
     }
 
-    /**
-     * @throws IllegalStateException if the task's record is null
-     */
-    @Override
-    public String topic() {
-        if (task.record() == null)
-            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
 
-        String topic = task.record().topic();
-
-        if (topic.equals(NONEXIST_TOPIC))
+    @Override
+    public synchronized String topic() {
+        if (topic == null || topic.equals(NONEXIST_TOPIC))
             return null;
         else
             return topic;
     }
 
     /**
-     * @throws IllegalStateException if the task's record is null
+     * @throws IllegalStateException if partition is null
      */
     @Override
-    public int partition() {
-        if (task.record() == null)
+    public synchronized int partition() {
+        if (partition == null) {
             throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
-
-        return task.record().partition();
+        }
+        return partition;
     }
 
     /**
-     * @throws IllegalStateException if the task's record is null
+     * @throws IllegalStateException if offset is null
      */
     @Override
-    public long offset() {
-        if (this.task.record() == null)
+    public synchronized long offset() {
+        if (offset == null) {
             throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
-
-        return this.task.record().offset();
+        }
+        return offset;
     }
 
     /**
-     * @throws IllegalStateException if the task's record is null
+     * @throws IllegalStateException if timestamp is null
      */
     @Override
-    public long timestamp() {
-        if (task.record() == null)
-            throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
-
-        return task.record().timestamp;
+    public synchronized long timestamp() {
+        if (timestamp == null) {
+            throw new IllegalStateException("This should not happen as timestamp should be set during record processing");
+        }
+        return timestamp;
     }
 
     @Override
@@ -219,4 +216,11 @@ public class ProcessorContextImpl implements 
ProcessorContext, RecordCollector.S
     public Map<String, Object> appConfigsWithPrefix(String prefix) {
         return config.originalsWithPrefix(prefix);
     }
+
+    public synchronized void update(final StampedRecord record) {
+        this.timestamp = record.timestamp;
+        this.partition = record.partition();
+        this.offset = record.offset();
+        this.topic = record.topic();
+    }
 }
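
The context now snapshots the current record's metadata once per record instead of
reaching into the task on every accessor call; this is also what lets
StoreChangeLogger stamp changelog messages with context.timestamp(). A reduced
sketch of the capture-then-expose pattern (hypothetical class, timestamp field only):

    public class RecordContextSketch {
        private Long timestamp; // null until a record is being processed

        public synchronized void update(final long recordTimestamp) {
            this.timestamp = recordTimestamp; // called once per record, before processing
        }

        public synchronized long timestamp() {
            if (timestamp == null) {
                throw new IllegalStateException("timestamp should be set during record processing");
            }
            return timestamp;
        }
    }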

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index bb8379c..65eda80 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -52,6 +52,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
+
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
@@ -93,8 +95,8 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
     private int numStandbyReplicas;
     private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
-    private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
+    private Map<InternalTopicConfig, Set<TaskId>> stateChangelogTopicToTaskIds;
+    private Map<InternalTopicConfig, Set<TaskId>> internalSourceTopicToTaskIds;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
 
     private InternalTopicManager internalTopicManager;
@@ -109,6 +111,7 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
+
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
             KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -147,7 +150,10 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
                    (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
-                    configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1);
+                    configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
+                    configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
+                            (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
+                            : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
         } else {
             log.info("stream-thread [{}] Config '{}' isn't supplied and hence 
no internal topics will be created.",  streamThread.getName(), 
StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
         }
@@ -176,23 +182,20 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
     /**
      * Internal helper function that creates a Kafka topic
      * @param topicToTaskIds Map that contains the topic names to be created
-     * @param compactTopic If true, the topic should be a compacted topic. This is used for
-     *                     change log topics usually.
      * @param postPartitionPhase If true, the computation for calculating the number of partitions
      *                           is slightly different. Set to true after the initial topic-to-partition
      *                           assignment.
      * @return
      */
-    private Map<TopicPartition, PartitionInfo> prepareTopic(Map<String, Set<TaskId>> topicToTaskIds,
-                                                            boolean compactTopic,
+    private Map<TopicPartition, PartitionInfo> prepareTopic(Map<InternalTopicConfig, Set<TaskId>> topicToTaskIds,
                                                             boolean postPartitionPhase) {
         Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
         // if ZK is specified, prepare the internal source topic before 
calling partition grouper
         if (internalTopicManager != null) {
             log.debug("stream-thread [{}] Starting to validate internal topics 
in partition assignor.", streamThread.getName());
 
-            for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
-                String topic = entry.getKey();
+            for (Map.Entry<InternalTopicConfig, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
+                InternalTopicConfig topic = entry.getKey();
                 int numPartitions = 0;
                 if (postPartitionPhase) {
                     // the expected number of partitions is the max value of 
TaskId.partition + 1
@@ -208,12 +211,12 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
                     }
                 }
 
-                internalTopicManager.makeReady(topic, numPartitions, compactTopic);
+                internalTopicManager.makeReady(topic, numPartitions);
 
                 // wait until the topic metadata has been propagated to all brokers
                 List<PartitionInfo> partitions;
                 do {
-                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
                 } while (partitions == null || partitions.size() != numPartitions);
 
                 for (PartitionInfo partition : partitions)
@@ -223,10 +226,10 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
             log.info("stream-thread [{}] Completed validating internal topics 
in partition assignor", streamThread.getName());
         } else {
             List<String> missingTopics = new ArrayList<>();
-            for (String topic : topicToTaskIds.keySet()) {
-                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic);
+            for (InternalTopicConfig topic : topicToTaskIds.keySet()) {
+                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
                 if (partitions == null) {
-                    missingTopics.add(topic);
+                    missingTopics.add(topic.name());
                 }
             }
             if (!missingTopics.isEmpty()) {
@@ -295,21 +298,21 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         // and enforce the number of partitions for those internal topics.
         internalSourceTopicToTaskIds = new HashMap<>();
         Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
-        Map<Integer, Set<String>> internalSourceTopicGroups = new HashMap<>();
+        Map<Integer, Collection<InternalTopicConfig>> internalSourceTopicGroups = new HashMap<>();
         for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
-            internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics);
+            internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics.values());
         }
 
 
         // for all internal source topics
         // set the number of partitions to the maximum of the depending 
sub-topologies source topics
         Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
-        Set<String> allInternalTopicNames = new HashSet<>();
+        Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>();
         for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            Set<String> internalTopics = entry.getValue().interSourceTopics;
-            allInternalTopicNames.addAll(internalTopics);
-            for (String internalTopic : internalTopics) {
+            Map<String, InternalTopicConfig> internalTopics = entry.getValue().interSourceTopics;
+            allInternalTopics.putAll(internalTopics);
+            for (InternalTopicConfig internalTopic : internalTopics.values()) {
                 Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
 
                 if (tasks == null) {
@@ -317,13 +320,13 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
                     for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other 
: topicGroups.entrySet()) {
                        Set<String> otherSinkTopics = other.getValue().sinkTopics;
 
-                        if (otherSinkTopics.contains(internalTopic)) {
+                        if (otherSinkTopics.contains(internalTopic.name())) {
                             for (String topic : other.getValue().sourceTopics) {
                                 Integer partitions = null;
                                 // It is possible the sourceTopic is another internal topic, i.e,
                                 // map().join().join(map())
-                                if (allInternalTopicNames.contains(topic)) {
-                                    Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(topic);
+                                if (allInternalTopics.containsKey(topic)) {
+                                    Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(allInternalTopics.get(topic));
                                     if (taskIds != null) {
                                         for (TaskId taskId : taskIds) {
                                             partitions = taskId.partition;
@@ -340,8 +343,8 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
                     }
                     internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
                     for (int partition = 0; partition < numPartitions; partition++) {
-                        internalPartitionInfos.put(new TopicPartition(internalTopic, partition),
-                                                   new PartitionInfo(internalTopic, partition, null, new Node[0], new Node[0]));
+                        internalPartitionInfos.put(new TopicPartition(internalTopic.name(), partition),
+                                                   new PartitionInfo(internalTopic.name(), partition, null, new Node[0], new Node[0]));
                     }
                 }
             }
@@ -353,7 +356,7 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
                              metadata.withPartitions(internalPartitionInfos));
 
 
-        internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
+        internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false);
         internalSourceTopicToTaskIds.clear();
 
         metadataWithInternalTopics = metadata;
@@ -367,21 +370,23 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         // add tasks to state change log topic subscribers
         stateChangelogTopicToTaskIds = new HashMap<>();
         for (TaskId task : partitionsForTask.keySet()) {
-            for (String topicName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
-                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topicName);
+            final Map<String, InternalTopicConfig> stateChangelogTopics = topicGroups.get(task.topicGroupId).stateChangelogTopics;
+            for (InternalTopicConfig topic : stateChangelogTopics.values()) {
+                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topic);
                 if (tasks == null) {
                     tasks = new HashSet<>();
-                    stateChangelogTopicToTaskIds.put(topicName, tasks);
+                    stateChangelogTopicToTaskIds.put(topic, tasks);
                 }
 
                 tasks.add(task);
             }
 
-            for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
-                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
+            final Map<String, InternalTopicConfig> interSourceTopics = topicGroups.get(task.topicGroupId).interSourceTopics;
+            for (InternalTopicConfig topic : interSourceTopics.values()) {
+                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topic);
                 if (tasks == null) {
                     tasks = new HashSet<>();
-                    internalSourceTopicToTaskIds.put(topicName, tasks);
+                    internalSourceTopicToTaskIds.put(topic, tasks);
                 }
 
                 tasks.add(task);
@@ -461,9 +466,9 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         }
 
         // if ZK is specified, validate the internal topics again
-        prepareTopic(internalSourceTopicToTaskIds, false /* compactTopic */, true);
+        prepareTopic(internalSourceTopicToTaskIds, /* postPartitionPhase */ true);
         // change log topics should be compacted
-        prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, true);
+        prepareTopic(stateChangelogTopicToTaskIds, /* postPartitionPhase */ true);
 
         Map<String, Assignment> assignment = new HashMap<>();
         for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
@@ -563,21 +568,24 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         return metadataWithInternalTopics;
     }
 
-    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) {
-        Set<String> internalTopics = new HashSet<>();
-        for (Set<String> topics : internalTopicGroups.values())
-            internalTopics.addAll(topics);
+    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Collection<InternalTopicConfig>> internalTopicGroups, Cluster metadata) {
+        Map<String, InternalTopicConfig> internalTopics = new HashMap<>();
+        for (Collection<InternalTopicConfig> topics : internalTopicGroups.values()) {
+            for (InternalTopicConfig topic : topics) {
+                internalTopics.put(topic.name(), topic);
+            }
+        }
 
         for (Set<String> copartitionGroup : copartitionGroups) {
             ensureCopartitioning(copartitionGroup, internalTopics, metadata);
         }
     }
 
-    private void ensureCopartitioning(Set<String> copartitionGroup, Set<String> internalTopics, Cluster metadata) {
+    private void ensureCopartitioning(Set<String> copartitionGroup, Map<String, InternalTopicConfig> internalTopics, Cluster metadata) {
         int numPartitions = -1;
 
         for (String topic : copartitionGroup) {
-            if (!internalTopics.contains(topic)) {
+            if (!internalTopics.containsKey(topic)) {
                 List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
 
                 if (infos == null)
@@ -594,9 +602,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         if (numPartitions == -1) {
-            for (String topic : internalTopics) {
-                if (copartitionGroup.contains(topic)) {
-                    Integer partitions = metadata.partitionCountForTopic(topic);
+            for (InternalTopicConfig topic : internalTopics.values()) {
+                if (copartitionGroup.contains(topic.name())) {
+                    Integer partitions = metadata.partitionCountForTopic(topic.name());
                     if (partitions != null && partitions > numPartitions) {
                         numPartitions = partitions;
                     }
@@ -604,8 +612,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
         // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
-        for (String topic : internalTopics) {
-            if (copartitionGroup.contains(topic)) {
+        for (InternalTopicConfig topic : internalTopics.values()) {
+            if (copartitionGroup.contains(topic.name())) {
                 internalSourceTopicToTaskIds
                     .put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
             }
@@ -614,7 +622,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName));
+        final String changeLogName = ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName);
+        for (InternalTopicConfig internalTopicConfig : stateChangelogTopicToTaskIds.keySet()) {
+            if (internalTopicConfig.name().equals(changeLogName)) {
+                return stateChangelogTopicToTaskIds.get(internalTopicConfig);
+            }
+        }
+        return Collections.emptySet();
     }
 
     public Set<TaskId> tasksForPartition(TopicPartition partition) {
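
Both loops above use the same get-or-create idiom, now keyed by InternalTopicConfig objects rather than plain topic names, which is also why tasksForState() must scan the key set and match on name(). Below is a minimal, self-contained sketch of that idiom; String and Integer stand in for the Streams-internal InternalTopicConfig and TaskId types:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class TopicToTasksSketch {
        // get-or-create: fetch the task set for a key, creating it on first use
        static <K, V> void add(Map<K, Set<V>> map, K key, V value) {
            Set<V> values = map.get(key);
            if (values == null) {
                values = new HashSet<>();
                map.put(key, values);
            }
            values.add(value);
        }

        public static void main(String[] args) {
            Map<String, Set<Integer>> topicToTasks = new HashMap<>();
            add(topicToTasks, "appId-store-changelog", 0);
            add(topicToTasks, "appId-store-changelog", 1);
            System.out.println(topicToTasks); // {appId-store-changelog=[0, 1]}
        }
    }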

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 18ca0ee..476ec2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -171,6 +171,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
             log.debug("task [{}] Start processing one record [{}]", id(), currRecord);
 
+            updateContext(currRecord);
             this.currNode.process(currRecord.key(), currRecord.value());
 
             log.debug("task [{}] Completed processing one record [{}]", id(), currRecord);
@@ -226,7 +227,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         currNode = node;
         currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
-
+        updateContext(currRecord);
         try {
             node.processor().punctuate(timestamp);
         } finally {
@@ -235,6 +236,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
+    private void updateContext(final StampedRecord record) {
+        ((ProcessorContextImpl) processorContext).update(record);
+    }
+
     public StampedRecord record() {
         return this.currRecord;
     }
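
The StreamTask hunks above publish the record about to be processed (or the dummy punctuation record) into the processor context before the node runs, so downstream code such as StoreChangeLogger (last hunk below) reads a meaningful context.timestamp(). A self-contained sketch of that update-before-process pattern; Record and Context are illustrative stand-ins for StampedRecord and ProcessorContextImpl:

    public class UpdateContextSketch {
        static class Record {
            final long timestamp;
            Record(long timestamp) { this.timestamp = timestamp; }
        }

        static class Context {
            private Record current;
            void update(Record record) { current = record; } // what updateContext() does
            long timestamp() { return current.timestamp; }
        }

        public static void main(String[] args) {
            Context context = new Context();
            context.update(new Record(1473295344000L)); // set before process()/punctuate()
            System.out.println(context.timestamp());    // 1473295344000
        }
    }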

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 9f1e53c..03c0d02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Factory for creating state stores in Kafka Streams.
@@ -44,11 +46,15 @@ public class Stores {
                 return new ValueFactory<K>() {
                     @Override
                     public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) {
+
                         return new KeyValueFactory<K, V>() {
+
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {
                                 return new InMemoryKeyValueFactory<K, V>() {
                                     private int capacity = Integer.MAX_VALUE;
+                                    private final Map<String, String> logConfig = new HashMap<>();
+                                    private boolean logged = true;
 
                                     /**
                                      * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
@@ -62,11 +68,25 @@ public class Stores {
                                     }
 
                                     @Override
+                                    public InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config) {
+                                        logged = true;
+                                        logConfig.putAll(config);
+                                        return this;
+                                    }
+
+                                    @Override
+                                    public InMemoryKeyValueFactory<K, V> disableLogging() {
+                                        logged = false;
+                                        logConfig.clear();
+                                        return this;
+                                    }
+
+                                    @Override
                                     public StateStoreSupplier build() {
                                         if (capacity < Integer.MAX_VALUE) {
-                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde);
+                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
                                         }
-                                        return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde);
+                                        return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig);
                                     }
                                 };
                             }
@@ -74,9 +94,11 @@ public class Stores {
                             @Override
                             public PersistentKeyValueFactory<K, V> persistent() {
                                 return new PersistentKeyValueFactory<K, V>() {
+                                    private final Map<String, String> logConfig = new HashMap<>();
                                     private int numSegments = 0;
                                     private long retentionPeriod = 0L;
                                     private boolean retainDuplicates = false;
+                                    private boolean logged = true;
 
                                     @Override
                                     public PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates) {
@@ -88,15 +110,31 @@ public class Stores {
                                     }
 
                                     @Override
+                                    public PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config) {
+                                        logged = true;
+                                        logConfig.putAll(config);
+                                        return this;
+                                    }
+
+                                    @Override
+                                    public PersistentKeyValueFactory<K, V> disableLogging() {
+                                        logged = false;
+                                        logConfig.clear();
+                                        return this;
+                                    }
+
+                                    @Override
                                     public StateStoreSupplier build() {
                                         if (numSegments > 0) {
-                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
+                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, logged, logConfig);
                                         }
 
-                                        return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde);
+                                        return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig);
                                     }
                                 };
                             }
+
+
                         };
                     }
                 };
@@ -104,6 +142,7 @@ public class Stores {
         };
     }
 
+
     public static abstract class StoreFactory {
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
@@ -257,12 +296,7 @@ public class Stores {
         public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde);
     }
 
-    /**
-     * The interface used to specify the different kinds of key-value stores.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
+
     public interface KeyValueFactory<K, V> {
         /**
          * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
@@ -299,6 +333,23 @@ public class Stores {
         InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
 
         /**
+         * Indicates that a changelog should be created for the store. The changelog will be created
+         * with the provided cleanup policy and configs.
+         *
+         * Note: Any unrecognized configs will be ignored.
+         * @param config    any configs that should be applied to the changelog
+         * @return  the factory to create an in-memory key-value store
+         */
+        InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config);
+
+        /**
+         * Indicates that a changelog should not be created for the key-value store
+         * @return the factory to create an in-memory key-value store
+         */
+        InMemoryKeyValueFactory<K, V> disableLogging();
+
+
+        /**
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the state store supplier; never null
          */
@@ -323,6 +374,22 @@ public class Stores {
         PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates);
 
         /**
+         * Indicates that a changelog should be created for the store. The changelog will be created
+         * with the provided cleanup policy and configs.
+         *
+         * Note: Any unrecognized configs will be ignored.
+         * @param config            any configs that should be applied to the changelog
+         * @return  the factory to create a persistent key-value store
+         */
+        PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config);
+
+        /**
+         * Indicates that a changelog should not be created for the key-value store
+         * @return the factory to create a persistent key-value store
+         */
+        PersistentKeyValueFactory<K, V> disableLogging();
+
+        /**
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the key-value store; never null
          */
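
For reference, a hedged usage sketch of the fluent enableLogging()/disableLogging() methods added above. It assumes the pre-existing withStringKeys()/withStringValues() helpers, and the "retention.ms" entry is purely illustrative; per the javadoc, unrecognized configs are simply ignored:

    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    import java.util.Collections;

    public class StoresUsageSketch {
        // persistent store whose changelog carries an extra topic-level config
        public static StateStoreSupplier loggedStore() {
            return Stores.create("my-store")
                    .withStringKeys()
                    .withStringValues()
                    .persistent()
                    .enableLogging(Collections.singletonMap("retention.ms", "172800000"))
                    .build();
        }

        // in-memory store with no changelog at all
        public static StateStoreSupplier unloggedStore() {
            return Stores.create("my-scratch-store")
                    .withStringKeys()
                    .withStringValues()
                    .inMemory()
                    .disableLogging()
                    .build();
        }
    }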

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
new file mode 100644
index 0000000..64d6e07
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Map;
+
+public abstract class AbstractStoreSupplier<K, V> implements StateStoreSupplier {
+    protected final String name;
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valueSerde;
+    protected final Time time;
+    protected final boolean logged;
+    protected final Map<String, String> logConfig;
+
+    public AbstractStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+        this.time = time;
+        this.name = name;
+        this.valueSerde = valueSerde;
+        this.keySerde = keySerde;
+        this.logged = logged;
+        this.logConfig = logConfig;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Map<String, String> logConfig() {
+        return logConfig;
+    }
+
+    public boolean loggingEnabled() {
+        return logged;
+    }
+}
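
A sketch of how a caller might consume the two new accessors when deciding whether to declare a changelog topic for a store. This is illustrative logic only, not the actual TopologyBuilder code; the compact default merely echoes the "change log topics should be compacted" comment in the assignor hunk above:

    import java.util.HashMap;
    import java.util.Map;

    public class ChangelogDeclarationSketch {
        // Returns topic-level properties for the store's changelog,
        // or null when logging is disabled.
        static Map<String, String> changelogProperties(boolean loggingEnabled,
                                                       Map<String, String> logConfig) {
            if (!loggingEnabled) {
                return null; // no changelog topic for this store
            }
            Map<String, String> props = new HashMap<>(logConfig);
            if (!props.containsKey("cleanup.policy")) {
                props.put("cleanup.policy", "compact");
            }
            return props;
        }
    }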

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 3953fd0..c05ebb2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -46,32 +45,21 @@ import java.util.TreeMap;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
 
-    private final String name;
-    private final Time time;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
 
-    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) {
-        this(name, keySerde, valueSerde, null);
+    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
+        this(name, keySerde, valueSerde, null, logged, logConfig);
     }
 
-    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
-        this.name = name;
-        this.time = time;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-    }
-
-    public String name() {
-        return name;
+    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+        super(name, keySerde, valueSerde, time, logged, logConfig);
     }
 
     public StateStore get() {
         MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde);
 
-        return new MeteredKeyValueStore<>(store.enableLogging(), "in-memory-state", time);
+        return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "in-memory-state", time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
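
The same logged ? store.enableLogging() : store gate now appears in all four suppliers. A tiny generic sketch of that conditional-decorator pattern, with Store and LoggedStore as stand-ins for the Streams store classes:

    public class ConditionalDecoratorSketch {
        interface Store {
            void put(String key, String value);
        }

        static class BasicStore implements Store {
            public void put(String key, String value) { /* local write only */ }
        }

        // Decorator that would additionally append each update to a changelog.
        static class LoggedStore implements Store {
            private final Store inner;
            LoggedStore(Store inner) { this.inner = inner; }
            public void put(String key, String value) {
                inner.put(key, value);
                // ... also send (key, value) to the changelog topic ...
            }
        }

        // mirrors: new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, ...)
        static Store wrap(Store store, boolean logged) {
            return logged ? new LoggedStore(store) : store;
        }
    }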

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 20a7333..45bcca3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -19,7 +19,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Map;
 
 /**
  * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
@@ -28,34 +29,22 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
  * @param <V> The value type
  *
  */
-public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
 
-    private final String name;
     private final int capacity;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private final Time time;
 
-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde) {
-        this(name, capacity, keySerde, valueSerde, null);
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
+        this(name, capacity, keySerde, valueSerde, null, logged, logConfig);
     }
 
-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
-        this.name = name;
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+        super(name, keySerde, valueSerde, time, logged, logConfig);
+        super(name, keySerde, valueSerde, time, logged, logConfig);
         this.capacity = capacity;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
     }
 
     public StateStore get() {
         MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
-        InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore<K, V>) cache.enableLogging();
-
-        return new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
+        return new MeteredKeyValueStore<>(logged ? cache.enableLogging() : cache, "in-memory-lru-state", time);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 16111ad..c10b7e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -20,7 +20,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Map;
 
 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -30,31 +31,19 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private final Time time;
-
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) {
-        this(name, keySerde, valueSerde, null);
-    }
+public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
 
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
-        this.name = name;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        this.time = time;
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
+        this(name, keySerde, valueSerde, null, logged, logConfig);
     }
 
-    public String name() {
-        return name;
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+        super(name, keySerde, valueSerde, time, logged, logConfig);
     }
 
     public StateStore get() {
         RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
-
-        return new MeteredKeyValueStore<>(store.enableLogging(), "rocksdb-state", time);
+        return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "rocksdb-state", time);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 3a1bd59..107a5e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -20,7 +20,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Map;
 
 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -30,38 +31,30 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
+public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
 
-    private final String name;
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private final Time time;
 
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
-        this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null);
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
+        this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null, logged, logConfig);
     }
 
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
-        this.name = name;
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+        super(name, keySerde, valueSerde, time, logged, logConfig);
         this.retentionPeriod = retentionPeriod;
         this.retainDuplicates = retainDuplicates;
         this.numSegments = numSegments;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
     }
 
     public StateStore get() {
         RocksDBWindowStore<K, V> store = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
 
-        return new MeteredWindowStore<>(store.enableLogging(), "rocksdb-window", time);
+        return new MeteredWindowStore<>(logged ? store.enableLogging() : store, "rocksdb-window", time);
     }
 
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
 }
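
Exposing retentionPeriod() lets the component that declares the windowed changelog derive the topic's retention.ms from the store's own retention plus a safety margin for late-arriving records, instead of hard-coding it. A sketch of that derivation; the margin value below is illustrative:

    public class WindowChangelogRetentionSketch {
        // changelog retention.ms = window store retention + additional margin
        static long changelogRetentionMs(long storeRetentionMs, long additionalRetentionMs) {
            return storeRetentionMs + additionalRetentionMs;
        }

        public static void main(String[] args) {
            long storeRetention = 60 * 60 * 1000L; // one hour of windows
            long margin = 24 * 60 * 60 * 1000L;    // illustrative one-day margin
            System.out.println(changelogRetentionMs(storeRetention, margin)); // 90000000
        }
    }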

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 3f848fe..41f9ae2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -109,7 +109,7 @@ public class StoreChangeLogger<K, V> {
             }
             for (K k : this.dirty) {
                 V v = getter.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+                collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), k, v), keySerializer, valueSerializer);
             }
             this.removed.clear();
             this.dirty.clear();
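
The one-line StoreChangeLogger change above switches to the ProducerRecord constructor that takes an explicit timestamp, so each changelog record carries the processing context's timestamp instead of whatever the producer would assign at send time; with a compact,delete cleanup policy, time-based deletion of old changelog segments is then driven by those timestamps. A minimal sketch of the same call:

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TimestampedChangelogRecordSketch {
        // mirrors the collector.send(...) call above
        static ProducerRecord<byte[], byte[]> changelogRecord(String topic,
                                                              int partition,
                                                              long contextTimestamp,
                                                              byte[] key,
                                                              byte[] value) {
            return new ProducerRecord<>(topic, partition, contextTimestamp, key, value);
        }
    }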
