Repository: kafka Updated Branches: refs/heads/trunk 356544cab -> 45e7f7130
KAFKA-2811: add standby tasks guozhangwang * added a new config param "num.standby.replicas" (the default value is 0). * added a new abstract class AbstractTask * added StandbyTask as a subclass of AbstractTask * modified StreamTask to a subclass of AbstractTask * StreamThread * standby tasks are created by calling StreamThread.addStandbyTask() from onPartitionsAssigned() * standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionRevoked() * In addStandbyTasks(), change log partitions are assigned to restoreConsumer. * In removeStandByTasks(), change log partitions are removed from restoreConsumer. * StreamThread polls change log records using restoreConsumer in the runLoop with timeout=0. * If records are returned, StreamThread calls StandbyTask.update and pass records to each standby tasks. Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #526 from ymatsuda/standby_task Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45e7f713 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45e7f713 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45e7f713 Branch: refs/heads/trunk Commit: 45e7f71309f9a3e30d25a6ddd3171c67e3e79286 Parents: 356544c Author: Yasuhiro Matsuda <[email protected]> Authored: Mon Nov 16 13:34:42 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Nov 16 13:34:42 2015 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/StreamingConfig.java | 10 + .../processor/internals/AbstractTask.java | 93 +++++++++ .../KafkaStreamingPartitionAssignor.java | 6 +- .../internals/ProcessorStateManager.java | 103 +++++++--- .../processor/internals/StandbyContextImpl.java | 164 ++++++++++++++++ .../processor/internals/StandbyTask.java | 88 +++++++++ .../streams/processor/internals/StreamTask.java | 56 +----- .../processor/internals/StreamThread.java | 143 ++++++++++---- .../KafkaStreamingPartitionAssignorTest.java | 174 +++++++++++++---- .../internals/ProcessorStateManagerTest.java | 93 +++++++-- .../processor/internals/StandbyTaskTest.java | 190 +++++++++++++++++++ .../processor/internals/StreamThreadTest.java | 12 +- 12 files changed, 951 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java index 693cb0c..f563070 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java @@ -54,6 +54,10 @@ public class StreamingConfig extends AbstractConfig { public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; + /** <code>num.stream.threads</code> */ + public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; + private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; + /** <code>buffered.records.per.partition</code> */ public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; @@ -136,6 +140,11 @@ public class StreamingConfig extends AbstractConfig { 1, Importance.LOW, NUM_STREAM_THREADS_DOC) + .define(NUM_STANDBY_REPLICAS_CONFIG, + Type.INT, + 0, + Importance.LOW, + NUM_STANDBY_REPLICAS_DOC) .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, Type.INT, 1000, @@ -214,6 +223,7 @@ public class StreamingConfig extends AbstractConfig { public Map<String, Object> getConsumerConfigs(StreamThread streamThread) { Map<String, Object> props = getConsumerConfigs(); + props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG)); props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName()); return props; http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java new file mode 100644 index 0000000..64bb10d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.TaskId; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + +public abstract class AbstractTask { + protected final TaskId id; + protected final ProcessorTopology topology; + protected final ProcessorStateManager stateMgr; + protected final Set<TopicPartition> partitions; + protected ProcessorContext processorContext; + + protected AbstractTask(TaskId id, + Consumer<byte[], byte[]> restoreConsumer, + ProcessorTopology topology, + StreamingConfig config, + Set<TopicPartition> partitions) { + this.id = id; + this.topology = topology; + this.partitions = partitions; + + // create the processor state manager + try { + File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString()); + // if partitions is null, this is a standby task + this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null); + } catch (IOException e) { + throw new KafkaException("Error while creating the state manager", e); + } + } + + protected void initializeStateStores() { + for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) { + StateStore store = stateStoreSupplier.get(); + store.init(this.processorContext); + } + } + + public final TaskId id() { + return id; + } + + public final Set<TopicPartition> partitions() { + return this.partitions; + } + + public final ProcessorTopology topology() { + return topology; + } + + public final ProcessorContext context() { + return processorContext; + } + + public abstract void commit(); + + public void close() { + try { + stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + } catch (IOException e) { + throw new KafkaException("Error while closing the state manager in processor context", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java index 35ba0ec..451b214 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java @@ -46,11 +46,14 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class); private StreamThread streamThread; + private int numStandbyReplicas; private Map<TopicPartition, Set<TaskId>> partitionToTaskIds; private Set<TaskId> standbyTasks; @Override public void configure(Map<String, ?> configs) { + numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG); + Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE); if (o == null) { KafkaException ex = new KafkaException("StreamThread is not specified"); @@ -99,7 +102,6 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi // - We try not to assign the same set of tasks to two different clients // We do the assignment in one-pass. The result may not satisfy above all. // 2. within each client, tasks are assigned to consumer clients in round-robin manner. - Map<UUID, Set<String>> consumersByClient = new HashMap<>(); Map<UUID, ClientState<TaskId>> states = new HashMap<>(); @@ -132,7 +134,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi // Get partition groups from the partition grouper Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata); - states = TaskAssignor.assign(states, partitionGroups.keySet(), 0); // TODO: enable standby tasks + states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas); Map<String, Assignment> assignment = new HashMap<>(); for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3cb9cea..2a8df9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -35,6 +35,7 @@ import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class ProcessorStateManager { @@ -51,13 +52,17 @@ public class ProcessorStateManager { private final Consumer<byte[], byte[]> restoreConsumer; private final Map<TopicPartition, Long> restoredOffsets; private final Map<TopicPartition, Long> checkpointedOffsets; + private final boolean isStandby; + private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks - public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer) throws IOException { + public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException { this.partition = partition; this.baseDir = baseDir; this.stores = new HashMap<>(); this.restoreConsumer = restoreConsumer; this.restoredOffsets = new HashMap<>(); + this.isStandby = isStandby; + this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null; // create the state directory for this task if missing (we won't create the parent directory) createStateDirectory(baseDir); @@ -103,8 +108,6 @@ public class ProcessorStateManager { if (this.stores.containsKey(store.name())) throw new IllegalArgumentException("Store " + store.name() + " has already been registered."); - // ---- register the store ---- // - // check that the underlying change log topic exist or not if (restoreConsumer.listTopics().containsKey(store.name())) { boolean partitionNotFound = true; @@ -124,48 +127,91 @@ public class ProcessorStateManager { this.stores.put(store.name(), store); + if (isStandby) { + if (store.persistent()) + restoreCallbacks.put(store.name(), stateRestoreCallback); + } else { + restoreActiveState(store, stateRestoreCallback); + } + } + + private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) { + + if (store == null) + throw new IllegalArgumentException("Store " + store.name() + " has not been registered."); + // ---- try to restore the state from change-log ---- // // subscribe to the store's partition - TopicPartition storePartition = new TopicPartition(store.name(), partition); if (!restoreConsumer.subscription().isEmpty()) { throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand"); } + TopicPartition storePartition = new TopicPartition(store.name(), partition); restoreConsumer.assign(Collections.singletonList(storePartition)); - // calculate the end offset of the partition - // TODO: this is a bit hacky to first seek then position to get the end offset - restoreConsumer.seekToEnd(storePartition); - long endOffset = restoreConsumer.position(storePartition); + try { + // calculate the end offset of the partition + // TODO: this is a bit hacky to first seek then position to get the end offset + restoreConsumer.seekToEnd(storePartition); + long endOffset = restoreConsumer.position(storePartition); + + // restore from the checkpointed offset of the change log if it is persistent and the offset exists; + // restore the state from the beginning of the change log otherwise + if (checkpointedOffsets.containsKey(storePartition)) { + restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition)); + } else { + restoreConsumer.seekToBeginning(storePartition); + } - // restore from the checkpointed offset of the change log if it is persistent and the offset exists; - // restore the state from the beginning of the change log otherwise - if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) { - restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition)); - } else { - restoreConsumer.seekToBeginning(storePartition); - } + // restore its state from changelog records; while restoring the log end offset + // should not change since it is only written by this thread. + while (true) { + for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) { + stateRestoreCallback.restore(record.key(), record.value()); + } - // restore its state from changelog records; while restoring the log end offset - // should not change since it is only written by this thread. - while (true) { - for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) { - stateRestoreCallback.restore(record.key(), record.value()); + if (restoreConsumer.position(storePartition) == endOffset) { + break; + } else if (restoreConsumer.position(storePartition) > endOffset) { + throw new IllegalStateException("Log end offset should not change while restoring"); + } } - if (restoreConsumer.position(storePartition) == endOffset) { - break; - } else if (restoreConsumer.position(storePartition) > endOffset) { - throw new IllegalStateException("Log end offset should not change while restoring"); + // record the restored offset for its change log partition + long newOffset = restoreConsumer.position(storePartition); + restoredOffsets.put(storePartition, newOffset); + } finally { + // un-assign the change log partition + restoreConsumer.assign(Collections.<TopicPartition>emptyList()); + } + } + + public Map<TopicPartition, Long> checkpointedOffsets() { + Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>(); + + for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) { + String storeName = entry.getKey(); + TopicPartition storePartition = new TopicPartition(storeName, partition); + + if (checkpointedOffsets.containsKey(storePartition)) { + partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition)); + } else { + partitionsAndOffsets.put(storePartition, -1L); } } + return partitionsAndOffsets; + } + + public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) { + // restore states from changelog records + StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic()); + for (ConsumerRecord<byte[], byte[]> record : records) { + restoreCallback.restore(record.key(), record.value()); + } // record the restored offset for its change log partition long newOffset = restoreConsumer.position(storePartition); restoredOffsets.put(storePartition, newOffset); - - // un-assign the change log partition - restoreConsumer.assign(Collections.<TopicPartition>emptyList()); } public StateStore getStore(String name) { @@ -224,6 +270,9 @@ public class ProcessorStateManager { checkpoint.write(checkpointOffsets); } + // un-assign the change log partition + restoreConsumer.assign(Collections.<TopicPartition>emptyList()); + // release the state directory directoryLock directoryLock.release(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java new file mode 100644 index 0000000..ea95300 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier { + + private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class); + + private final TaskId id; + private final StreamingMetrics metrics; + private final ProcessorStateManager stateMgr; + + private final Serializer<?> keySerializer; + private final Serializer<?> valSerializer; + private final Deserializer<?> keyDeserializer; + private final Deserializer<?> valDeserializer; + + private boolean initialized; + + public StandbyContextImpl(TaskId id, + StreamingConfig config, + ProcessorStateManager stateMgr, + StreamingMetrics metrics) { + this.id = id; + this.metrics = metrics; + this.stateMgr = stateMgr; + + this.keySerializer = config.keySerializer(); + this.valSerializer = config.valueSerializer(); + this.keyDeserializer = config.keyDeserializer(); + this.valDeserializer = config.valueDeserializer(); + + this.initialized = false; + } + + public void initialized() { + this.initialized = true; + } + + public TaskId id() { + return id; + } + + public ProcessorStateManager getStateMgr() { + return stateMgr; + } + + @Override + public RecordCollector recordCollector() { + throw new UnsupportedOperationException(); + } + + @Override + public Serializer<?> keySerializer() { + return this.keySerializer; + } + + @Override + public Serializer<?> valueSerializer() { + return this.valSerializer; + } + + @Override + public Deserializer<?> keyDeserializer() { + return this.keyDeserializer; + } + + @Override + public Deserializer<?> valueDeserializer() { + return this.valDeserializer; + } + + @Override + public File stateDir() { + return stateMgr.baseDir(); + } + + @Override + public StreamingMetrics metrics() { + return metrics; + } + + @Override + public void register(StateStore store, StateRestoreCallback stateRestoreCallback) { + if (initialized) + throw new KafkaException("Can only create state stores during initialization."); + + stateMgr.register(store, stateRestoreCallback); + } + + @Override + public StateStore getStateStore(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public String topic() { + throw new UnsupportedOperationException(); + } + + @Override + public int partition() { + throw new UnsupportedOperationException(); + } + + @Override + public long offset() { + throw new UnsupportedOperationException(); + } + + @Override + public long timestamp() { + throw new UnsupportedOperationException(); + } + + @Override + public <K, V> void forward(K key, V value) { + throw new UnsupportedOperationException(); + } + + @Override + public <K, V> void forward(K key, V value, int childIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public void commit() { + throw new UnsupportedOperationException(); + } + + @Override + public void schedule(long interval) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java new file mode 100644 index 0000000..c6442d9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A StandbyTask + */ +public class StandbyTask extends AbstractTask { + + private static final Logger log = LoggerFactory.getLogger(StandbyTask.class); + + private final Map<TopicPartition, Long> checkpointedOffsets; + + /** + * Create {@link StandbyTask} with its assigned partitions + * + * @param id the ID of this task + * @param restoreConsumer the instance of {@link Consumer} used when restoring state + * @param topology the instance of {@link ProcessorTopology} + * @param config the {@link StreamingConfig} specified by the user + * @param metrics the {@link StreamingMetrics} created by the thread + */ + public StandbyTask(TaskId id, + Consumer<byte[], byte[]> restoreConsumer, + ProcessorTopology topology, + StreamingConfig config, + StreamingMetrics metrics) { + super(id, restoreConsumer, topology, config, null); + + // initialize the topology with its own context + this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); + + initializeStateStores(); + + ((StandbyContextImpl) this.processorContext).initialized(); + + this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets()); + } + + public Map<TopicPartition, Long> checkpointedOffsets() { + return checkpointedOffsets; + } + + public Collection<TopicPartition> changeLogPartitions() { + return checkpointedOffsets.keySet(); + } + + /** + * Updates a state store using records from one change log partition + */ + public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) { + stateMgr.updateStandbyStates(partition, records); + } + + public void commit() { + stateMgr.flush(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index a9c14e5..5d170f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -21,46 +21,37 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.StreamingMetrics; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing. */ -public class StreamTask implements Punctuator { +public class StreamTask extends AbstractTask implements Punctuator { private static final Logger log = LoggerFactory.getLogger(StreamTask.class); - private final TaskId id; private final int maxBufferedSize; private final Consumer consumer; private final PartitionGroup partitionGroup; private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo(); private final PunctuationQueue punctuationQueue; - private final ProcessorContextImpl processorContext; - private final ProcessorTopology topology; private final Map<TopicPartition, Long> consumedOffsets; private final RecordCollector recordCollector; - private final ProcessorStateManager stateMgr; private boolean commitRequested = false; private boolean commitOffsetNeeded = false; @@ -89,12 +80,10 @@ public class StreamTask implements Punctuator { ProcessorTopology topology, StreamingConfig config, StreamingMetrics metrics) { - - this.id = id; + super(id, restoreConsumer, topology, config, Collections.unmodifiableSet(new HashSet<>(partitions))); this.consumer = consumer; this.punctuationQueue = new PunctuationQueue(); this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); - this.topology = topology; // create queues for each assigned partition and associate them // to corresponding source nodes in the processor topology @@ -117,22 +106,11 @@ public class StreamTask implements Punctuator { log.info("Creating restoration consumer client for stream task #" + id()); - // create the processor state manager - try { - File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString()); - this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer); - } catch (IOException e) { - throw new KafkaException("Error while creating the state manager", e); - } - // initialize the topology with its own context this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics); // initialize the state stores - for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) { - StateStore store = stateStoreSupplier.get(); - store.init(this.processorContext); - } + initializeStateStores(); // initialize the task by initializing all its processor nodes in the topology for (ProcessorNode node : this.topology.processors()) { @@ -144,15 +122,7 @@ public class StreamTask implements Punctuator { } } - this.processorContext.initialized(); - } - - public TaskId id() { - return id; - } - - public Set<TopicPartition> partitions() { - return this.partitionGroup.partitions(); + ((ProcessorContextImpl) this.processorContext).initialized(); } /** @@ -261,10 +231,6 @@ public class StreamTask implements Punctuator { return this.currNode; } - public ProcessorTopology topology() { - return this.topology; - } - /** * Commit the current task state */ @@ -335,11 +301,7 @@ public class StreamTask implements Punctuator { if (exception != null) throw exception; - try { - stateMgr.close(recordCollector.offsets()); - } catch (IOException e) { - throw new KafkaException("Error while closing the state manager in processor context", e); - } + super.close(); } private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) { @@ -371,8 +333,4 @@ public class StreamTask implements Punctuator { } } - public ProcessorContext context() { - return processorContext; - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 06e5951..bbaeb14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -78,7 +78,8 @@ public class StreamThread extends Thread { protected final Consumer<byte[], byte[]> restoreConsumer; private final AtomicBoolean running; - private final Map<TaskId, StreamTask> tasks; + private final Map<TaskId, StreamTask> activeTasks; + private final Map<TaskId, StandbyTask> standbyTasks; private final Set<TaskId> prevTasks; private final String clientId; private final Time time; @@ -96,14 +97,16 @@ public class StreamThread extends Thread { final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() { @Override public void onPartitionsAssigned(Collection<TopicPartition> assignment) { - addPartitions(assignment); + addStreamTasks(assignment); + addStandbyTasks(); lastClean = time.milliseconds(); // start the cleaning cycle } @Override public void onPartitionsRevoked(Collection<TopicPartition> assignment) { commitAll(); - removePartitions(); + removeStreamTasks(); + removeStandbyTasks(); lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned } }; @@ -141,7 +144,8 @@ public class StreamThread extends Thread { this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer(); // initialize the task list - this.tasks = new HashMap<>(); + this.activeTasks = new HashMap<>(); + this.standbyTasks = new HashMap<>(); this.prevTasks = new HashSet<>(); // read in task specific config values @@ -208,7 +212,7 @@ public class StreamThread extends Thread { } public Map<TaskId, StreamTask> tasks() { - return Collections.unmodifiableMap(tasks); + return Collections.unmodifiableMap(activeTasks); } private void shutdown() { @@ -236,7 +240,8 @@ public class StreamThread extends Thread { log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e); } try { - removePartitions(); + removeStreamTasks(); + removeStandbyTasks(); } catch (Throwable e) { // already logged in removePartition() } @@ -261,7 +266,7 @@ public class StreamThread extends Thread { ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); if (!records.isEmpty()) { - for (StreamTask task : tasks.values()) { + for (StreamTask task : activeTasks.values()) { for (TopicPartition partition : task.partitions()) { task.addRecords(partition, records.records(partition)); } @@ -274,11 +279,11 @@ public class StreamThread extends Thread { totalNumBuffered = 0; - if (!tasks.isEmpty()) { + if (!activeTasks.isEmpty()) { // try to process one record from each task requiresPoll = false; - for (StreamTask task : tasks.values()) { + for (StreamTask task : activeTasks.values()) { long startProcess = time.milliseconds(); totalNumBuffered += task.process(); @@ -294,6 +299,10 @@ public class StreamThread extends Thread { requiresPoll = true; } + if (!standbyTasks.isEmpty()) { + updateStandbyTasks(); + } + maybeClean(); } } catch (Exception e) { @@ -301,6 +310,18 @@ public class StreamThread extends Thread { } } + private void updateStandbyTasks() { + ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0); + + if (!records.isEmpty()) { + for (StandbyTask task : standbyTasks.values()) { + for (TopicPartition partition : task.changeLogPartitions()) { + task.update(partition, records.records(partition)); + } + } + } + } + private boolean stillRunning() { if (!running.get()) { log.debug("Shutting down at user request."); @@ -316,7 +337,7 @@ public class StreamThread extends Thread { } private void maybePunctuate() { - for (StreamTask task : tasks.values()) { + for (StreamTask task : activeTasks.values()) { try { long now = time.milliseconds(); @@ -324,7 +345,7 @@ public class StreamThread extends Thread { sensors.punctuateTimeSensor.record(time.milliseconds() - now); } catch (Exception e) { - log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e); + log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e); throw e; } } @@ -339,12 +360,12 @@ public class StreamThread extends Thread { commitAll(); lastCommit = now; } else { - for (StreamTask task : tasks.values()) { + for (StreamTask task : activeTasks.values()) { try { if (task.commitNeeded()) commitOne(task, time.milliseconds()); } catch (Exception e) { - log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e); + log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e); throw e; } } @@ -355,24 +376,22 @@ public class StreamThread extends Thread { * Commit the states of all its tasks */ private void commitAll() { - for (StreamTask task : tasks.values()) { - try { - commitOne(task, time.milliseconds()); - } catch (Exception e) { - log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e); - throw e; - } + for (StreamTask task : activeTasks.values()) { + commitOne(task, time.milliseconds()); + } + for (StandbyTask task : standbyTasks.values()) { + commitOne(task, time.milliseconds()); } } /** * Commit the state of a task */ - private void commitOne(StreamTask task, long now) { + private void commitOne(AbstractTask task, long now) { try { task.commit(); } catch (Exception e) { - log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e); + log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e); throw e; } @@ -426,7 +445,7 @@ public class StreamThread extends Thread { * Returns ids of tasks that were being executed before the rebalance. */ public Set<TaskId> prevTasks() { - return prevTasks; + return Collections.unmodifiableSet(prevTasks); } /** @@ -467,8 +486,7 @@ public class StreamThread extends Thread { return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors); } - private void addPartitions(Collection<TopicPartition> assignment) { - + private void addStreamTasks(Collection<TopicPartition> assignment) { HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>(); for (TopicPartition partition : assignment) { @@ -486,9 +504,9 @@ public class StreamThread extends Thread { // create the tasks for (TaskId taskId : partitionsForTask.keySet()) { try { - tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId))); + activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId))); } catch (Exception e) { - log.error("Failed to create a task #" + taskId + " in thread [" + this.getName() + "]: ", e); + log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e); throw e; } } @@ -496,23 +514,68 @@ public class StreamThread extends Thread { lastClean = time.milliseconds(); } - private void removePartitions() { - + private void removeStreamTasks() { // TODO: change this clearing tasks behavior - for (StreamTask task : tasks.values()) { - log.info("Removing task {}", task.id()); - try { - task.close(); - } catch (Exception e) { - log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e); - throw e; - } - sensors.taskDestructionSensor.record(); + for (StreamTask task : activeTasks.values()) { + closeOne(task); } + prevTasks.clear(); - prevTasks.addAll(tasks.keySet()); + prevTasks.addAll(activeTasks.keySet()); + + activeTasks.clear(); + } + + private void closeOne(AbstractTask task) { + log.info("Removing a task {}", task.id()); + try { + task.close(); + } catch (Exception e) { + log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e); + throw e; + } + sensors.taskDestructionSensor.record(); + } + + protected StandbyTask createStandbyTask(TaskId id) { + sensors.taskCreationSensor.record(); + + ProcessorTopology topology = builder.build(id.topicGroupId); + + return new StandbyTask(id, restoreConsumer, topology, config, sensors); + } + + private void addStandbyTasks() { + Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>(); + + for (TaskId taskId : partitionGrouper.standbyTasks()) { + StandbyTask task = createStandbyTask(taskId); + standbyTasks.put(taskId, task); + checkpointedOffsets.putAll(task.checkpointedOffsets()); + } + + restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet())); + + for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) { + TopicPartition partition = entry.getKey(); + long offset = entry.getValue(); + if (offset >= 0) { + restoreConsumer.seek(partition, offset); + } else { + restoreConsumer.seekToBeginning(partition); + } + } + } + + + private void removeStandbyTasks() { + for (StandbyTask task : standbyTasks.values()) { + closeOne(task); + } + // un-assign the change log partitions + restoreConsumer.assign(Collections.<TopicPartition>emptyList()); - tasks.clear(); + standbyTasks.clear(); } private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) { http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java index c0df2e7..aa484fc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java @@ -17,12 +17,10 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.producer.MockProducer; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -42,7 +40,6 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -110,26 +107,6 @@ public class KafkaStreamingPartitionAssignorTest { }; } - private static class TestStreamTask extends StreamTask { - public boolean committed = false; - - public TestStreamTask(TaskId id, - Consumer<byte[], byte[]> consumer, - Producer<byte[], byte[]> producer, - Consumer<byte[], byte[]> restoreConsumer, - Collection<TopicPartition> partitions, - ProcessorTopology topology, - StreamingConfig config) { - super(id, consumer, producer, restoreConsumer, partitions, topology, config, null); - } - - @Override - public void commit() { - super.commit(); - committed = true; - } - } - private ByteArraySerializer serializer = new ByteArraySerializer(); @SuppressWarnings("unchecked") @@ -165,9 +142,7 @@ public class KafkaStreamingPartitionAssignorTest { }; KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure( - Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread) - ); + partitionAssignor.configure(config.getConsumerConfigs(thread)); PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); @@ -193,6 +168,103 @@ public class KafkaStreamingPartitionAssignorTest { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); + List<String> topics = Utils.mkList("topic1", "topic2"); + Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + + final Set<TaskId> prevTasks10 = Utils.mkSet(task0); + final Set<TaskId> prevTasks11 = Utils.mkSet(task1); + final Set<TaskId> prevTasks20 = Utils.mkSet(task2); + final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); + final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); + final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime()); + + KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10)); + + Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode())); + subscriptions.put("consumer11", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode())); + subscriptions.put("consumer20", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode())); + + Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + + // check assigned partitions + + assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), + Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions()))); + assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); + + // check assignment info + Set<TaskId> allActiveTasks = new HashSet<>(); + AssignmentInfo info; + + List<TaskId> activeTasks = new ArrayList<>(); + for (TopicPartition partition : assignments.get("consumer10").partitions()) { + activeTasks.add(new TaskId(0, partition.partition())); + } + info = AssignmentInfo.decode(assignments.get("consumer10").userData()); + assertEquals(activeTasks, info.activeTasks); + assertEquals(2, info.activeTasks.size()); + assertEquals(1, new HashSet<>(info.activeTasks).size()); + assertEquals(0, info.standbyTasks.size()); + + allActiveTasks.addAll(info.activeTasks); + + activeTasks.clear(); + for (TopicPartition partition : assignments.get("consumer11").partitions()) { + activeTasks.add(new TaskId(0, partition.partition())); + } + info = AssignmentInfo.decode(assignments.get("consumer11").userData()); + assertEquals(activeTasks, info.activeTasks); + assertEquals(2, info.activeTasks.size()); + assertEquals(1, new HashSet<>(info.activeTasks).size()); + assertEquals(0, info.standbyTasks.size()); + + allActiveTasks.addAll(info.activeTasks); + + // check active tasks assigned to the first client + assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); + + activeTasks.clear(); + for (TopicPartition partition : assignments.get("consumer20").partitions()) { + activeTasks.add(new TaskId(0, partition.partition())); + } + info = AssignmentInfo.decode(assignments.get("consumer20").userData()); + assertEquals(activeTasks, info.activeTasks); + assertEquals(2, info.activeTasks.size()); + assertEquals(1, new HashSet<>(info.activeTasks).size()); + assertEquals(0, info.standbyTasks.size()); + + allActiveTasks.addAll(info.activeTasks); + + assertEquals(3, allActiveTasks.size()); + assertEquals(allTasks, new HashSet<>(allActiveTasks)); + } + + @Test + public void testAssignWithStandbyReplicas() throws Exception { + Properties props = configProps(); + props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); + StreamingConfig config = new StreamingConfig(props); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); + List<String> topics = Utils.mkList("topic1", "topic2"); + Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); final Set<TaskId> prevTasks10 = Utils.mkSet(task0); final Set<TaskId> prevTasks11 = Utils.mkSet(task1); @@ -207,17 +279,15 @@ public class KafkaStreamingPartitionAssignorTest { StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime()); KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure( - Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread10) - ); + partitionAssignor.configure(config.getConsumerConfigs(thread10)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode())); + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode())); + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode())); + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode())); Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); @@ -228,24 +298,54 @@ public class KafkaStreamingPartitionAssignorTest { assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); // check assignment info + Set<TaskId> allActiveTasks = new HashSet<>(); + Set<TaskId> allStandbyTasks = new HashSet<>(); + AssignmentInfo info; List<TaskId> activeTasks = new ArrayList<>(); for (TopicPartition partition : assignments.get("consumer10").partitions()) { activeTasks.add(new TaskId(0, partition.partition())); } - assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks); + info = AssignmentInfo.decode(assignments.get("consumer10").userData()); + assertEquals(activeTasks, info.activeTasks); + assertEquals(2, info.activeTasks.size()); + assertEquals(1, new HashSet<>(info.activeTasks).size()); + + allActiveTasks.addAll(info.activeTasks); + allStandbyTasks.addAll(info.standbyTasks); activeTasks.clear(); for (TopicPartition partition : assignments.get("consumer11").partitions()) { activeTasks.add(new TaskId(0, partition.partition())); } - assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks); + info = AssignmentInfo.decode(assignments.get("consumer11").userData()); + assertEquals(activeTasks, info.activeTasks); + assertEquals(2, info.activeTasks.size()); + assertEquals(1, new HashSet<>(info.activeTasks).size()); + + allActiveTasks.addAll(info.activeTasks); + allStandbyTasks.addAll(info.standbyTasks); + + // check tasks assigned to the first client + assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); activeTasks.clear(); for (TopicPartition partition : assignments.get("consumer20").partitions()) { activeTasks.add(new TaskId(0, partition.partition())); } - assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks); + info = AssignmentInfo.decode(assignments.get("consumer20").userData()); + assertEquals(activeTasks, info.activeTasks); + assertEquals(2, info.activeTasks.size()); + assertEquals(1, new HashSet<>(info.activeTasks).size()); + + allActiveTasks.addAll(info.activeTasks); + allStandbyTasks.addAll(info.standbyTasks); + + assertEquals(3, allActiveTasks.size()); + assertEquals(allTasks, new HashSet<>(allActiveTasks)); + + assertEquals(3, allStandbyTasks.size()); + assertEquals(allTasks, new HashSet<>(allStandbyTasks)); } @Test @@ -266,9 +366,7 @@ public class KafkaStreamingPartitionAssignorTest { StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()); KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure( - Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread) - ); + partitionAssignor.configure(config.getConsumerConfigs(thread)); List<TaskId> activeTaskList = Utils.mkList(task0, task3); Set<TaskId> standbyTasks = Utils.mkSet(task1, task2); http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index c447f99..17bc9da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.nio.channels.FileLock; import java.nio.file.Files; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -50,7 +49,7 @@ import static org.junit.Assert.assertFalse; public class ProcessorStateManagerTest { - private class MockRestoreConsumer extends MockConsumer<byte[], byte[]> { + public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> { private final Serializer<Integer> serializer = new IntegerSerializer(); public TopicPartition assignedPartition = null; @@ -79,7 +78,7 @@ public class ProcessorStateManagerTest { recordBuffer.clear(); } - // buffer a record (we cannot use addRecord because we need to add records before asigning a partition) + // buffer a record (we cannot use addRecord because we need to add records before assigning a partition) public void bufferRecord(ConsumerRecord<Integer, Integer> record) { recordBuffer.add( new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), @@ -186,7 +185,7 @@ public class ProcessorStateManagerTest { FileLock lock; // the state manager locks the directory - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer()); + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false); try { // this should not get the lock @@ -215,7 +214,7 @@ public class ProcessorStateManagerTest { try { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false); - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer()); + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false); try { stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback); } finally { @@ -235,21 +234,24 @@ public class ProcessorStateManagerTest { checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset)); MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("persistentStore", Arrays.asList( + restoreConsumer.updatePartitions("persistentStore", Utils.mkList( new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0]) )); - restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L)); + + TopicPartition partition = new TopicPartition("persistentStore", 2); + restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer); + ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false); try { restoreConsumer.reset(); ArrayList<Integer> expectedKeys = new ArrayList<>(); + long offset = -1L; for (int i = 1; i <= 3; i++) { - long offset = (long) i; + offset = (long) i; int key = i * 10; expectedKeys.add(key); restoreConsumer.bufferRecord( @@ -283,21 +285,24 @@ public class ProcessorStateManagerTest { checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset)); MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList( + restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList( new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0]) )); - restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L)); + + TopicPartition partition = new TopicPartition("persistentStore", 2); + restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer); + ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false); try { restoreConsumer.reset(); ArrayList<Integer> expectedKeys = new ArrayList<>(); + long offset = -1L; for (int i = 1; i <= 3; i++) { - long offset = (long) (i + 100); + offset = (long) (i + 100); int key = i; expectedKeys.add(i); restoreConsumer.bufferRecord( @@ -312,6 +317,7 @@ public class ProcessorStateManagerTest { assertTrue(restoreConsumer.seekToBeginingCalled); assertTrue(restoreConsumer.seekToEndCalled); assertEquals(expectedKeys, nonPersistentStore.keys); + } finally { stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); } @@ -321,17 +327,68 @@ public class ProcessorStateManagerTest { } @Test + public void testChangeLogOffsets() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + long lastCheckpointedOffset = 10L; + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); + checkpoint.write(Collections.singletonMap(new TopicPartition("store1", 0), lastCheckpointedOffset)); + + MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + restoreConsumer.updatePartitions("store1", Utils.mkList( + new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]) + )); + restoreConsumer.updatePartitions("store2", Utils.mkList( + new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]) + )); + + TopicPartition partition1 = new TopicPartition("store1", 0); + TopicPartition partition2 = new TopicPartition("store2", 0); + + Map<TopicPartition, Long> endOffsets = new HashMap<>(); + endOffsets.put(partition1, 13L); + endOffsets.put(partition2, 17L); + restoreConsumer.updateEndOffsets(endOffsets); + + MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore("store1", true); + MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore("store2", true); + + ProcessorStateManager stateMgr = new ProcessorStateManager(0, baseDir, restoreConsumer, true); // standby + try { + restoreConsumer.reset(); + + stateMgr.register(store1, store1.stateRestoreCallback); + stateMgr.register(store2, store2.stateRestoreCallback); + + Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets(); + + assertEquals(2, changeLogOffsets.size()); + assertTrue(changeLogOffsets.containsKey(partition1)); + assertTrue(changeLogOffsets.containsKey(partition2)); + assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1)); + assertEquals(-1L, (long) changeLogOffsets.get(partition2)); + + } finally { + stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test public void testGetStore() throws IOException { File baseDir = Files.createTempDirectory("test").toFile(); try { MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("mockStore", Arrays.asList( + restoreConsumer.updatePartitions("mockStore", Utils.mkList( new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0]) )); MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false); - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer); + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false); try { stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback); @@ -356,10 +413,10 @@ public class ProcessorStateManagerTest { oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap()); MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("persistentStore", Arrays.asList( + restoreConsumer.updatePartitions("persistentStore", Utils.mkList( new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]) )); - restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList( + restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList( new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]) )); @@ -372,7 +429,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer); + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false); try { // make sure the checkpoint file is deleted assertFalse(checkpointFile.exists()); http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java new file mode 100644 index 0000000..b8a6990 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.OffsetCheckpoint; +import org.apache.kafka.test.MockStateStoreSupplier; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class StandbyTaskTest { + + private final TaskId taskId = new TaskId(0, 1); + + private final Serializer<Integer> intSerializer = new IntegerSerializer(); + + private final TopicPartition partition1 = new TopicPartition("store1", 1); + private final TopicPartition partition2 = new TopicPartition("store2", 1); + + private final ProcessorTopology topology = new ProcessorTopology( + Collections.<ProcessorNode>emptyList(), + Collections.<String, SourceNode>emptyMap(), + Utils.<StateStoreSupplier>mkList( + new MockStateStoreSupplier(partition1.topic(), false), + new MockStateStoreSupplier(partition2.topic(), true) + ) + ); + + + private StreamingConfig createConfig(final File baseDir) throws Exception { + return new StreamingConfig(new Properties() { + { + setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + } + }); + } + + private final ProcessorStateManagerTest.MockRestoreConsumer restoreStateConsumer = new ProcessorStateManagerTest.MockRestoreConsumer(); + + private final byte[] recordValue = intSerializer.serialize(null, 10); + private final byte[] recordKey = intSerializer.serialize(null, 1); + + @Before + public void setup() { + restoreStateConsumer.reset(); + restoreStateConsumer.updatePartitions("store1", Utils.mkList( + new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("store1", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("store1", 2, Node.noNode(), new Node[0], new Node[0]) + )); + + restoreStateConsumer.updatePartitions("store2", Utils.mkList( + new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("store2", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("store2", 2, Node.noNode(), new Node[0], new Node[0]) + )); + } + + @Test + public void testStorePartitions() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + StreamingConfig config = createConfig(baseDir); + StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null); + + assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions())); + + } finally { + Utils.delete(baseDir); + } + } + + @SuppressWarnings("unchecked") + @Test(expected = Exception.class) + public void testUpdateNonPersistentStore() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + StreamingConfig config = createConfig(baseDir); + StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null); + + restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); + + task.update(partition1, + records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue)) + ); + + } finally { + Utils.delete(baseDir); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testUpdate() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + StreamingConfig config = createConfig(baseDir); + StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null); + + restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); + + for (ConsumerRecord<Integer, Integer> record : Arrays.asList( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 1, 100), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 2, 100), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 3, 100))) { + restoreStateConsumer.bufferRecord(record); + } + + for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) { + TopicPartition partition = entry.getKey(); + long offset = entry.getValue(); + if (offset >= 0) { + restoreStateConsumer.seek(partition, offset); + } else { + restoreStateConsumer.seekToBeginning(partition); + } + } + + task.update(partition2, restoreStateConsumer.poll(100).records(partition2)); + + StandbyContextImpl context = (StandbyContextImpl) task.context(); + MockStateStoreSupplier.MockStateStore store1 = + (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition1.topic()); + MockStateStoreSupplier.MockStateStore store2 = + (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition2.topic()); + + assertEquals(Collections.emptyList(), store1.keys); + assertEquals(Utils.mkList(1, 2, 3), store2.keys); + + task.close(); + + File taskDir = new File(baseDir, taskId.toString()); + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); + Map<TopicPartition, Long> offsets = checkpoint.read(); + + assertEquals(1, offsets.size()); + assertEquals(new Long(30L + 1L), offsets.get(partition2)); + + } finally { + Utils.delete(baseDir); + } + } + + private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) { + return Arrays.asList(recs); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 54d0a18..02d0ac7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -167,7 +167,7 @@ public class StreamThreadTest { } }; - initPartitionGrouper(thread); + initPartitionGrouper(config, thread); ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; @@ -292,7 +292,7 @@ public class StreamThreadTest { } }; - initPartitionGrouper(thread); + initPartitionGrouper(config, thread); ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; @@ -414,7 +414,7 @@ public class StreamThreadTest { } }; - initPartitionGrouper(thread); + initPartitionGrouper(config, thread); ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; @@ -467,13 +467,11 @@ public class StreamThreadTest { } } - private void initPartitionGrouper(StreamThread thread) { + private void initPartitionGrouper(StreamingConfig config, StreamThread thread) { KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure( - Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread) - ); + partitionAssignor.configure(config.getConsumerConfigs(thread)); Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
