This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new a7e99ef  KAFKA-8602: Separate PR for 2.3 branch (#7092)
a7e99ef is described below

commit a7e99eff1028194c07562741d9c135ed955e2202
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Tue Jul 16 17:27:07 2019 -0400

    KAFKA-8602: Separate PR for 2.3 branch (#7092)
    
    This PR is from the original work by @cadonna in #7008. Due to incompatible
    changes in trunk that should not get cherry-picked back, a separate PR is
    required for this bug fix.
    
    Reviewers:  Matthias J. Sax <mj...@apache.org>
---
 .../streams/processor/internals/StreamThread.java  |   2 +-
 .../StandbyTaskCreationIntegrationTest.java        | 190 +++++++++++++++++++++
 .../processor/internals/StreamThreadTest.java      |  63 ++++++-
 3 files changed, 253 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index db6760c..ac0f5ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -503,7 +503,7 @@ public class StreamThread extends Thread {
 
             final ProcessorTopology topology = 
builder.build(taskId.topicGroupId);
 
-            if (!topology.stateStores().isEmpty()) {
+            if (!topology.stateStores().isEmpty() && 
!topology.storeToChangelogTopic().isEmpty()) {
                 return new StandbyTask(
                     taskId,
                     partitions,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
new file mode 100644
index 0000000..e3f7790
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Properties;
+import java.util.function.Predicate;
+
+@Category({IntegrationTest.class})
+public class StandbyTaskCreationIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private static final String INPUT_TOPIC = "input-topic";
+
+    private KafkaStreams client1;
+    private KafkaStreams client2;
+    private volatile boolean client1IsOk = false;
+    private volatile boolean client2IsOk = false;
+
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+    }
+
+    @After
+    public void after() {
+        client1.close();
+        client2.close();
+    }
+
+    private Properties streamsConfiguration() {
+        final String applicationId = "testApp";
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        return streamsConfiguration;
+    }
+
+    @Test
+    public void 
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws 
Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String stateStoreName = "myTransformState";
+        final StoreBuilder<KeyValueStore<Integer, Integer>> 
keyValueStoreBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
+                                        Serdes.Integer(),
+                                        
Serdes.Integer()).withLoggingDisabled();
+        builder.addStateStore(keyValueStoreBuilder);
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), 
Serdes.Integer()))
+            .transform(() -> new Transformer<Integer, Integer, 
KeyValue<Integer, Integer>>() {
+                @SuppressWarnings("unchecked")
+                @Override
+                public void init(final ProcessorContext context) {}
+
+                @Override
+                public KeyValue<Integer, Integer> transform(final Integer key, 
final Integer value) {
+                    return null;
+                }
+
+                @Override
+                public void close() {}
+            }, stateStoreName);
+
+        final Topology topology = builder.build();
+        createClients(topology, streamsConfiguration(), topology, 
streamsConfiguration());
+
+        setStateListenersForVerification(thread -> 
thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
+
+        startClients();
+
+        waitUntilBothClientAreOK(
+            "At least one client did not reach state RUNNING with active tasks 
but no stand-by tasks"
+        );
+    }
+
+    @Test
+    public void 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws 
Exception {
+        final Properties streamsConfiguration1 = streamsConfiguration();
+        streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.OPTIMIZE);
+        final Properties streamsConfiguration2 = streamsConfiguration();
+        streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.OPTIMIZE);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), 
Serdes.Integer()), Materialized.as("source-table"));
+
+        final StreamsBuilder builderII = new StreamsBuilder();
+        builderII.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), 
Serdes.Integer()), Materialized.as("source-table"));
+
+
+        createClients(
+            builder.build(streamsConfiguration1),
+            streamsConfiguration1,
+            builderII.build(streamsConfiguration2),
+            streamsConfiguration2
+        );
+
+        setStateListenersForVerification(thread -> 
!thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
+
+        startClients();
+
+        waitUntilBothClientAreOK(
+            "At least one client did not reach state RUNNING with active tasks 
and stand-by tasks"
+        );
+    }
+
+    private void createClients(final Topology topology1,
+                               final Properties streamsConfiguration1,
+                               final Topology topology2,
+                               final Properties streamsConfiguration2) {
+
+        client1 = new KafkaStreams(topology1, streamsConfiguration1);
+        client2 = new KafkaStreams(topology2, streamsConfiguration2);
+    }
+
+    private void setStateListenersForVerification(final 
Predicate<ThreadMetadata> taskCondition) {
+        client1.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING &&
+                
client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
+
+                client1IsOk = true;
+            }
+        });
+        client2.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING &&
+                
client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
+
+                client2IsOk = true;
+            }
+        });
+    }
+
+    private void startClients() {
+        client1.start();
+        client2.start();
+    }
+
+    private void waitUntilBothClientAreOK(final String message) throws 
Exception {
+        TestUtils.waitForCondition(
+            () -> client1IsOk && client2IsOk,
+            30 * 1000,
+            message + ": "
+                + "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK, "
+                + "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK."
+        );
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9246ac5..dc867dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -60,10 +60,13 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import 
org.apache.kafka.streams.processor.internals.StreamThread.StreamsMetricsThreadImpl;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -74,6 +77,7 @@ import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
@@ -97,12 +101,14 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static 
org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1130,6 +1136,61 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldCreateStandbyTask() {
+        setupInternalTopologyWithoutState();
+        internalTopologyBuilder.addStateStore(new 
MockKeyValueStoreBuilder("myStore", true), "processor1");
+
+        final StandbyTask standbyTask = createStandbyTask();
+
+        assertThat(standbyTask, not(nullValue()));
+    }
+
+    @Test
+    public void shouldNotCreateStandbyTaskWithoutStateStores() {
+        setupInternalTopologyWithoutState();
+
+        final StandbyTask standbyTask = createStandbyTask();
+
+        assertThat(standbyTask, nullValue());
+    }
+
+    @Test
+    public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() {
+        setupInternalTopologyWithoutState();
+        final StoreBuilder storeBuilder = new 
MockKeyValueStoreBuilder("myStore", true);
+        storeBuilder.withLoggingDisabled();
+        internalTopologyBuilder.addStateStore(storeBuilder, "processor1");
+
+        final StandbyTask standbyTask = createStandbyTask();
+
+        assertThat(standbyTask, nullValue());
+    }
+
+    private void setupInternalTopologyWithoutState() {
+        final MockProcessor mockProcessor = new MockProcessor();
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
+        internalTopologyBuilder.addProcessor("processor1", () -> 
mockProcessor, "source1");
+    }
+
+    private StandbyTask createStandbyTask() {
+        final LogContext logContext = new LogContext("test");
+        final Logger log = logContext.logger(StreamThreadTest.class);
+        final StreamsMetricsThreadImpl streamsMetrics = new 
StreamsMetricsThreadImpl(metrics, clientId);
+        final StreamThread.StandbyTaskCreator standbyTaskCreator = new 
StreamThread.StandbyTaskCreator(
+            internalTopologyBuilder,
+            config,
+            streamsMetrics,
+            stateDirectory,
+            new MockChangelogReader(),
+            mockTime,
+            log);
+        return standbyTaskCreator.createTask(
+            new MockConsumer<>(OffsetResetStrategy.EARLIEST),
+            new TaskId(1, 2),
+            Collections.emptySet());
+    }
+
+    @Test
     public void shouldPunctuateActiveTask() {
         final List<Long> punctuatedStreamTime = new ArrayList<>();
         final List<Long> punctuatedWallClockTime = new ArrayList<>();

Reply via email to