This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 51ace6306f KAFKA-14143: Exactly-once source connector system tests (#11783)
51ace6306f is described below

commit 51ace6306f06d9875c090323a82ee26876957789
Author: Chris Egerton <fearthecel...@gmail.com>
AuthorDate: Thu Sep 8 15:13:43 2022 -0400

    KAFKA-14143: Exactly-once source connector system tests (#11783)
    
    Also includes a minor quality-of-life improvement to clarify why some
    internal REST requests to workers may fail while that worker is still
    starting up.
    
    Reviewers: Tom Bentley <tbent...@redhat.com>, Luke Chen <show...@gmail.com>,
    José Armando García Sancio <jsan...@gmail.com>, Mickael Maison <mickael.mai...@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     |  17 +-
 .../kafka/connect/tools/VerifiableSourceTask.java  |  29 ++-
 .../apache/kafka/connect/util/KafkaBasedLog.java   |   2 +
 .../runtime/distributed/DistributedHerderTest.java |  53 ++++-
 tests/kafkatest/services/connect.py                |  15 +-
 .../tests/connect/connect_distributed_test.py      | 228 ++++++++++++++++-----
 tests/kafkatest/tests/connect/connect_rest_test.py |   3 +-
 .../templates/connect-distributed.properties       |   6 +
 8 files changed, 286 insertions(+), 67 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 388bfa4218..5dde71f331 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -2402,13 +2402,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         requestSignature.keyAlgorithm(),
                         keySignatureVerificationAlgorithms
                 ));
-            } else {
-                if (!requestSignature.isValid(sessionKey)) {
-                    requestValidationError = new ConnectRestException(
-                            Response.Status.FORBIDDEN,
-                            "Internal request contained invalid signature."
-                    );
-                }
+            } else if (sessionKey == null) {
+                requestValidationError = new ConnectRestException(
+                        Response.Status.SERVICE_UNAVAILABLE,
+                        "This worker is still starting up and has not been able to read a session key from the config topic yet"
+                );
+            } else if (!requestSignature.isValid(sessionKey)) {
+                requestValidationError = new ConnectRestException(
+                        Response.Status.FORBIDDEN,
+                        "Internal request contained invalid signature."
+                );
             }
             if (requestValidationError != null) {
                 callback.onCompletion(requestValidationError, null);
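The reordered validation above separates two failure modes that previously shared one branch: a worker that has not yet read a session key from the config topic now answers with a retryable 503 instead of a misleading 403, which is reserved for a signature that actually fails verification. A minimal standalone sketch of that ordering (the SignatureCheckSketch class and its validate() helper are hypothetical, not the herder's actual API):

    import javax.ws.rs.core.Response;

    public class SignatureCheckSketch {
        // Condensed form of the branch logic above: check for a missing
        // session key first (worker still starting up), then for a bad
        // signature (genuine rejection).
        static Response.Status validate(byte[] sessionKey, boolean signatureValid) {
            if (sessionKey == null) {
                return Response.Status.SERVICE_UNAVAILABLE; // retryable: still starting up
            } else if (!signatureValid) {
                return Response.Status.FORBIDDEN; // permanent: invalid signature
            }
            return Response.Status.OK;
        }

        public static void main(String[] args) {
            System.out.println(validate(null, true));          // Service Unavailable
            System.out.println(validate(new byte[32], false)); // Forbidden
            System.out.println(validate(new byte[32], true));  // OK
        }
    }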
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 9918752759..ee49efed92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.tools;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.tools.ThroughputThrottler;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.data.Schema;
@@ -31,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 /**
  * A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The
@@ -48,6 +51,7 @@ public class VerifiableSourceTask extends SourceTask {
     public static final String ID_CONFIG = "id";
     public static final String TOPIC_CONFIG = "topic";
     public static final String THROUGHPUT_CONFIG = "throughput";
+    public static final String COMPLETE_RECORD_DATA_CONFIG = "complete.record.data";
 
     private static final String ID_FIELD = "id";
     private static final String SEQNO_FIELD = "seqno";
@@ -61,6 +65,15 @@ public class VerifiableSourceTask extends SourceTask {
     private long startingSeqno;
     private long seqno;
     private ThroughputThrottler throttler;
+    private boolean completeRecordData;
+
+    private static final Schema COMPLETE_VALUE_SCHEMA = SchemaBuilder.struct()
+        .field("name", Schema.STRING_SCHEMA)
+        .field("task", Schema.INT32_SCHEMA)
+        .field("topic", Schema.STRING_SCHEMA)
+        .field("time_ms", Schema.INT64_SCHEMA)
+        .field("seqno", Schema.INT64_SCHEMA)
+        .build();
 
     @Override
     public String version() {
@@ -87,6 +100,7 @@ public class VerifiableSourceTask extends SourceTask {
             seqno = 0;
         startingSeqno = seqno;
         throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+        completeRecordData = "true".equalsIgnoreCase(props.get(COMPLETE_RECORD_DATA_CONFIG));
 
         log.info("Started VerifiableSourceTask {}-{} producing to topic {} 
resuming from seqno {}", name, id, topic, startingSeqno);
     }
@@ -114,7 +128,9 @@ public class VerifiableSourceTask extends SourceTask {
         System.out.println(dataJson);
 
         Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
-        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
+        Schema valueSchema = completeRecordData ? COMPLETE_VALUE_SCHEMA : Schema.INT64_SCHEMA;
+        Object value = completeRecordData ? completeValue(data) : seqno;
+        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, valueSchema, value);
         List<SourceRecord> result = Collections.singletonList(srcRecord);
         seqno++;
         return result;
@@ -141,6 +157,15 @@ public class VerifiableSourceTask extends SourceTask {
 
     @Override
     public void stop() {
-        throttler.wakeup();
+        if (throttler != null)
+            throttler.wakeup();
+    }
+
+    private Object completeValue(Map<String, Object> data) {
+        Struct result = new Struct(COMPLETE_VALUE_SCHEMA);
+        Stream.of("name", "task", "topic", "time_ms", "seqno").forEach(
+            field -> result.put(field, data.get(field))
+        );
+        return result;
     }
 }
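For context, the new complete.record.data flag switches the task's record values from a bare INT64 seqno to the Struct defined by COMPLETE_VALUE_SCHEMA, letting consumers correlate each record with the connector name, task id, topic, and production time. A standalone sketch of building such a value with the Connect data API (the sample field values are illustrative):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class CompleteValueSketch {
        public static void main(String[] args) {
            // Mirrors COMPLETE_VALUE_SCHEMA from the diff above
            Schema schema = SchemaBuilder.struct()
                    .field("name", Schema.STRING_SCHEMA)
                    .field("task", Schema.INT32_SCHEMA)
                    .field("topic", Schema.STRING_SCHEMA)
                    .field("time_ms", Schema.INT64_SCHEMA)
                    .field("seqno", Schema.INT64_SCHEMA)
                    .build();
            Struct value = new Struct(schema)
                    .put("name", "verifiable-source") // sample values
                    .put("task", 0)
                    .put("topic", "verifiable")
                    .put("time_ms", System.currentTimeMillis())
                    .put("seqno", 0L);
            value.validate(); // throws DataException if any field is unset
            System.out.println(value);
        }
    }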
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 5da5be04a5..cde63b3f83 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -166,6 +166,8 @@ public class KafkaBasedLog<K, V> {
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
         this.initializer = initializer != null ? initializer : admin -> { };
+        // Initialize the producer Optional here to prevent NPEs later on
+        this.producer = Optional.empty();
 
         // If the consumer is configured with isolation.level = read_committed, then its end offsets method cannot be relied on
         // as it will not take records from currently-open transactions into account. We want to err on the side of caution in that
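The added initialization follows a common defensive pattern: populate an Optional-typed field with Optional.empty() at construction so that code running before the real value is assigned can use ifPresent() instead of a null check. A generic sketch of the pattern (the LazyResource class is hypothetical, not the actual KafkaBasedLog internals):

    import java.util.Optional;
    import java.util.function.Consumer;

    class LazyResource<T> {
        // Starts as empty rather than null, so accessors are always safe
        private Optional<T> resource = Optional.empty();

        void assign(T value) {
            resource = Optional.of(value);
        }

        void withResource(Consumer<T> action) {
            resource.ifPresent(action); // no NPE even before assign() is called
        }
    }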
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 3249412259..82c7b50d67 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -105,6 +105,7 @@ import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
 import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
 import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
@@ -2773,7 +2774,15 @@ public class DistributedHerderTest extends ThreadedTest {
         EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
         EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();
 
-        PowerMock.replayAll(taskConfigCb, signature);
+        SessionKey sessionKey = EasyMock.mock(SessionKey.class);
+        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+        EasyMock.expect(sessionKey.key()).andReturn(secretKey);
+        EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());
+
+        PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);
+
+        // Read a new session key from the config topic
+        configUpdateListener.onSessionKeyUpdate(sessionKey);
 
         herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);
 
@@ -2782,6 +2791,28 @@ public class DistributedHerderTest extends ThreadedTest {
         assertEquals(FORBIDDEN.getStatusCode(), ((ConnectRestException) errorCapture.getValue()).statusCode());
     }
 
+    @Test
+    public void putTaskConfigsWorkerStillStarting() {
+        Callback<Void> taskConfigCb = EasyMock.mock(Callback.class);
+        Capture<Throwable> errorCapture = Capture.newInstance();
+        taskConfigCb.onCompletion(capture(errorCapture), EasyMock.eq(null));
+        EasyMock.expectLastCall().once();
+
+        EasyMock.expect(member.currentProtocolVersion()).andReturn(CONNECT_PROTOCOL_V2).anyTimes();
+
+        InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
+        EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
+        EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+        PowerMock.replayAll(taskConfigCb, signature);
+
+        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);
+
+        PowerMock.verifyAll();
+        assertTrue(errorCapture.getValue() instanceof ConnectRestException);
+        assertEquals(SERVICE_UNAVAILABLE.getStatusCode(), ((ConnectRestException) errorCapture.getValue()).statusCode());
+    }
+
     @Test
     public void testPutTaskConfigsValidRequiredSignature() {
         Callback<Void> taskConfigCb = EasyMock.mock(Callback.class);
@@ -2794,7 +2825,15 @@ public class DistributedHerderTest extends ThreadedTest {
         EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
         EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();
 
-        PowerMock.replayAll(taskConfigCb, signature);
+        SessionKey sessionKey = EasyMock.mock(SessionKey.class);
+        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+        EasyMock.expect(sessionKey.key()).andReturn(secretKey);
+        EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());
+
+        PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);
+
+        // Read a new session key from the config topic
+        configUpdateListener.onSessionKeyUpdate(sessionKey);
 
         herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);
 
@@ -2893,7 +2932,15 @@ public class DistributedHerderTest extends ThreadedTest {
         EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
         EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();
 
-        PowerMock.replayAll(taskConfigCb, signature);
+        SessionKey sessionKey = EasyMock.mock(SessionKey.class);
+        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+        EasyMock.expect(sessionKey.key()).andReturn(secretKey);
+        EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());
+
+        PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);
+
+        // Read a new session key from the config topic
+        configUpdateListener.onSessionKeyUpdate(sessionKey);
 
         herder.fenceZombieSourceTasks(CONN1, taskConfigCb, signature);
 
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 41c33ccb9e..6cd4fa0675 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -152,14 +152,17 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
                                err_msg="Never saw message indicating Kafka 
Connect joined group on node: " +
                                        "%s in condition mode: %s" % 
(str(node.account), self.startup_mode))
 
-    def stop_node(self, node, clean_shutdown=True):
-        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
+    def stop_node(self, node, clean_shutdown=True, await_shutdown=None):
+        if await_shutdown is None:
+            await_shutdown = clean_shutdown
+        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account) \
+                         + " and " + ("" if await_shutdown else "not ") + "awaiting shutdown")
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
 
         for pid in pids:
             node.account.signal(pid, sig, allow_fail=True)
-        if clean_shutdown:
+        if await_shutdown:
             for pid in pids:
                 wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec, err_msg="Kafka Connect process on " + str(
                     node.account) + " took too long to exit")
@@ -464,13 +467,14 @@ class VerifiableSource(VerifiableConnector):
     Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output.
     """
 
-    def __init__(self, cc, name="verifiable-source", tasks=1, 
topic="verifiable", throughput=1000):
+    def __init__(self, cc, name="verifiable-source", tasks=1, 
topic="verifiable", throughput=1000, complete_records=False):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.tasks = tasks
         self.topic = topic
         self.throughput = throughput
+        self.complete_records = complete_records
 
     def committed_messages(self):
         return list(filter(lambda m: 'committed' in m and m['committed'], self.messages()))
@@ -485,7 +489,8 @@ class VerifiableSource(VerifiableConnector):
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSourceConnector',
             'tasks.max': self.tasks,
             'topic': self.topic,
-            'throughput': self.throughput
+            'throughput': self.throughput,
+            'complete.record.data': self.complete_records
         })
 
 
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 970779f723..8347afc8d6 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -54,6 +54,7 @@ class ConnectDistributedTest(Test):
     STATUS_TOPIC = "connect-status"
     STATUS_REPLICATION_FACTOR = "1"
     STATUS_PARTITIONS = "1"
+    EXACTLY_ONCE_SOURCE_SUPPORT = "disabled"
     SCHEDULED_REBALANCE_MAX_DELAY_MS = "60000"
     CONNECT_PROTOCOL="sessioned"
 
@@ -84,7 +85,11 @@ class ConnectDistributedTest(Test):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, version=broker_version,
-                                  server_prop_overrides=[["auto.create.topics.enable", str(auto_create_topics)]])
+                                  server_prop_overrides=[
+                                      ["auto.create.topics.enable", str(auto_create_topics)],
+                                      ["transaction.state.log.replication.factor", str(self.num_brokers)],
+                                      ["transaction.state.log.min.isr", str(self.num_brokers)]
+                                  ])
         if timestamp_type is not None:
             for node in self.kafka.nodes:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
@@ -159,27 +164,32 @@ class ConnectDistributedTest(Test):
         return self._task_has_state(task_id, status, 'RUNNING')
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_failed_connector(self, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_restart_failed_connector(self, exactly_once_source, connect_protocol):
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
-        self.sink.start()
+        if exactly_once_source:
+            self.connector = MockSource(self.cc, mode='connector-failure', delay_sec=5)
+        else:
+            self.connector = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
+        self.connector.start()
 
-        wait_until(lambda: self.connector_is_failed(self.sink), timeout_sec=15,
+        wait_until(lambda: self.connector_is_failed(self.connector), timeout_sec=15,
                    err_msg="Failed to see connector transition to the FAILED state")
 
-        self.cc.restart_connector(self.sink.name)
+        self.cc.restart_connector(self.connector.name)
         
-        wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
+        wait_until(lambda: self.connector_is_running(self.connector), timeout_sec=10,
                    err_msg="Failed to see connector transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
+    @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
     def test_restart_failed_task(self, connector_type, connect_protocol):
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -247,13 +257,14 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING 
state")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_and_resume_source(self, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_pause_and_resume_source(self, exactly_once_source, connect_protocol):
         """
         Verify that source connectors stop producing records when paused and begin again after
         being resumed.
         """
 
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -336,12 +347,13 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to consume messages after resuming sink 
connector")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_state_persistent(self, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_pause_state_persistent(self, exactly_once_source, connect_protocol):
         """
         Verify that paused state is preserved after a cluster restart.
         """
 
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -363,13 +375,14 @@ class ConnectDistributedTest(Test):
                        err_msg="Failed to see connector startup in PAUSED 
state")
 
     @cluster(num_nodes=6)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_file_source_and_sink(self, security_protocol, connect_protocol):
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
         """
 
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -400,7 +413,8 @@ class ConnectDistributedTest(Test):
     def test_bounce(self, clean, connect_protocol):
         """
         Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
-        run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces,
+        and at least once when workers undergo unclean bounces.
         """
         num_tasks = 3
 
@@ -414,11 +428,14 @@ class ConnectDistributedTest(Test):
         self.sink = VerifiableSink(self.cc, tasks=num_tasks, topics=[self.TOPIC])
         self.sink.start()
 
-        for _ in range(3):
-            for node in self.cc.nodes:
+        for i in range(3):
+            start = i % len(self.cc.nodes)
+            # Don't want to restart worker nodes in the same order every time
+            shuffled_nodes = self.cc.nodes[start:] + self.cc.nodes[:start]
+            for node in shuffled_nodes:
                 started = time.time()
                 self.logger.info("%s bouncing Kafka Connect on %s", clean and 
"Clean" or "Hard", str(node.account))
-                self.cc.stop_node(node, clean_shutdown=clean)
+                self.cc.stop_node(node, clean_shutdown=clean, 
await_shutdown=True)
                 with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
                     self.cc.start_node(node)
                     monitor.wait_until("Starting connectors and tasks using 
config offset", timeout_sec=90,
@@ -453,7 +470,7 @@ class ConnectDistributedTest(Test):
         self.cc.stop()
 
         # Validate at least once delivery of everything that was reported as written since we should have flushed and
-        # cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed
+        # cleanly exited. Currently this only tests at least once delivery for sinks because the task may not have consumed
         # all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across
         # tasks.
         success = True
@@ -519,6 +536,110 @@ class ConnectDistributedTest(Test):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source tasks run correctly and deliver messages exactly once
+        when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for i in range(4):
+            start = i % len(self.cc.nodes)
+            # Don't want to restart worker nodes in the same order every time
+            shuffled_nodes = self.cc.nodes[start:] + self.cc.nodes[:start]
+            for node in shuffled_nodes:
+                started = time.time()
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and 
"Clean" or "Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean, 
await_shutdown=True)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using 
config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't 
successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in 
%f seconds", node.account, time.time() - started)
+
+                if i < 2:
+                    # Give additional time for the worker group to recover. Even if it is not a hard bounce, there are
+                    # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                    # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                    # If we don't give enough time for the group to stabilize, the next bounce may cause workers to
+                    # be shut down before they have any time to process data and we can end up with zero data making it
+                    # through the test.
+                    time.sleep(15)
+                else:
+                    # We also need to make sure that, even without time for the cluster to recover gracefully in between
+                    # worker restarts, the cluster and its tasks do not get into an inconsistent state and either duplicate or
+                    # drop messages.
+                    pass
+
+        # Wait at least scheduled.rebalance.max.delay.ms to expire and rebalance
+        time.sleep(60)
+
+        # It's possible that a zombie fencing request from a follower to the leader failed when we bounced the leader
+        # We don't automatically retry these requests because some failures (such as insufficient ACLs for the
+        # connector's principal) are genuine and need to be reported by failing the task and displaying an error message
+        # in the status for the task in the REST API.
+        # So, we make a polite request to the cluster to restart any failed tasks
+        self.cc.restart_connector_and_tasks(self.source.name, only_failed='true', include_tasks='true')
+
+        # Allow the connectors to startup, recover, and exit cleanly before
+        # ending the test.
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
+        self.source.stop()
+        self.cc.stop()
+
+        consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed")
+        consumer.run()
+        src_messages = consumer.messages_consumed[1]
+
+        success = True
+        errors = []
+        for task in range(num_tasks):
+            # Validate source messages
+            src_seqnos = [msg['payload']['seqno'] for msg in src_messages if msg['payload']['task'] == task]
+            if len(src_seqnos) == 0:
+                self.logger.error("No records produced by task " + str(task))
+                errors.append("No records produced by task %d" % (task))
+                success = False
+                continue
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
+            # bouncing should commit on rebalance.
+            src_seqno_max = max(src_seqnos)
+            self.logger.debug("Max source seqno: %d", src_seqno_max)
+            src_seqno_counts = Counter(src_seqnos)
+            missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos)))
+            duplicate_src_seqnos = sorted(seqno for seqno,count in src_seqno_counts.items() if count > 1)
+
+            if missing_src_seqnos:
+                self.logger.error("Missing source sequence numbers for task " 
+ str(task))
+                errors.append("Found missing source sequence numbers for task 
%d: %s" % (task, missing_src_seqnos))
+                success = False
+            if duplicate_src_seqnos:
+                self.logger.error("Duplicate source sequence numbers for task 
" + str(task))
+                errors.append("Found duplicate source sequence numbers for 
task %d: %s" % (task, duplicate_src_seqnos))
+                success = False
+
+        if not success:
+            self.mark_for_collect(self.cc)
+            # Also collect the data in the topic to aid in debugging
+            consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True)
+            consumer_validator.run()
+            self.mark_for_collect(consumer_validator, "consumer_stdout")
+
+        assert success, "Found validation errors:\n" + "\n  ".join(errors)
+
     @cluster(num_nodes=6)
     @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
     def test_transformations(self, connect_protocol):
@@ -577,42 +698,51 @@ class ConnectDistributedTest(Test):
             assert obj['payload'][ts_fieldname] == ts
 
     @cluster(num_nodes=5)
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    def test_broker_compatibility(self, broker_version, auto_create_topics, security_protocol, connect_protocol):
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager')
+    def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol):
         """
         Verify that Connect will start up with various broker versions with various configurations.
         When Connect distributed starts up, it either creates internal topics (v0.10.1.0 and after)
         or relies upon the broker to auto-create the topics (v0.10.0.x and before).
         """
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
-                            security_protocol=security_protocol, include_filestream_connectors=True)
+                            security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
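The seqno validation in test_exactly_once_source above reduces to one invariant: for each task, every sequence number from 0 up to the largest observed value must appear exactly once. A self-contained Java sketch of the same check (the sample data is illustrative):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class SeqnoCheckSketch {
        public static void main(String[] args) {
            // Sample seqnos: 3 is missing (dropped), 2 appears twice (duplicated)
            List<Long> seqnos = List.of(0L, 1L, 2L, 2L, 4L);
            long max = Collections.max(seqnos);
            Map<Long, Long> counts = seqnos.stream()
                    .collect(Collectors.groupingBy(s -> s, Collectors.counting()));
            List<Long> missing = LongStream.rangeClosed(0, max).boxed()
                    .filter(s -> !counts.containsKey(s))
                    .collect(Collectors.toList());
            List<Long> duplicates = counts.entrySet().stream()
                    .filter(e -> e.getValue() > 1)
                    .map(Map.Entry::getKey)
                    .sorted()
                    .collect(Collectors.toList());
            // Prints: missing=[3] duplicates=[2]
            System.out.println("missing=" + missing + " duplicates=" + duplicates);
        }
    }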
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index ff44d9412f..5e3e69e1c7 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -34,7 +34,8 @@ class ConnectRestApiTest(KafkaTest):
 
     FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'header.converter', 'batch.size',
                            'topic', 'file', 'transforms', 'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms',
-                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups'}
+                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups',
+                           'exactly.once.support', 'transaction.boundary', 'transaction.boundary.interval.ms', 'offsets.storage.topic'}
     FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'header.converter', 'topics',
                          'file', 'transforms', 'topics.regex', 'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms',
                          'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'errors.deadletterqueue.topic.name',
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 6d2d5e28d1..cd80219090 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -20,6 +20,12 @@ bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_prot
 
 group.id={{ group|default("connect-cluster") }}
 
+exactly.once.source.support={{ EXACTLY_ONCE_SOURCE_SUPPORT|default("disabled") }}
+{% if EXACTLY_ONCE_SOURCE_SUPPORT is defined and EXACTLY_ONCE_SOURCE_SUPPORT == "enabled" %}
+# Reduce transaction timeouts so tests that kill workers don't need to wait as long to recover
+producer.transaction.timeout.ms = 10000
+{% endif %}
+
 connect.protocol={{ CONNECT_PROTOCOL|default("sessioned") }}
scheduled.rebalance.max.delay.ms={{ SCHEDULED_REBALANCE_MAX_DELAY_MS|default(60000) }}
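For reference, when a test sets EXACTLY_ONCE_SOURCE_SUPPORT to "enabled", the template block added above renders roughly as:

    exactly.once.source.support=enabled
    # Reduce transaction timeouts so tests that kill workers don't need to wait as long to recover
    producer.transaction.timeout.ms = 10000

With the variable unset or "disabled", only exactly.once.source.support=disabled is emitted and the producer override is omitted.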
 
