This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 528e5c0 KAFKA-8602: Fix bug in stand-by task creation (#7008) 528e5c0 is described below commit 528e5c0f57aa5014cd13baef2b683a1a328f459a Author: cadonna <br...@confluent.io> AuthorDate: Mon Jul 15 22:40:01 2019 +0200 KAFKA-8602: Fix bug in stand-by task creation (#7008) Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io>, Boyang Chen <boy...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../streams/processor/internals/StreamThread.java | 2 +- .../StandbyTaskCreationIntegrationTest.java | 186 +++++++++++++++++++++ .../processor/internals/StreamThreadTest.java | 61 +++++++ 3 files changed, 248 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 4dc1bde..f9a4c24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -495,7 +495,7 @@ public class StreamThread extends Thread { final ProcessorTopology topology = builder.build(taskId.topicGroupId); - if (!topology.stateStores().isEmpty()) { + if (!topology.stateStores().isEmpty() && !topology.storeToChangelogTopic().isEmpty()) { return new StandbyTask( taskId, partitions, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java new file mode 100644 index 0000000..34442d4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Properties; +import java.util.function.Predicate; + +@Category({IntegrationTest.class}) +public class StandbyTaskCreationIntegrationTest { + + private static final int NUM_BROKERS = 1; + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + private static final String INPUT_TOPIC = "input-topic"; + + private KafkaStreams client1; + private KafkaStreams client2; + private volatile boolean client1IsOk = false; + private volatile boolean client2IsOk = false; + + @BeforeClass + public static void createTopics() throws InterruptedException { + CLUSTER.createTopic(INPUT_TOPIC, 2, 1); + } + + @After + public void after() { + client1.close(); + client2.close(); + } + + private Properties streamsConfiguration() { + final String applicationId = "testApp"; + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + return streamsConfiguration; + } + + @Test + public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final String stateStoreName = "myTransformState"; + final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), + Serdes.Integer(), + Serdes.Integer()).withLoggingDisabled(); + builder.addStateStore(keyValueStoreBuilder); + builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())) + .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) {} + + @Override + public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) { + return null; + } + + @Override + public void close() {} + }, stateStoreName); + + final Topology topology = builder.build(); + createClients(topology, streamsConfiguration(), topology, streamsConfiguration()); + + setStateListenersForVerification(thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty()); + + startClients(); + + waitUntilBothClientAreOK( + "At least one client did not reach state RUNNING with active tasks but no stand-by tasks" + ); + } + + @Test + public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception { + final Properties streamsConfiguration1 = streamsConfiguration(); + streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + final Properties streamsConfiguration2 = streamsConfiguration(); + streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("source-table")); + + createClients( + builder.build(streamsConfiguration1), + streamsConfiguration1, + builder.build(streamsConfiguration2), + streamsConfiguration2 + ); + + setStateListenersForVerification(thread -> !thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty()); + + startClients(); + + waitUntilBothClientAreOK( + "At least one client did not reach state RUNNING with active tasks and stand-by tasks" + ); + } + + private void createClients(final Topology topology1, + final Properties streamsConfiguration1, + final Topology topology2, + final Properties streamsConfiguration2) { + + client1 = new KafkaStreams(topology1, streamsConfiguration1); + client2 = new KafkaStreams(topology2, streamsConfiguration2); + } + + private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) { + client1.setStateListener((newState, oldState) -> { + if (newState == State.RUNNING && + client1.localThreadsMetadata().stream().allMatch(taskCondition)) { + + client1IsOk = true; + } + }); + client2.setStateListener((newState, oldState) -> { + if (newState == State.RUNNING && + client2.localThreadsMetadata().stream().allMatch(taskCondition)) { + + client2IsOk = true; + } + }); + } + + private void startClients() { + client1.start(); + client2.start(); + } + + private void waitUntilBothClientAreOK(final String message) throws Exception { + TestUtils.waitForCondition( + () -> client1IsOk && client2IsOk, + 30 * 1000, + message + ": " + + "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK, " + + "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK." + ); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d1d6671..7be1a71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; @@ -59,8 +60,10 @@ import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockKeyValueStoreBuilder; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.MockTimestampExtractor; @@ -70,6 +73,7 @@ import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; import java.io.File; import java.time.Duration; @@ -92,6 +96,8 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.streams.processor.internals.StreamThread.getSharedAdminClientId; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1058,6 +1064,61 @@ public class StreamThreadTest { } @Test + public void shouldCreateStandbyTask() { + setupInternalTopologyWithoutState(); + internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("myStore", true), "processor1"); + + final StandbyTask standbyTask = createStandbyTask(); + + assertThat(standbyTask, not(nullValue())); + } + + @Test + public void shouldNotCreateStandbyTaskWithoutStateStores() { + setupInternalTopologyWithoutState(); + + final StandbyTask standbyTask = createStandbyTask(); + + assertThat(standbyTask, nullValue()); + } + + @Test + public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() { + setupInternalTopologyWithoutState(); + final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("myStore", true); + storeBuilder.withLoggingDisabled(); + internalTopologyBuilder.addStateStore(storeBuilder, "processor1"); + + final StandbyTask standbyTask = createStandbyTask(); + + assertThat(standbyTask, nullValue()); + } + + private void setupInternalTopologyWithoutState() { + final MockProcessor mockProcessor = new MockProcessor(); + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); + internalTopologyBuilder.addProcessor("processor1", () -> mockProcessor, "source1"); + } + + private StandbyTask createStandbyTask() { + final LogContext logContext = new LogContext("test"); + final Logger log = logContext.logger(StreamThreadTest.class); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamThread.StandbyTaskCreator standbyTaskCreator = new StreamThread.StandbyTaskCreator( + internalTopologyBuilder, + config, + streamsMetrics, + stateDirectory, + new MockChangelogReader(), + mockTime, + log); + return standbyTaskCreator.createTask( + new MockConsumer<>(OffsetResetStrategy.EARLIEST), + new TaskId(1, 2), + Collections.emptySet()); + } + + @Test public void shouldPunctuateActiveTask() { final List<Long> punctuatedStreamTime = new ArrayList<>(); final List<Long> punctuatedWallClockTime = new ArrayList<>();