wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r485857758



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1436,6 +1447,16 @@ public void onAssignment(final Assignment assignment, 
final ConsumerGroupMetadat
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                 encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                 break;
+            case 8:
+                validateActiveTaskEncoding(partitions, info, logPrefix);
+
+                activeTasks = getActiveTasks(partitions, info);
+                partitionsByHost = info.partitionsByHost();
+                standbyPartitionsByHost = info.standbyPartitionByHost();
+                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
+                //recive the shutdown then call the request close

Review comment:
       the error is read in above so just need to handle like version 7

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1053,13 +1067,13 @@ ConsumerRebalanceListener rebalanceListener() {
 
     Consumer<byte[], byte[]> restoreConsumer() {
         return restoreConsumer;
-    };

Review comment:
       extra ; removed

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -186,6 +186,15 @@ public ByteBuffer encode() {
                     out.writeInt(errCode);
                     out.writeLong(nextRebalanceMs);
                     break;
+                case 8:
+                    out.writeInt(usedVersion);
+                    out.writeInt(commonlySupportedVersion);
+                    encodeActiveAndStandbyTaskAssignment(out);
+                    encodeActiveAndStandbyHostPartitions(out);
+                    out.writeInt(errCode);
+                    out.writeLong(nextRebalanceMs);
+                    out.writeInt(0);

Review comment:
       no error in this case, we only want to trigger shutdown form 
subscriptionUserData

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AppShutdownIntegrationTest.java
##########
@@ -0,0 +1,140 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.errors.ShutdownRequestedException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.*;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+
+@Category(IntegrationTest.class)
+public class AppShutdownIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+
+    @Test
+    public void shouldSendShutDownSignal() throws Exception {
+        //
+        //
+        // Also note that this is an integration test because so many 
components have to come together to
+        // ensure these configurations wind up where they belong, and any 
number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+        final String inputTopic = "input" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+
+        final List<KeyValue<Object, Object>> processorValueCollector = new 
ArrayList<>();
+
+        builder.stream(inputTopic).process(() -> new 
ShutdownProcessor(processorValueCollector), Named.as("process"));
+
+        final Properties properties = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
"5"),
+                        mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
"6"),
+                        mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                        
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000")
+                )
+        );
+
+
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            kafkaStreams.start();
+            produceMessages(0L, inputTopic);
+
+            latch.await(10, TimeUnit.SECONDS);
+
+            assertThat(processorValueCollector.size(), equalTo(1));
+        }
+    }
+
+    private void produceMessages(final long timestamp, final String 
streamOneInput) throws ShutdownRequestedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Arrays.asList(
+                        new KeyValue<>(1, "A"),
+                        new KeyValue<>(2, "B"),
+                        new KeyValue<>(3, "C"),
+                        new KeyValue<>(4, "D"),
+                        new KeyValue<>(5, "E")),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        IntegerSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);
+    }
+}
+
+
+class ShutdownProcessor extends AbstractProcessor<Object, Object> {
+    final List<KeyValue<Object, Object>> valueList;
+
+    ShutdownProcessor(final List<KeyValue<Object, Object>> valueList) {
+        this.valueList = valueList;
+    }
+
+    @Override
+    public void init(final ProcessorContext context) {
+//        throw new ShutdownRequestedException("integration test");
+    }
+
+    @Override
+    public void process(final Object key, final Object value) {
+        valueList.add(new KeyValue<>(key, value));
+//        throw new ShutdownRequestedException("integration test");

Review comment:
       uncomment for test to pass, still will not cause a shutdown until 
request close is added to the rebalance listener

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1369,7 +1381,6 @@ public void onAssignment(final Assignment assignment, 
final ConsumerGroupMetadat
 
         final AssignmentInfo info = 
AssignmentInfo.decode(assignment.userData());
         if (info.errCode() != AssignorError.NONE.code()) {
-            // set flag to shutdown streams app
             assignmentErrorCode.set(info.errCode());

Review comment:
       records the error to be handled in onPartitionsAssigned with the other 
possible error

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -360,6 +369,16 @@ public static AssignmentInfo decode(final ByteBuffer data) 
{
                     assignmentInfo.errCode = in.readInt();
                     assignmentInfo.nextRebalanceMs = in.readLong();
                     break;
+                case 8:
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, 
commonlySupportedVersion);
+                    decodeActiveTasks(assignmentInfo, in);
+                    decodeStandbyTasks(assignmentInfo, in);
+                    decodeActiveAndStandbyHostPartitions(assignmentInfo, in);
+                    assignmentInfo.errCode = in.readInt();
+                    assignmentInfo.nextRebalanceMs = in.readLong();
+                    in.readInt();

Review comment:
       I don't know if this is necessary but its best to get the error out of 
the buffer

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AppShutdownIntegrationTest.java
##########
@@ -0,0 +1,140 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.errors.ShutdownRequestedException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.*;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+
+@Category(IntegrationTest.class)
+public class AppShutdownIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+
+    @Test
+    public void shouldSendShutDownSignal() throws Exception {
+        //
+        //
+        // Also note that this is an integration test because so many 
components have to come together to
+        // ensure these configurations wind up where they belong, and any 
number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+        final String inputTopic = "input" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+
+        final List<KeyValue<Object, Object>> processorValueCollector = new 
ArrayList<>();
+
+        builder.stream(inputTopic).process(() -> new 
ShutdownProcessor(processorValueCollector), Named.as("process"));
+
+        final Properties properties = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
"5"),
+                        mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
"6"),
+                        mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                        
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000")
+                )
+        );
+
+
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            kafkaStreams.start();
+            produceMessages(0L, inputTopic);
+
+            latch.await(10, TimeUnit.SECONDS);

Review comment:
       should replace with a shutdown hook latch with counter instead

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1229,4 +1233,13 @@ boolean needsInitializationOrRestoration() {
     public void setPartitionResetter(final 
java.util.function.Consumer<Set<TopicPartition>> resetter) {
         this.resetter = resetter;
     }
+
+    public void flagForShutdownRequest(){
+        this.shutdownRequested.set(2);

Review comment:
       error code for INCOMPLETE_SOURCE_TOPIC_METADATA is 1, SHUTDOWN_REQUESTED 
is 2

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -566,10 +567,22 @@ void runLoop() {
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final ShutdownRequestedException e){
+                handleShutdownRequest(e);

Review comment:
       The interruption in this location caused ...
   
   
   ```
   TaskManager
        MetadataState:
        Tasks:
    died (org.apache.zookeeper.server.NIOServerCnxnFactory:92)
   java.lang.IllegalStateException: Illegal state SUSPENDED while completing 
restoration for active task 0_0
        at 
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:245)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:487)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:675)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:554)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:513)
   ```
   
   Not sure that is okay, the test still runs. If its is shutting down maybe it 
does matter and I'm not sure where else to put it

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -54,6 +54,9 @@ public void onPartitionsAssigned(final 
Collection<TopicPartition> partitions) {
         if (assignmentErrorCode.get() == 
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
             log.error("Received error code {}", assignmentErrorCode.get());
             throw new MissingSourceTopicException("One or more source topics 
were missing during rebalance");
+        }else if(assignmentErrorCode.get() == 
AssignorError.SHUTDOWN_REQUESTED.code()){
+            //throw new ShutdownRequestedException("onPartition assigned"); 
//TODO: receive request and call requestClose()
+            //requestClose();

Review comment:
       until the shut down application form the stream thread functionality is 
added this will not shutdown the app




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to