This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
     new 51ace6306f KAFKA-14143: Exactly-once source connector system tests (#11783)
51ace6306f is described below

commit 51ace6306f06d9875c090323a82ee26876957789
Author: Chris Egerton <fearthecel...@gmail.com>
AuthorDate: Thu Sep 8 15:13:43 2022 -0400

    KAFKA-14143: Exactly-once source connector system tests (#11783)

    Also includes a minor quality-of-life improvement to clarify why some
    internal REST requests to workers may fail while that worker is still
    starting up.

    Reviewers: Tom Bentley <tbent...@redhat.com>, Luke Chen <show...@gmail.com>, José Armando García Sancio <jsan...@gmail.com>, Mickael Maison <mickael.mai...@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     |  17 +-
 .../kafka/connect/tools/VerifiableSourceTask.java  |  29 ++-
 .../apache/kafka/connect/util/KafkaBasedLog.java   |   2 +
 .../runtime/distributed/DistributedHerderTest.java |  53 ++++-
 tests/kafkatest/services/connect.py                |  15 +-
 .../tests/connect/connect_distributed_test.py      | 228 ++++++++++++++++-----
 tests/kafkatest/tests/connect/connect_rest_test.py |   3 +-
 .../templates/connect-distributed.properties       |   6 +
 8 files changed, 286 insertions(+), 67 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 388bfa4218..5dde71f331 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -2402,13 +2402,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     requestSignature.keyAlgorithm(),
                     keySignatureVerificationAlgorithms
                 ));
-            } else {
-                if (!requestSignature.isValid(sessionKey)) {
-                    requestValidationError = new ConnectRestException(
-                            Response.Status.FORBIDDEN,
-                            "Internal request contained invalid signature."
-                    );
-                }
+            } else if (sessionKey == null) {
+                requestValidationError = new ConnectRestException(
+                        Response.Status.SERVICE_UNAVAILABLE,
+                        "This worker is still starting up and has not been able to read a session key from the config topic yet"
+                );
+            } else if (!requestSignature.isValid(sessionKey)) {
+                requestValidationError = new ConnectRestException(
+                        Response.Status.FORBIDDEN,
+                        "Internal request contained invalid signature."
+                );
             }
             if (requestValidationError != null) {
                 callback.onCompletion(requestValidationError, null);
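The hunk above gives the herder's internal-request validation an explicit precedence: an unsupported signature algorithm is rejected first, a missing session key (the worker has not yet read one from the config topic) now yields 503 SERVICE_UNAVAILABLE instead of a misleading 403, and only then is the signature itself checked. A condensed, self-contained sketch of that ordering follows; the RequestValidator class, RestError type, and numeric status codes are illustrative stand-ins, not the actual DistributedHerder API:

    import java.util.Set;
    import javax.crypto.SecretKey;

    final class RequestValidator {
        private static final Set<String> ALLOWED_ALGORITHMS = Set.of("HmacSHA256");

        static final class RestError {
            final int statusCode;
            final String message;
            RestError(int statusCode, String message) {
                this.statusCode = statusCode;
                this.message = message;
            }
        }

        // Returns null when the request is valid, mirroring requestValidationError above
        RestError validate(String keyAlgorithm, SecretKey sessionKey, boolean signatureValid) {
            if (!ALLOWED_ALGORITHMS.contains(keyAlgorithm)) {
                // Illustrative status code; the real herder reports the bad algorithm
                return new RestError(400, "Unsupported signature algorithm: " + keyAlgorithm);
            } else if (sessionKey == null) {
                // Retryable: the worker simply has not finished starting up yet
                return new RestError(503, "This worker is still starting up and has not been able to read a session key from the config topic yet");
            } else if (!signatureValid) {
                // Permanent: a key exists and the signature does not match it
                return new RestError(403, "Internal request contained invalid signature.");
            }
            return null;
        }
    }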
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 9918752759..ee49efed92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.tools;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.tools.ThroughputThrottler;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.data.Schema;
@@ -31,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 /**
  * A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The
@@ -48,6 +51,7 @@ public class VerifiableSourceTask extends SourceTask {
     public static final String ID_CONFIG = "id";
     public static final String TOPIC_CONFIG = "topic";
     public static final String THROUGHPUT_CONFIG = "throughput";
+    public static final String COMPLETE_RECORD_DATA_CONFIG = "complete.record.data";
 
     private static final String ID_FIELD = "id";
     private static final String SEQNO_FIELD = "seqno";
@@ -61,6 +65,15 @@ public class VerifiableSourceTask extends SourceTask {
     private long startingSeqno;
     private long seqno;
     private ThroughputThrottler throttler;
+    private boolean completeRecordData;
+
+    private static final Schema COMPLETE_VALUE_SCHEMA = SchemaBuilder.struct()
+            .field("name", Schema.STRING_SCHEMA)
+            .field("task", Schema.INT32_SCHEMA)
+            .field("topic", Schema.STRING_SCHEMA)
+            .field("time_ms", Schema.INT64_SCHEMA)
+            .field("seqno", Schema.INT64_SCHEMA)
+            .build();
 
     @Override
     public String version() {
@@ -87,6 +100,7 @@ public class VerifiableSourceTask extends SourceTask {
         seqno = 0;
         startingSeqno = seqno;
         throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+        completeRecordData = "true".equalsIgnoreCase(props.get(COMPLETE_RECORD_DATA_CONFIG));
         log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
     }
 
@@ -114,7 +128,9 @@ public class VerifiableSourceTask extends SourceTask {
         System.out.println(dataJson);
 
         Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
-        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
+        Schema valueSchema = completeRecordData ? COMPLETE_VALUE_SCHEMA : Schema.INT64_SCHEMA;
+        Object value = completeRecordData ? completeValue(data) : seqno;
+        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, valueSchema, value);
         List<SourceRecord> result = Collections.singletonList(srcRecord);
         seqno++;
         return result;
@@ -141,6 +157,15 @@ public class VerifiableSourceTask extends SourceTask {
 
     @Override
     public void stop() {
-        throttler.wakeup();
+        if (throttler != null)
+            throttler.wakeup();
+    }
+
+    private Object completeValue(Map<String, Object> data) {
+        Struct result = new Struct(COMPLETE_VALUE_SCHEMA);
+        Stream.of("name", "task", "topic", "time_ms", "seqno").forEach(
+                field -> result.put(field, data.get(field))
+        );
+        return result;
     }
 }
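With complete.record.data enabled, each record's value becomes a Struct carrying the full verification payload rather than just the seqno; the new exactly-once system test relies on this to attribute every consumed record back to the task that produced it. A minimal sketch of building and validating such a value follows; only the schema definition mirrors the diff above, while the class, main method, and field values are illustrative:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class CompleteValueSketch {
        // Same shape as VerifiableSourceTask.COMPLETE_VALUE_SCHEMA above
        private static final Schema COMPLETE_VALUE_SCHEMA = SchemaBuilder.struct()
                .field("name", Schema.STRING_SCHEMA)
                .field("task", Schema.INT32_SCHEMA)
                .field("topic", Schema.STRING_SCHEMA)
                .field("time_ms", Schema.INT64_SCHEMA)
                .field("seqno", Schema.INT64_SCHEMA)
                .build();

        public static void main(String[] args) {
            Struct value = new Struct(COMPLETE_VALUE_SCHEMA)
                    .put("name", "verifiable-source")
                    .put("task", 0)
                    .put("topic", "verifiable")
                    .put("time_ms", System.currentTimeMillis())
                    .put("seqno", 0L);
            // Throws DataException if any field is missing or has the wrong type
            value.validate();
            System.out.println(value);
        }
    }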
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 5da5be04a5..cde63b3f83 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -166,6 +166,8 @@ public class KafkaBasedLog<K, V> {
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
         this.initializer = initializer != null ? initializer : admin -> { };
+        // Initialize the producer Optional here to prevent NPEs later on
+        this.producer = Optional.empty();
 
         // If the consumer is configured with isolation.level = read_committed, then its end offsets method cannot be relied on
         // as it will not take records from currently-open transactions into account. We want to err on the side of caution in that
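The two-line KafkaBasedLog change is a defensive-initialization fix: an Optional-typed field that is only assigned on some construction paths can itself be null, reintroducing exactly the NPE that Optional is meant to rule out. A generic illustration of the pattern, under the assumption that this is the intent; the Holder class is invented and is not KafkaBasedLog's actual structure:

    import java.util.Optional;
    import java.util.function.Consumer;

    class Holder<T> {
        // Initialized eagerly so callers can always invoke Optional methods
        // on the field without first null-checking the field itself
        private Optional<T> producer = Optional.empty();

        void set(T value) {
            producer = Optional.ofNullable(value);
        }

        void useIfPresent(Consumer<T> action) {
            producer.ifPresent(action);
        }
    }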
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 3249412259..82c7b50d67 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -105,6 +105,7 @@ import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
 import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
 import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
@@ -2773,7 +2774,15 @@ public class DistributedHerderTest extends ThreadedTest {
         EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
         EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();
 
-        PowerMock.replayAll(taskConfigCb, signature);
+        SessionKey sessionKey = EasyMock.mock(SessionKey.class);
+        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+        EasyMock.expect(sessionKey.key()).andReturn(secretKey);
+        EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());
+
+        PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);
+
+        // Read a new session key from the config topic
+        configUpdateListener.onSessionKeyUpdate(sessionKey);
 
         herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);
 
@@ -2782,6 +2791,28 @@ public class DistributedHerderTest extends ThreadedTest {
         assertEquals(FORBIDDEN.getStatusCode(), ((ConnectRestException) errorCapture.getValue()).statusCode());
     }
 
+    @Test
+    public void putTaskConfigsWorkerStillStarting() {
+        Callback<Void> taskConfigCb = EasyMock.mock(Callback.class);
+        Capture<Throwable> errorCapture = Capture.newInstance();
+        taskConfigCb.onCompletion(capture(errorCapture), EasyMock.eq(null));
+        EasyMock.expectLastCall().once();
+
+        EasyMock.expect(member.currentProtocolVersion()).andReturn(CONNECT_PROTOCOL_V2).anyTimes();
+
+        InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
+        EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
+        EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+        PowerMock.replayAll(taskConfigCb, signature);
+
+        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);
+
+        PowerMock.verifyAll();
+        assertTrue(errorCapture.getValue() instanceof ConnectRestException);
+        assertEquals(SERVICE_UNAVAILABLE.getStatusCode(), ((ConnectRestException) errorCapture.getValue()).statusCode());
+    }
+
     @Test
     public void testPutTaskConfigsValidRequiredSignature() {
         Callback<Void> taskConfigCb = EasyMock.mock(Callback.class);
@@ -2794,7 +2825,15 @@ public class DistributedHerderTest extends ThreadedTest {
         EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
         EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();
 
-        PowerMock.replayAll(taskConfigCb, signature);
+        SessionKey sessionKey = EasyMock.mock(SessionKey.class);
+        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+        EasyMock.expect(sessionKey.key()).andReturn(secretKey);
+        EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());
+
+        PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);
+
+        // Read a new session key from the config topic
+        configUpdateListener.onSessionKeyUpdate(sessionKey);
 
         herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);
 
@@ -2893,7 +2932,15 @@ public class DistributedHerderTest extends ThreadedTest {
         EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
         EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();
 
-        PowerMock.replayAll(taskConfigCb, signature);
+        SessionKey sessionKey = EasyMock.mock(SessionKey.class);
+        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+        EasyMock.expect(sessionKey.key()).andReturn(secretKey);
+        EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());
+
+        PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);
+
+        // Read a new session key from the config topic
+        configUpdateListener.onSessionKeyUpdate(sessionKey);
 
         herder.fenceZombieSourceTasks(CONN1, taskConfigCb, signature);
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 41c33ccb9e..6cd4fa0675 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -152,14 +152,17 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
                    err_msg="Never saw message indicating Kafka Connect joined group on node: " +
                            "%s in condition mode: %s" % (str(node.account), self.startup_mode))
 
-    def stop_node(self, node, clean_shutdown=True):
-        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
+    def stop_node(self, node, clean_shutdown=True, await_shutdown=None):
+        if await_shutdown is None:
+            await_shutdown = clean_shutdown
+        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account) \
+                         + " and " + ("" if await_shutdown else "not ") + "awaiting shutdown")
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
 
         for pid in pids:
             node.account.signal(pid, sig, allow_fail=True)
-        if clean_shutdown:
+        if await_shutdown:
             for pid in pids:
                 wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec,
                            err_msg="Kafka Connect process on " + str(
                                node.account) + " took too long to exit")
@@ -464,13 +467,14 @@ class VerifiableSource(VerifiableConnector):
    Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output.
    """
 
-    def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000):
+    def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000, complete_records=False):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.tasks = tasks
         self.topic = topic
         self.throughput = throughput
+        self.complete_records = complete_records
 
     def committed_messages(self):
         return list(filter(lambda m: 'committed' in m and m['committed'], self.messages()))
@@ -485,7 +489,8 @@ class VerifiableSource(VerifiableConnector):
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSourceConnector',
             'tasks.max': self.tasks,
             'topic': self.topic,
-            'throughput': self.throughput
+            'throughput': self.throughput,
+            'complete.record.data': self.complete_records
         })
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 970779f723..8347afc8d6 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -54,6 +54,7 @@ class ConnectDistributedTest(Test):
     STATUS_TOPIC = "connect-status"
     STATUS_REPLICATION_FACTOR = "1"
     STATUS_PARTITIONS = "1"
+    EXACTLY_ONCE_SOURCE_SUPPORT = "disabled"
     SCHEDULED_REBALANCE_MAX_DELAY_MS = "60000"
     CONNECT_PROTOCOL="sessioned"
 
@@ -84,7 +85,11 @@ class ConnectDistributedTest(Test):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, version=broker_version,
-                                  server_prop_overrides=[["auto.create.topics.enable", str(auto_create_topics)]])
+                                  server_prop_overrides=[
+                                      ["auto.create.topics.enable", str(auto_create_topics)],
+                                      ["transaction.state.log.replication.factor", str(self.num_brokers)],
+                                      ["transaction.state.log.min.isr", str(self.num_brokers)]
+                                  ])
         if timestamp_type is not None:
             for node in self.kafka.nodes:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
@@ -159,27 +164,32 @@ class ConnectDistributedTest(Test):
         return self._task_has_state(task_id, status, 'RUNNING')
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_failed_connector(self, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_restart_failed_connector(self, exactly_once_source, connect_protocol):
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
-        self.sink.start()
+        if exactly_once_source:
+            self.connector = MockSource(self.cc, mode='connector-failure', delay_sec=5)
+        else:
+            self.connector = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
+        self.connector.start()
 
-        wait_until(lambda: self.connector_is_failed(self.sink), timeout_sec=15,
+        wait_until(lambda: self.connector_is_failed(self.connector), timeout_sec=15,
                    err_msg="Failed to see connector transition to the FAILED state")
 
-        self.cc.restart_connector(self.sink.name)
+        self.cc.restart_connector(self.connector.name)
 
-        wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
+        wait_until(lambda: self.connector_is_running(self.connector), timeout_sec=10,
                    err_msg="Failed to see connector transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
+    @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
     def test_restart_failed_task(self, connector_type, connect_protocol):
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -247,13 +257,14 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_and_resume_source(self, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_pause_and_resume_source(self, exactly_once_source, connect_protocol):
         """
         Verify that source connectors stop producing records when paused and begin again after
         being resumed.
         """
 
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -336,12 +347,13 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to consume messages after resuming sink connector")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_state_persistent(self, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_pause_state_persistent(self, exactly_once_source, connect_protocol):
         """
         Verify that paused state is preserved after a cluster restart.
         """
 
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -363,13 +375,14 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector startup in PAUSED state")
 
     @cluster(num_nodes=6)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_file_source_and_sink(self, security_protocol, connect_protocol):
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
         """
 
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -400,7 +413,8 @@ class ConnectDistributedTest(Test):
     def test_bounce(self, clean, connect_protocol):
         """
         Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
-        run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces,
+        and at least once when workers undergo unclean bounces.
         """
         num_tasks = 3
@@ -414,11 +428,14 @@ class ConnectDistributedTest(Test):
         self.sink = VerifiableSink(self.cc, tasks=num_tasks, topics=[self.TOPIC])
         self.sink.start()
 
-        for _ in range(3):
-            for node in self.cc.nodes:
+        for i in range(3):
+            start = i % len(self.cc.nodes)
+            # Don't want to restart worker nodes in the same order every time
+            shuffled_nodes = self.cc.nodes[start:] + self.cc.nodes[:start]
+            for node in shuffled_nodes:
                 started = time.time()
                 self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
-                self.cc.stop_node(node, clean_shutdown=clean)
+                self.cc.stop_node(node, clean_shutdown=clean, await_shutdown=True)
                 with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
                     self.cc.start_node(node)
                     monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
@@ -453,7 +470,7 @@ class ConnectDistributedTest(Test):
         self.cc.stop()
 
         # Validate at least once delivery of everything that was reported as written since we should have flushed and
-        # cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed
+        # cleanly exited. Currently this only tests at least once delivery for sinks because the task may not have consumed
         # all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across
         # tasks.
         success = True
@@ -519,6 +536,110 @@ class ConnectDistributedTest(Test):
 
         assert success, "Found validation errors:\n" + "\n ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source tasks run correctly and deliver messages exactly once
+        when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for i in range(4):
+            start = i % len(self.cc.nodes)
+            # Don't want to restart worker nodes in the same order every time
+            shuffled_nodes = self.cc.nodes[start:] + self.cc.nodes[:start]
+            for node in shuffled_nodes:
+                started = time.time()
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean, await_shutdown=True)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
+
+                if i < 2:
+                    # Give additional time for the worker group to recover. Even if it is not a hard bounce, there are
+                    # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                    # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                    # If we don't give enough time for the group to stabilize, the next bounce may cause workers to
+                    # be shut down before they have any time to process data and we can end up with zero data making it
+                    # through the test.
+                    time.sleep(15)
+                else:
+                    # We also need to make sure that, even without time for the cluster to recover gracefully in between
+                    # worker restarts, the cluster and its tasks do not get into an inconsistent state and either duplicate or
+                    # drop messages.
+                    pass
+
+        # Wait at least scheduled.rebalance.max.delay.ms to expire and rebalance
+        time.sleep(60)
+
+        # It's possible that a zombie fencing request from a follower to the leader failed when we bounced the leader
+        # We don't automatically retry these requests because some failures (such as insufficient ACLs for the
+        # connector's principal) are genuine and need to be reported by failing the task and displaying an error message
+        # in the status for the task in the REST API.
+        # So, we make a polite request to the cluster to restart any failed tasks
+        self.cc.restart_connector_and_tasks(self.source.name, only_failed='true', include_tasks='true')
+
+        # Allow the connectors to startup, recover, and exit cleanly before
+        # ending the test.
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
+        self.source.stop()
+        self.cc.stop()
+
+        consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed")
+        consumer.run()
+        src_messages = consumer.messages_consumed[1]
+
+        success = True
+        errors = []
+        for task in range(num_tasks):
+            # Validate source messages
+            src_seqnos = [msg['payload']['seqno'] for msg in src_messages if msg['payload']['task'] == task]
+            if len(src_seqnos) == 0:
+                self.logger.error("No records produced by task " + str(task))
+                errors.append("No records produced by task %d" % (task))
+                success = False
+                continue
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
+            # bouncing should commit on rebalance.
+            src_seqno_max = max(src_seqnos)
+            self.logger.debug("Max source seqno: %d", src_seqno_max)
+            src_seqno_counts = Counter(src_seqnos)
+            missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos)))
+            duplicate_src_seqnos = sorted(seqno for seqno, count in src_seqno_counts.items() if count > 1)
+
+            if missing_src_seqnos:
+                self.logger.error("Missing source sequence numbers for task " + str(task))
+                errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos))
+                success = False
+            if duplicate_src_seqnos:
+                self.logger.error("Duplicate source sequence numbers for task " + str(task))
+                errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos))
+                success = False
+
+        if not success:
+            self.mark_for_collect(self.cc)
+            # Also collect the data in the topic to aid in debugging
+            consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True)
+            consumer_validator.run()
+            self.mark_for_collect(consumer_validator, "consumer_stdout")
+
+        assert success, "Found validation errors:\n" + "\n ".join(errors)
+
     @cluster(num_nodes=6)
     @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
     def test_transformations(self, connect_protocol):
@@ -577,42 +698,51 @@ class ConnectDistributedTest(Test):
             assert obj['payload'][ts_fieldname] == ts
 
     @cluster(num_nodes=5)
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='sessioned')
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
-    def test_broker_compatibility(self, broker_version, auto_create_topics, security_protocol, connect_protocol):
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned')
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible')
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager')
+    def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol):
         """
         Verify that Connect will start up with various broker versions with various configurations.
         When Connect distributed starts up, it either creates internal topics (v0.10.1.0 and after)
         or relies upon the broker to auto-create the topics (v0.10.0.x and before).
         """
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
-                            security_protocol=security_protocol, include_filestream_connectors=True)
+                            security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
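The new test_exactly_once_source test validates delivery by reading the topic with isolation.level=read_committed, so records from open or aborted producer transactions are never counted, and any gap or repeat in the per-task seqnos is a genuine exactly-once violation. For reference, a sketch of the equivalent consumer setup in Java; bootstrap servers, group id, and topic name are placeholders:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadCommittedSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-validator");           // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Only records from committed transactions are returned, which is
            // what makes the seqno-based missing/duplicate check sound
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("verifiable")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.value());
            }
        }
    }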
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index ff44d9412f..5e3e69e1c7 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -34,7 +34,8 @@ class ConnectRestApiTest(KafkaTest):
 
     FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'header.converter', 'batch.size',
                            'topic', 'file', 'transforms', 'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms',
-                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups'}
+                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups',
+                           'exactly.once.support', 'transaction.boundary', 'transaction.boundary.interval.ms', 'offsets.storage.topic'}
     FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'header.converter', 'topics', 'file',
                          'transforms', 'topics.regex', 'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms',
                          'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'errors.deadletterqueue.topic.name',
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 6d2d5e28d1..cd80219090 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -20,6 +20,12 @@ bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_prot
 
 group.id={{ group|default("connect-cluster") }}
 
+exactly.once.source.support={{ EXACTLY_ONCE_SOURCE_SUPPORT|default("disabled") }}
+{% if EXACTLY_ONCE_SOURCE_SUPPORT is defined and EXACTLY_ONCE_SOURCE_SUPPORT == "enabled" %}
+# Reduce transaction timeouts so tests that kill workers don't need to wait as long to recover
+producer.transaction.timeout.ms = 10000
+{% endif %}
+
 connect.protocol={{ CONNECT_PROTOCOL|default("sessioned") }}
 scheduled.rebalance.max.delay.ms={{ SCHEDULED_REBALANCE_MAX_DELAY_MS|default(60000) }}
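The template change lowers the connector producer's transaction timeout so that a transaction left open by a hard-killed worker is aborted by the broker after ten seconds rather than the one-minute default, which keeps the bounce tests fast. For reference, a sketch of the same setting applied to a plain transactional producer in Java; the broker address, transactional id, and topic are placeholders:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ShortTxnTimeoutSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");        // placeholder
            // Mirrors producer.transaction.timeout.ms = 10000 in the template:
            // an abandoned transaction is aborted by the broker after 10 seconds
            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("verifiable", "key", "value")); // placeholder topic
                producer.commitTransaction();
            }
        }
    }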