Repository: kafka Updated Branches: refs/heads/trunk cfa6a78c7 -> 225b0b9c7
KAFKA-6214: enable use of in-memory store for standby tasks Remove the flag in `ProcessorStateManager` that checks if a store is persistent when registering it as a standby task. Updated the smoke test to use an in-memory store. Author: Damian Guy <[email protected]> Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]> Closes #4239 from dguy/kafka-6214 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/225b0b9c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/225b0b9c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/225b0b9c Branch: refs/heads/trunk Commit: 225b0b9c712deb3c29f8bca300ba9f73d1084e81 Parents: cfa6a78 Author: Damian Guy <[email protected]> Authored: Tue Nov 21 17:55:30 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Nov 21 17:55:30 2017 -0800 ---------------------------------------------------------------------- .../internals/ProcessorStateManager.java | 7 ++----- .../processor/internals/StandbyTaskTest.java | 18 ++++-------------- .../kafka/streams/tests/SmokeTestClient.java | 9 ++++++--- 3 files changed, 12 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index cc14c67..3a2803e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -140,11 +140,8 @@ public class ProcessorStateManager implements StateManager { final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic)); if (isStandby) { - if (store.persistent()) { - log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", store.name(), topic); - - restoreCallbacks.put(topic, stateRestoreCallback); - } + log.trace("Preparing standby replica of state store {} with changelog topic {}", store.name(), topic); + restoreCallbacks.put(topic, stateRestoreCallback); } else { log.trace("Restoring state store {} from changelog topic {}", store.name(), topic); final StateRestorer restorer = new StateRestorer(storePartition, http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 396965a..8538567 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -167,8 +167,7 @@ public class StandbyTaskTest { StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory); task.initialize(); - assertEquals(Utils.mkSet(partition2), new HashSet<>(task.checkpointedOffsets().keySet())); - + assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet())); } @SuppressWarnings("unchecked") @@ -190,7 +189,8 @@ public class StandbyTaskTest { StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory); task.initialize(); - restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet())); + final Set<TopicPartition> partition = Collections.singleton(partition2); + restoreStateConsumer.assign(partition); for (ConsumerRecord<Integer, Integer> record : Arrays.asList( new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100), @@ -199,16 +199,7 @@ public class StandbyTaskTest { restoreStateConsumer.bufferRecord(record); } - for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) { - TopicPartition partition = entry.getKey(); - long offset = entry.getValue(); - if (offset >= 0) { - restoreStateConsumer.seek(partition, offset); - } else { - restoreStateConsumer.seekToBeginning(singleton(partition)); - } - } - + restoreStateConsumer.seekToBeginning(partition); task.update(partition2, restoreStateConsumer.poll(100).records(partition2)); StandbyContextImpl context = (StandbyContextImpl) task.context(); @@ -228,7 +219,6 @@ public class StandbyTaskTest { assertEquals(1, offsets.size()); assertEquals(new Long(30L + 1L), offsets.get(partition2)); - } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index b4ed127..887f763 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -27,10 +28,12 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.Stores; import java.io.File; import java.util.Properties; @@ -186,7 +189,6 @@ public class SmokeTestClient extends SmokeTestUtil { new Unwindow<String, Long>() ).to(stringSerde, longSerde, "sum"); - Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde); KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed); sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum")); @@ -225,8 +227,9 @@ public class SmokeTestClient extends SmokeTestUtil { ).aggregate(agg.init(), agg.adder(), agg.remover(), - longSerde, - "cntByCnt" + Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt")) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()) ).to(stringSerde, longSerde, "tagg"); final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
