KAFKA-3595: window stores use compact,delete config for changelogs. Changelogs of window stores now configure cleanup.policy=compact,delete, with retention.ms set to the window's maintainMs + StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG. StoreChangeLogger now produces messages with context.timestamp().
Author: Damian Guy <damian....@gmail.com> Reviewers: Eno Thereska, Guozhang Wang Closes #1792 from dguy/kafka-3595 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69ebf6f7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69ebf6f7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69ebf6f7 Branch: refs/heads/trunk Commit: 69ebf6f7be2fc0e471ebd5b7a166468017ff2651 Parents: eba0ede Author: Damian Guy <damian....@gmail.com> Authored: Wed Sep 7 18:02:24 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Sep 7 18:02:24 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/StreamsConfig.java | 12 +- .../streams/kstream/internals/KTableImpl.java | 11 +- .../kstream/internals/KTableStoreSupplier.java | 58 ---- .../streams/processor/StateStoreSupplier.java | 16 + .../streams/processor/TimestampExtractor.java | 8 +- .../streams/processor/TopologyBuilder.java | 71 +++-- .../internals/InternalTopicConfig.java | 110 +++++++ .../internals/InternalTopicManager.java | 65 ++-- .../internals/ProcessorContextImpl.java | 56 ++-- .../internals/StreamPartitionAssignor.java | 106 ++++--- .../streams/processor/internals/StreamTask.java | 7 +- .../org/apache/kafka/streams/state/Stores.java | 87 +++++- .../state/internals/AbstractStoreSupplier.java | 53 ++++ .../InMemoryKeyValueStoreSupplier.java | 24 +- .../InMemoryLRUCacheStoreSupplier.java | 29 +- .../internals/RocksDBKeyValueStoreSupplier.java | 29 +- .../internals/RocksDBWindowStoreSupplier.java | 29 +- .../state/internals/StoreChangeLogger.java | 2 +- .../InternalTopicIntegrationTest.java | 110 +++++-- .../streams/processor/TopologyBuilderTest.java | 101 +++++- .../internals/InternalTopicConfigTest.java | 122 ++++++++ .../internals/StreamPartitionAssignorTest.java | 8 +- .../apache/kafka/streams/state/StoresTest.java | 84 +++++ .../internals/AbstractKeyValueStoreTest.java | 312 
+++++++++---------- .../internals/InMemoryLRUCacheStoreTest.java | 82 +++-- .../state/internals/RocksDBWindowStoreTest.java | 10 +- .../state/internals/StateStoreTestUtils.java | 8 +- .../state/internals/StoreChangeLoggerTest.java | 4 +- .../kafka/test/MockStateStoreSupplier.java | 12 + 29 files changed, 1094 insertions(+), 532 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 41498cf..e972887 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -128,9 +128,13 @@ public class StreamsConfig extends AbstractConfig { /** <code>client.id</code> */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; + /** <code>rocksdb.config.setter</code> */ public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter"; public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface"; + /** <code>windowstore.changelog.additional.retention.ms</code> */ + public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; + public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 
Default is 1 day"; static { @@ -238,7 +242,12 @@ public class StreamsConfig extends AbstractConfig { Type.CLASS, null, Importance.LOW, - ROCKSDB_CONFIG_SETTER_CLASS_DOC); + ROCKSDB_CONFIG_SETTER_CLASS_DOC) + .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, + Type.LONG, + 24 * 60 * 60 * 1000, + Importance.MEDIUM, + WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC); } // this is the list of configs for underlying clients @@ -325,6 +334,7 @@ public class StreamsConfig extends AbstractConfig { props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); + props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG)); http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 2f36183..6c73b11 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -33,10 +33,12 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.errors.StreamsException; +import 
org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; +import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -416,10 +418,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, public void materialize(KTableSource<K, ?> source) { synchronized (source) { if (!source.isMaterialized()) { - StateStoreSupplier storeSupplier = - new KTableStoreSupplier<>(source.storeName, keySerde, valSerde, null); + StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(source.storeName, + keySerde, + valSerde, + false, + Collections.<String, String>emptyMap()); // mark this state as non internal hence it is read directly from a user topic - topology.addStateStore(storeSupplier, false, name); + topology.addStateStore(storeSupplier, name); source.materialize(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java deleted file mode 100644 index ff118da..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; -import org.apache.kafka.streams.state.internals.RocksDBStore; - -/** - * A KTable storage. It stores all entries in a local RocksDB database. - * - * @param <K> the type of keys - * @param <V> the type of values - */ -public class KTableStoreSupplier<K, V> implements StateStoreSupplier { - - private final String name; - private final Serde<K> keySerde; - private final Serde<V> valueSerde; - private final Time time; - - protected KTableStoreSupplier(String name, - Serde<K> keySerde, - Serde<V> valueSerde, - Time time) { - this.name = name; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.time = time; - } - - public String name() { - return name; - } - - public StateStore get() { - return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, valueSerde), "rocksdb-state", time); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java index 
f2ae020..d3b0a1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor; +import java.util.Map; + /** * A state store supplier which can create one or more {@link StateStore} instances. */ @@ -35,4 +37,18 @@ public interface StateStoreSupplier { * @return a new {@link StateStore} instance */ StateStore get(); + + /** + * Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore} + * + * Note: any unrecognized configs will be ignored by the Kafka brokers. + * @return Map containing any log configs to be used when creating the changelog for the {@link StateStore} + * If {@code loggingEnabled} returns false, this function will always return an empty map + */ + Map<String, String> logConfig(); + + /** + * @return true if the {@link StateStore} should have logging enabled + */ + boolean loggingEnabled(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index c872fa1..c55518b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.kstream.KTable; /** * An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord}. 
@@ -28,7 +29,12 @@ public interface TimestampExtractor { /** * Extracts a timestamp from a record. * <p> - * Typically, the timestamp represents the milliseconds since midnight, January 1, 1970 UTC. + * The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC. + * + * It is important to note that this timestamp may become the message timestamp for any messages sent to changelogs updated by {@link KTable}s + * and joins. The message timestamp is used for log retention and log rolling, so using nonsensical values may result in + * excessive log rolling and therefore broker performance degradation. + * * * @param record a data record * @return the timestamp of the record http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index bcdb54a..ee61e73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.processor.internals.InternalTopicConfig; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; @@ -28,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.QuickUnion; import org.apache.kafka.streams.processor.internals.SinkNode; import 
org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; +import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import java.util.ArrayList; import java.util.Arrays; @@ -53,7 +55,6 @@ import java.util.regex.Pattern; * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}. */ public class TopologyBuilder { - // node factories in a topological order private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>(); @@ -103,11 +104,9 @@ public class TopologyBuilder { private static class StateStoreFactory { public final Set<String> users; - public final boolean isInternal; public final StateStoreSupplier supplier; - StateStoreFactory(boolean isInternal, StateStoreSupplier supplier) { - this.isInternal = isInternal; + StateStoreFactory(StateStoreSupplier supplier) { this.supplier = supplier; this.users = new HashSet<>(); } @@ -224,10 +223,10 @@ public class TopologyBuilder { public static class TopicsInfo { public Set<String> sinkTopics; public Set<String> sourceTopics; - public Set<String> interSourceTopics; - public Set<String> stateChangelogTopics; + public Map<String, InternalTopicConfig> interSourceTopics; + public Map<String, InternalTopicConfig> stateChangelogTopics; - public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) { + public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> interSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) { this.sinkTopics = sinkTopics; this.sourceTopics = sourceTopics; this.interSourceTopics = interSourceTopics; @@ -249,6 +248,16 @@ public class TopologyBuilder { long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode(); return (int) (n % 
0xFFFFFFFFL); } + + @Override + public String toString() { + return "TopicsInfo{" + + "sinkTopics=" + sinkTopics + + ", sourceTopics=" + sourceTopics + + ", interSourceTopics=" + interSourceTopics + + ", stateChangelogTopics=" + stateChangelogTopics + + '}'; + } } /** @@ -544,13 +553,13 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if state store supplier is already added */ - public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) { + public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) { Objects.requireNonNull(supplier, "supplier can't be null"); if (stateFactories.containsKey(supplier.name())) { throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added."); } - stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier)); + stateFactories.put(supplier.name(), new StateStoreFactory(supplier)); if (processorNames != null) { for (String processorName : processorNames) { @@ -561,15 +570,6 @@ public class TopologyBuilder { return this; } - /** - * Adds a state store - * - * @param supplier the supplier used to obtain this state store {@link StateStore} instance - * @return this builder instance so methods can be chained together; never null - */ - public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... 
processorNames) { - return this.addStateStore(supplier, true, processorNames); - } /** * Connects the processor and the state stores @@ -594,7 +594,6 @@ public class TopologyBuilder { throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); } sourceStoreToSourceTopic.put(sourceStoreName, topic); - return this; } @@ -842,8 +841,8 @@ public class TopologyBuilder { for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { Set<String> sinkTopics = new HashSet<>(); Set<String> sourceTopics = new HashSet<>(); - Set<String> internalSourceTopics = new HashSet<>(); - Set<String> stateChangelogTopics = new HashSet<>(); + Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>(); + Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>(); for (String node : entry.getValue()) { // if the node is a source node, add to the source topics String[] topics = nodeToSourceTopics.get(node); @@ -853,7 +852,9 @@ public class TopologyBuilder { if (this.internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id String internalTopic = decorateTopic(topic); - internalSourceTopics.add(internalTopic); + internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic, + Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), + Collections.<String, String>emptyMap())); sourceTopics.add(internalTopic); } else { sourceTopics.add(topic); @@ -874,22 +875,38 @@ public class TopologyBuilder { // if the node is connected to a state, add to the state topics for (StateStoreFactory stateFactory : stateFactories.values()) { - if (stateFactory.isInternal && stateFactory.users.contains(node)) { - // prefix the change log topic name with the application id - stateChangelogTopics.add(ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.supplier.name())); + final StateStoreSupplier supplier = stateFactory.supplier; + if (supplier.loggingEnabled() && 
stateFactory.users.contains(node)) { + final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name()); + final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name); + stateChangelogTopics.put(name, internalTopicConfig); } } } topicGroups.put(entry.getKey(), new TopicsInfo( Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), - Collections.unmodifiableSet(internalSourceTopics), - Collections.unmodifiableSet(stateChangelogTopics))); + Collections.unmodifiableMap(internalSourceTopics), + Collections.unmodifiableMap(stateChangelogTopics))); } return Collections.unmodifiableMap(topicGroups); } + private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) { + if (!(supplier instanceof RocksDBWindowStoreSupplier)) { + return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig()); + } + + final RocksDBWindowStoreSupplier windowStoreSupplier = (RocksDBWindowStoreSupplier) supplier; + final InternalTopicConfig config = new InternalTopicConfig(name, + Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, + InternalTopicConfig.CleanupPolicy.delete), + supplier.logConfig()); + config.setRetentionMs(windowStoreSupplier.retentionPeriod()); + return config; + } + /** * Get the names of topics that are to be consumed by the source nodes created by this builder. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java new file mode 100644 index 0000000..45016c8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +/** + * InternalTopicConfig captures the properties required for configuring + * the internal topics we create for change-logs and repartitioning etc. 
+ */ +public class InternalTopicConfig { + public enum CleanupPolicy { compact, delete } + + private final String name; + private final Map<String, String> logConfig; + private Long retentionMs; + private final Set<CleanupPolicy> cleanupPolicies; + + public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies, final Map<String, String> logConfig) { + Objects.requireNonNull(name, "name can't be null"); + if (defaultCleanupPolicies.isEmpty()) { + throw new IllegalArgumentException("Must provide at least one cleanup policy"); + } + this.name = name; + this.cleanupPolicies = defaultCleanupPolicies; + this.logConfig = logConfig; + } + + /* for test use only */ + boolean isCompacted() { + return cleanupPolicies.contains(CleanupPolicy.compact); + } + + private boolean isCompactDelete() { + return cleanupPolicies.contains(CleanupPolicy.compact) && cleanupPolicies.contains(CleanupPolicy.delete); + } + + /** + * Get the configured properties for this topic. If rententionMs is set then + * we add additionalRetentionMs to work out the desired retention when cleanup.policy=compact,delete + * + * @param additionalRetentionMs - added to retention to allow for clock drift etc + * @return Properties to be used when creating the topic + */ + public Properties toProperties(final long additionalRetentionMs) { + final Properties result = new Properties(); + for (Map.Entry<String, String> configEntry : logConfig.entrySet()) { + result.put(configEntry.getKey(), configEntry.getValue()); + } + if (retentionMs != null && isCompactDelete()) { + result.put(InternalTopicManager.RETENTION_MS, String.valueOf(retentionMs + additionalRetentionMs)); + } + + if (!logConfig.containsKey(InternalTopicManager.CLEANUP_POLICY_PROP)) { + final StringBuilder builder = new StringBuilder(); + for (CleanupPolicy cleanupPolicy : cleanupPolicies) { + builder.append(cleanupPolicy.name()).append(","); + } + builder.deleteCharAt(builder.length() - 1); + + 
result.put(InternalTopicManager.CLEANUP_POLICY_PROP, builder.toString()); + } + + + return result; + } + + public String name() { + return name; + } + + public void setRetentionMs(final long retentionMs) { + if (!logConfig.containsKey(InternalTopicManager.RETENTION_MS)) { + this.retentionMs = retentionMs; + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final InternalTopicConfig that = (InternalTopicConfig) o; + return Objects.equals(name, that.name) && + Objects.equals(logConfig, that.logConfig) && + Objects.equals(retentionMs, that.retentionMs) && + Objects.equals(cleanupPolicies, that.cleanupPolicies); + } + + @Override + public int hashCode() { + return Objects.hash(name, logConfig, retentionMs, cleanupPolicies); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 4477fb7..44de757 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -24,6 +24,7 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; @@ -35,8 +36,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import 
java.util.List; +import java.util.Locale; import java.util.Map; -import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; public class InternalTopicManager { @@ -48,11 +51,27 @@ public class InternalTopicManager { private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics"; private static final String ZK_ENTITY_CONFIG_PATH = "/config/topics"; // TODO: the following LogConfig dependency should be removed after KIP-4 - private static final String CLEANUP_POLICY_PROP = "cleanup.policy"; - private static final String COMPACT = "compact"; + public static final String CLEANUP_POLICY_PROP = "cleanup.policy"; + private static final Set<String> CLEANUP_POLICIES = Utils.mkSet("compact", "delete"); + public static final String RETENTION_MS = "retention.ms"; + public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS); private final ZkClient zkClient; private final int replicationFactor; + private final long windowChangeLogAdditionalRetention; + + public static boolean isValidCleanupPolicy(final String cleanupPolicy) { + if (cleanupPolicy == null) { + return false; + } + final String[] policies = cleanupPolicy.toLowerCase(Locale.ROOT).split(","); + for (String policy : policies) { + if (!CLEANUP_POLICIES.contains(policy.trim())) { + return false; + } + } + return true; + } private class ZKStringSerializer implements ZkSerializer { @@ -87,22 +106,24 @@ public class InternalTopicManager { public InternalTopicManager() { this.zkClient = null; this.replicationFactor = 0; + this.windowChangeLogAdditionalRetention = WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT; } - public InternalTopicManager(String zkConnect, int replicationFactor) { + public InternalTopicManager(String zkConnect, final int replicationFactor, long windowChangeLogAdditionalRetention) { this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer()); this.replicationFactor = 
replicationFactor; + this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention; } - public void makeReady(String topic, int numPartitions, boolean compactTopic) { + public void makeReady(InternalTopicConfig topic, int numPartitions) { boolean topicNotReady = true; while (topicNotReady) { - Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic); + Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic.name()); if (topicMetadata == null) { try { - createTopic(topic, numPartitions, replicationFactor, compactTopic); + createTopic(topic, numPartitions, replicationFactor); } catch (ZkNodeExistsException e) { // ignore and continue } @@ -110,14 +131,14 @@ public class InternalTopicManager { if (topicMetadata.size() > numPartitions) { // else if topic exists with more #.partitions than needed, delete in order to re-create it try { - deleteTopic(topic); + deleteTopic(topic.name()); } catch (ZkNodeExistsException e) { // ignore and continue } } else if (topicMetadata.size() < numPartitions) { // else if topic exists with less #.partitions than needed, add partitions try { - addPartitions(topic, numPartitions - topicMetadata.size(), replicationFactor, topicMetadata); + addPartitions(topic.name(), numPartitions - topicMetadata.size(), replicationFactor, topicMetadata); } catch (ZkNoNodeException e) { // ignore and continue } @@ -163,9 +184,8 @@ public class InternalTopicManager { } } - private void createTopic(String topic, int numPartitions, int replicationFactor, boolean compactTopic) throws ZkNodeExistsException { - log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions); - Properties prop = new Properties(); + private void createTopic(InternalTopicConfig topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException { + log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic.name(), numPartitions); ObjectMapper mapper = new 
ObjectMapper(); List<Integer> brokers = getBrokers(); int numBrokers = brokers.size(); @@ -185,17 +205,14 @@ public class InternalTopicManager { assignment.put(i, brokerList); } // write out config first just like in AdminUtils.scala createOrUpdateTopicPartitionAssignmentPathInZK() - if (compactTopic) { - prop.put(CLEANUP_POLICY_PROP, COMPACT); - try { - Map<String, Object> dataMap = new HashMap<>(); - dataMap.put("version", 1); - dataMap.put("config", prop); - String data = mapper.writeValueAsString(dataMap); - zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE); - } catch (JsonProcessingException e) { - throw new StreamsException("Error while creating topic config in ZK for internal topic " + topic, e); - } + try { + Map<String, Object> dataMap = new HashMap<>(); + dataMap.put("version", 1); + dataMap.put("config", topic.toProperties(windowChangeLogAdditionalRetention)); + String data = mapper.writeValueAsString(dataMap); + zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } catch (JsonProcessingException e) { + throw new StreamsException("Error while creating topic config in ZK for internal topic " + topic, e); } // try to write to ZK with open ACL @@ -205,7 +222,7 @@ public class InternalTopicManager { dataMap.put("partitions", assignment); String data = mapper.writeValueAsString(dataMap); - zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE); + zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE); } catch (JsonProcessingException e) { throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e); } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 00ffb20..a38839f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -44,6 +44,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S private final Serde<?> valSerde; private boolean initialized; + private Long timestamp; + private String topic; + private Long offset; + private Integer partition; @SuppressWarnings("unchecked") public ProcessorContextImpl(TaskId id, @@ -136,53 +140,46 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S return stateMgr.getStore(name); } - /** - * @throws IllegalStateException if the task's record is null - */ - @Override - public String topic() { - if (task.record() == null) - throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed"); - String topic = task.record().topic(); - - if (topic.equals(NONEXIST_TOPIC)) + @Override + public synchronized String topic() { + if (topic == null || topic.equals(NONEXIST_TOPIC)) return null; else return topic; } /** - * @throws IllegalStateException if the task's record is null + * @throws IllegalStateException if partition is null */ @Override - public int partition() { - if (task.record() == null) + public synchronized int partition() { + if (partition == null) { throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed"); - - return task.record().partition(); + } + return partition; } /** - * @throws IllegalStateException if the task's record is null + * @throws IllegalStateException if offset is null */ @Override - public long offset() { - if (this.task.record() == 
null) + public synchronized long offset() { + if (offset == null) { throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed"); - - return this.task.record().offset(); + } + return offset; } /** - * @throws IllegalStateException if the task's record is null + * @throws IllegalStateException if timestamp is null */ @Override - public long timestamp() { - if (task.record() == null) - throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed"); - - return task.record().timestamp; + public synchronized long timestamp() { + if (timestamp == null) { + throw new IllegalStateException("This should not happen as timestamp should be set during record processing"); + } + return timestamp; } @Override @@ -219,4 +216,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S public Map<String, Object> appConfigsWithPrefix(String prefix) { return config.originalsWithPrefix(prefix); } + + public synchronized void update(final StampedRecord record) { + this.timestamp = record.timestamp; + this.partition = record.partition(); + this.offset = record.offset(); + this.topic = record.topic(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index bb8379c..65eda80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -52,6 +52,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; 
+import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT; + public class StreamPartitionAssignor implements PartitionAssignor, Configurable { private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class); @@ -93,8 +95,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable private int numStandbyReplicas; private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups; private Map<TopicPartition, Set<TaskId>> partitionToTaskIds; - private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds; - private Map<String, Set<TaskId>> internalSourceTopicToTaskIds; + private Map<InternalTopicConfig, Set<TaskId>> stateChangelogTopicToTaskIds; + private Map<InternalTopicConfig, Set<TaskId>> internalSourceTopicToTaskIds; private Map<TaskId, Set<TopicPartition>> standbyTasks; private InternalTopicManager internalTopicManager; @@ -109,6 +111,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable public void configure(Map<String, ?> configs) { numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); + Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE); if (o == null) { KafkaException ex = new KafkaException("StreamThread is not specified"); @@ -147,7 +150,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) { internalTopicManager = new InternalTopicManager( (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), - configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1); + configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1, + configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ? 
+ (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) + : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); } else { log.info("stream-thread [{}] Config '{}' isn't supplied and hence no internal topics will be created.", streamThread.getName(), StreamsConfig.ZOOKEEPER_CONNECT_CONFIG); } @@ -176,23 +182,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable /** * Internal helper function that creates a Kafka topic * @param topicToTaskIds Map that contains the topic names to be created - * @param compactTopic If true, the topic should be a compacted topic. This is used for - * change log topics usually. * @param postPartitionPhase If true, the computation for calculating the number of partitions * is slightly different. Set to true after the initial topic-to-partition * assignment. * @return */ - private Map<TopicPartition, PartitionInfo> prepareTopic(Map<String, Set<TaskId>> topicToTaskIds, - boolean compactTopic, + private Map<TopicPartition, PartitionInfo> prepareTopic(Map<InternalTopicConfig, Set<TaskId>> topicToTaskIds, boolean postPartitionPhase) { Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>(); // if ZK is specified, prepare the internal source topic before calling partition grouper if (internalTopicManager != null) { log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName()); - for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) { - String topic = entry.getKey(); + for (Map.Entry<InternalTopicConfig, Set<TaskId>> entry : topicToTaskIds.entrySet()) { + InternalTopicConfig topic = entry.getKey(); int numPartitions = 0; if (postPartitionPhase) { // the expected number of partitions is the max value of TaskId.partition + 1 @@ -208,12 +211,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } - internalTopicManager.makeReady(topic, numPartitions, compactTopic); + 
internalTopicManager.makeReady(topic, numPartitions); // wait until the topic metadata has been propagated to all brokers List<PartitionInfo> partitions; do { - partitions = streamThread.restoreConsumer.partitionsFor(topic); + partitions = streamThread.restoreConsumer.partitionsFor(topic.name()); } while (partitions == null || partitions.size() != numPartitions); for (PartitionInfo partition : partitions) @@ -223,10 +226,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName()); } else { List<String> missingTopics = new ArrayList<>(); - for (String topic : topicToTaskIds.keySet()) { - List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic); + for (InternalTopicConfig topic : topicToTaskIds.keySet()) { + List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic.name()); if (partitions == null) { - missingTopics.add(topic); + missingTopics.add(topic.name()); } } if (!missingTopics.isEmpty()) { @@ -295,21 +298,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable // and enforce the number of partitions for those internal topics. 
internalSourceTopicToTaskIds = new HashMap<>(); Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>(); - Map<Integer, Set<String>> internalSourceTopicGroups = new HashMap<>(); + Map<Integer, Collection<InternalTopicConfig>> internalSourceTopicGroups = new HashMap<>(); for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics); - internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics); + internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics.values()); } // for all internal source topics // set the number of partitions to the maximum of the depending sub-topologies source topics Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>(); - Set<String> allInternalTopicNames = new HashSet<>(); + Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>(); for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { - Set<String> internalTopics = entry.getValue().interSourceTopics; - allInternalTopicNames.addAll(internalTopics); - for (String internalTopic : internalTopics) { + Map<String, InternalTopicConfig> internalTopics = entry.getValue().interSourceTopics; + allInternalTopics.putAll(internalTopics); + for (InternalTopicConfig internalTopic : internalTopics.values()) { Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic); if (tasks == null) { @@ -317,13 +320,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) { Set<String> otherSinkTopics = other.getValue().sinkTopics; - if (otherSinkTopics.contains(internalTopic)) { + if (otherSinkTopics.contains(internalTopic.name())) { for (String topic : other.getValue().sourceTopics) { Integer partitions = null; // It is possible the sourceTopic is another internal topic, i.e, // 
map().join().join(map()) - if (allInternalTopicNames.contains(topic)) { - Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(topic); + if (allInternalTopics.containsKey(topic)) { + Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(allInternalTopics.get(topic)); if (taskIds != null) { for (TaskId taskId : taskIds) { partitions = taskId.partition; @@ -340,8 +343,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions))); for (int partition = 0; partition < numPartitions; partition++) { - internalPartitionInfos.put(new TopicPartition(internalTopic, partition), - new PartitionInfo(internalTopic, partition, null, new Node[0], new Node[0])); + internalPartitionInfos.put(new TopicPartition(internalTopic.name(), partition), + new PartitionInfo(internalTopic.name(), partition, null, new Node[0], new Node[0])); } } } @@ -353,7 +356,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable metadata.withPartitions(internalPartitionInfos)); - internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false); + internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false); internalSourceTopicToTaskIds.clear(); metadataWithInternalTopics = metadata; @@ -367,21 +370,23 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable // add tasks to state change log topic subscribers stateChangelogTopicToTaskIds = new HashMap<>(); for (TaskId task : partitionsForTask.keySet()) { - for (String topicName : topicGroups.get(task.topicGroupId).stateChangelogTopics) { - Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topicName); + final Map<String, InternalTopicConfig> stateChangelogTopics = topicGroups.get(task.topicGroupId).stateChangelogTopics; + for (InternalTopicConfig topic : stateChangelogTopics.values()) { + Set<TaskId> tasks = 
stateChangelogTopicToTaskIds.get(topic); if (tasks == null) { tasks = new HashSet<>(); - stateChangelogTopicToTaskIds.put(topicName, tasks); + stateChangelogTopicToTaskIds.put(topic, tasks); } tasks.add(task); } - for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) { - Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName); + final Map<String, InternalTopicConfig> interSourceTopics = topicGroups.get(task.topicGroupId).interSourceTopics; + for (InternalTopicConfig topic : interSourceTopics.values()) { + Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topic); if (tasks == null) { tasks = new HashSet<>(); - internalSourceTopicToTaskIds.put(topicName, tasks); + internalSourceTopicToTaskIds.put(topic, tasks); } tasks.add(task); @@ -461,9 +466,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } // if ZK is specified, validate the internal topics again - prepareTopic(internalSourceTopicToTaskIds, false /* compactTopic */, true); + prepareTopic(internalSourceTopicToTaskIds, /* compactTopic */ true); // change log topics should be compacted - prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, true); + prepareTopic(stateChangelogTopicToTaskIds, /* compactTopic */ true); Map<String, Assignment> assignment = new HashMap<>(); for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) { @@ -563,21 +568,24 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable return metadataWithInternalTopics; } - private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) { - Set<String> internalTopics = new HashSet<>(); - for (Set<String> topics : internalTopicGroups.values()) - internalTopics.addAll(topics); + private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Collection<InternalTopicConfig>> internalTopicGroups, Cluster metadata) { + Map<String, 
InternalTopicConfig> internalTopics = new HashMap<>(); + for (Collection<InternalTopicConfig> topics : internalTopicGroups.values()) { + for (InternalTopicConfig topic : topics) { + internalTopics.put(topic.name(), topic); + } + } for (Set<String> copartitionGroup : copartitionGroups) { ensureCopartitioning(copartitionGroup, internalTopics, metadata); } } - private void ensureCopartitioning(Set<String> copartitionGroup, Set<String> internalTopics, Cluster metadata) { + private void ensureCopartitioning(Set<String> copartitionGroup, Map<String, InternalTopicConfig> internalTopics, Cluster metadata) { int numPartitions = -1; for (String topic : copartitionGroup) { - if (!internalTopics.contains(topic)) { + if (!internalTopics.containsKey(topic)) { List<PartitionInfo> infos = metadata.partitionsForTopic(topic); if (infos == null) @@ -594,9 +602,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } if (numPartitions == -1) { - for (String topic : internalTopics) { - if (copartitionGroup.contains(topic)) { - Integer partitions = metadata.partitionCountForTopic(topic); + for (InternalTopicConfig topic : internalTopics.values()) { + if (copartitionGroup.contains(topic.name())) { + Integer partitions = metadata.partitionCountForTopic(topic.name()); if (partitions != null && partitions > numPartitions) { numPartitions = partitions; } @@ -604,8 +612,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds - for (String topic : internalTopics) { - if (copartitionGroup.contains(topic)) { + for (InternalTopicConfig topic : internalTopics.values()) { + if (copartitionGroup.contains(topic.name())) { internalSourceTopicToTaskIds .put(topic, Collections.singleton(new TaskId(-1, numPartitions))); } @@ -614,7 +622,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable /* For Test Only */ public 
Set<TaskId> tasksForState(String stateName) { - return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName)); + final String changeLogName = ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName); + for (InternalTopicConfig internalTopicConfig : stateChangelogTopicToTaskIds.keySet()) { + if (internalTopicConfig.name().equals(changeLogName)) { + return stateChangelogTopicToTaskIds.get(internalTopicConfig); + } + } + return Collections.emptySet(); } public Set<TaskId> tasksForPartition(TopicPartition partition) { http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 18ca0ee..476ec2e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -171,6 +171,7 @@ public class StreamTask extends AbstractTask implements Punctuator { log.debug("task [{}] Start processing one record [{}]", id(), currRecord); + updateContext(currRecord); this.currNode.process(currRecord.key(), currRecord.value()); log.debug("task [{}] Completed processing one record [{}]", id(), currRecord); @@ -226,7 +227,7 @@ public class StreamTask extends AbstractTask implements Punctuator { currNode = node; currRecord = new StampedRecord(DUMMY_RECORD, timestamp); - + updateContext(currRecord); try { node.processor().punctuate(timestamp); } finally { @@ -235,6 +236,10 @@ public class StreamTask extends AbstractTask implements Punctuator { } } + private void updateContext(final StampedRecord record) { + ((ProcessorContextImpl) 
processorContext).update(record); + } + public StampedRecord record() { return this.currRecord; } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 9f1e53c..03c0d02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -25,6 +25,8 @@ import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; /** * Factory for creating state stores in Kafka Streams. @@ -44,11 +46,15 @@ public class Stores { return new ValueFactory<K>() { @Override public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) { + return new KeyValueFactory<K, V>() { + @Override public InMemoryKeyValueFactory<K, V> inMemory() { return new InMemoryKeyValueFactory<K, V>() { private int capacity = Integer.MAX_VALUE; + private final Map<String, String> logConfig = new HashMap<>(); + private boolean logged = true; /** * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2 @@ -62,11 +68,25 @@ public class Stores { } @Override + public InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config) { + logged = true; + logConfig.putAll(config); + return this; + } + + @Override + public InMemoryKeyValueFactory<K, V> disableLogging() { + logged = false; + logConfig.clear(); + return this; + } + + @Override public StateStoreSupplier build() { if (capacity < Integer.MAX_VALUE) { - return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde); + return new 
InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig); } - return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde); + return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig); } }; } @@ -74,9 +94,11 @@ public class Stores { @Override public PersistentKeyValueFactory<K, V> persistent() { return new PersistentKeyValueFactory<K, V>() { + private final Map<String, String> logConfig = new HashMap<>(); private int numSegments = 0; private long retentionPeriod = 0L; private boolean retainDuplicates = false; + private boolean logged = true; @Override public PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates) { @@ -88,15 +110,31 @@ public class Stores { } @Override + public PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config) { + logged = true; + logConfig.putAll(config); + return this; + } + + @Override + public PersistentKeyValueFactory<K, V> disableLogging() { + logged = false; + logConfig.clear(); + return this; + } + + @Override public StateStoreSupplier build() { if (numSegments > 0) { - return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde); + return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, logged, logConfig); } - return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde); + return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig); } }; } + + }; } }; @@ -104,6 +142,7 @@ public class Stores { }; } + public static abstract class StoreFactory { /** * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s. @@ -257,12 +296,7 @@ public class Stores { public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde); } - /** - * The interface used to specify the different kinds of key-value stores. 
- * - * @param <K> the type of keys - * @param <V> the type of values - */ + public interface KeyValueFactory<K, V> { /** * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be @@ -299,6 +333,23 @@ public class Stores { InMemoryKeyValueFactory<K, V> maxEntries(int capacity); /** + * Indicates that a changelog should be created for the store. The changelog will be created + * with the provided cleanupPolicy and configs. + * + * Note: Any unrecognized configs will be ignored. + * @param config any configs that should be applied to the changelog + * @return the factory to create an in-memory key-value store + */ + InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config); + + /** + * Indicates that a changelog should not be created for the key-value store + * @return the factory to create an in-memory key-value store + */ + InMemoryKeyValueFactory<K, V> disableLogging(); + + + /** * Return the instance of StateStoreSupplier of new key-value store. * @return the state store supplier; never null */ @@ -323,6 +374,22 @@ public class Stores { PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates); /** + * Indicates that a changelog should be created for the store. The changelog will be created + * with the provided cleanupPolicy and configs. + * + * Note: Any unrecognized configs will be ignored. + * @param config any configs that should be applied to the changelog + * @return the factory to create a persistent key-value store + */ + PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config); + + /** + * Indicates that a changelog should not be created for the key-value store + * @return the factory to create a persistent key-value store + */ + PersistentKeyValueFactory<K, V> disableLogging(); + + /** * Return the instance of StateStoreSupplier of new key-value store. 
* @return the key-value store; never null */ http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java new file mode 100644 index 0000000..64d6e07 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.processor.StateStoreSupplier; + +import java.util.Map; + +public abstract class AbstractStoreSupplier<K, V> implements StateStoreSupplier { + protected final String name; + protected final Serde<K> keySerde; + protected final Serde<V> valueSerde; + protected final Time time; + protected final boolean logged; + protected final Map<String, String> logConfig; + + public AbstractStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { + this.time = time; + this.name = name; + this.valueSerde = valueSerde; + this.keySerde = keySerde; + this.logged = logged; + this.logConfig = logConfig; + } + + public String name() { + return name; + } + + public Map<String, String> logConfig() { + return logConfig; + } + + public boolean loggingEnabled() { + return logged; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index 3953fd0..c05ebb2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; -import 
org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -46,32 +45,21 @@ import java.util.TreeMap; * * @see org.apache.kafka.streams.state.Stores#create(String) */ -public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { +public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> { - private final String name; - private final Time time; - private final Serde<K> keySerde; - private final Serde<V> valueSerde; - public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) { - this(name, keySerde, valueSerde, null); + public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) { + this(name, keySerde, valueSerde, null, logged, logConfig); } - public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) { - this.name = name; - this.time = time; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - } - - public String name() { - return name; + public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { + super(name, keySerde, valueSerde, time, logged, logConfig); } public StateStore get() { MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde); - return new MeteredKeyValueStore<>(store.enableLogging(), "in-memory-state", time); + return new MeteredKeyValueStore<>(logged ? 
store.enableLogging() : store, "in-memory-state", time); } private static class MemoryStore<K, V> implements KeyValueStore<K, V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java index 20a7333..45bcca3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java @@ -19,7 +19,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; + +import java.util.Map; /** * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries. 
@@ -28,34 +29,22 @@ import org.apache.kafka.streams.processor.StateStoreSupplier; * @param <V> The value type * */ -public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier { +public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> { - private final String name; private final int capacity; - private final Serde<K> keySerde; - private final Serde<V> valueSerde; - private final Time time; - public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde) { - this(name, capacity, keySerde, valueSerde, null); + public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) { + this(name, capacity, keySerde, valueSerde, null, logged, logConfig); } - public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time) { - this.name = name; + public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { + super(name, keySerde, valueSerde, time, logged, logConfig); this.capacity = capacity; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.time = time; - } - - public String name() { - return name; } public StateStore get() { MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde); - InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore<K, V>) cache.enableLogging(); - - return new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time); + return new MeteredKeyValueStore<>(logged ? 
cache.enableLogging() : cache, "in-memory-lru-state", time); } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java index 16111ad..c10b7e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java @@ -20,7 +20,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; + +import java.util.Map; /** * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database. 
@@ -30,31 +31,19 @@ import org.apache.kafka.streams.processor.StateStoreSupplier; * * @see org.apache.kafka.streams.state.Stores#create(String) */ -public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier { - - private final String name; - private final Serde<K> keySerde; - private final Serde<V> valueSerde; - private final Time time; - - public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) { - this(name, keySerde, valueSerde, null); - } +public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> { - public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) { - this.name = name; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.time = time; + public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) { + this(name, keySerde, valueSerde, null, logged, logConfig); } - public String name() { - return name; + public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { + super(name, keySerde, valueSerde, time, logged, logConfig); } public StateStore get() { RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde); - - return new MeteredKeyValueStore<>(store.enableLogging(), "rocksdb-state", time); + return new MeteredKeyValueStore<>(logged ? 
store.enableLogging() : store, "rocksdb-state", time); } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index 3a1bd59..107a5e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -20,7 +20,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; + +import java.util.Map; /** * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database. 
@@ -30,38 +31,30 @@ import org.apache.kafka.streams.processor.StateStoreSupplier; * * @see org.apache.kafka.streams.state.Stores#create(String) */ -public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier { +public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> { - private final String name; private final long retentionPeriod; private final boolean retainDuplicates; private final int numSegments; - private final Serde<K> keySerde; - private final Serde<V> valueSerde; - private final Time time; - public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) { - this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null); + public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) { + this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null, logged, logConfig); } - public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time) { - this.name = name; + public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { + super(name, keySerde, valueSerde, time, logged, logConfig); this.retentionPeriod = retentionPeriod; this.retainDuplicates = retainDuplicates; this.numSegments = numSegments; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.time = time; - } - - public String name() { - return name; } public StateStore get() { RocksDBWindowStore<K, V> store = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde); - return new 
MeteredWindowStore<>(store.enableLogging(), "rocksdb-window", time); + return new MeteredWindowStore<>(logged ? store.enableLogging() : store, "rocksdb-window", time); } + public long retentionPeriod() { + return retentionPeriod; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java index 3f848fe..41f9ae2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java @@ -109,7 +109,7 @@ public class StoreChangeLogger<K, V> { } for (K k : this.dirty) { V v = getter.get(k); - collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer); + collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), k, v), keySerializer, valueSerializer); } this.removed.clear(); this.dirty.clear();