[ https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372606#comment-16372606 ]
ASF GitHub Bot commented on KAFKA-6577: --------------------------------------- dguy closed pull request #4610: KAFKA-6577: Fix Connect system tests and add debug messages URL: https://github.com/apache/kafka/pull/4610 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 4afa47dda1a..3b7ec87f644 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -74,6 +74,7 @@ public static void main(String[] args) throws Exception { DistributedConfig config = new DistributedConfig(workerProps); String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + log.debug("Kafka cluster ID: {}", kafkaClusterId); RestServer rest = new RestServer(config); URI advertisedUrl = rest.advertisedUrl(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java index 17699053541..413cb46cf28 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java @@ -78,6 +78,7 @@ public static void main(String[] args) throws Exception { StandaloneConfig config = new StandaloneConfig(workerProps); String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + log.debug("Kafka cluster ID: {}", kafkaClusterId); RestServer rest = new RestServer(config); URI advertisedUrl = rest.advertisedUrl(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index b34e48390e1..e51b365cec6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -432,6 +432,7 @@ public void putTargetState(String connector, TargetState state) { Runnable createTopics = new Runnable() { @Override public void run() { + log.debug("Creating admin client to manage Connect internal config topic"); try (TopicAdmin admin = new TopicAdmin(adminProps)) { admin.createTopics(topicDescription); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index f29f3c23d03..fb8ad97b48d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -94,6 +94,7 @@ public void configure(final WorkerConfig config) { Runnable createTopics = new Runnable() { @Override public void run() { + log.debug("Creating admin client to manage Connect internal offset topic"); try (TopicAdmin admin = new TopicAdmin(adminProps)) { admin.createTopics(topicDescription); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 8ca21ebb350..6710808f9a9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -157,6 +157,7 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) Runnable createTopics = new Runnable() { @Override public void run() { + log.debug("Creating admin client to manage Connect internal status topic"); try (TopicAdmin admin = new TopicAdmin(adminProps)) { admin.createTopics(topicDescription); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 19452047fcb..9f30236fdee 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -40,6 +40,7 @@ else if (timestamp == RecordBatch.NO_TIMESTAMP) } public static String lookupKafkaClusterId(WorkerConfig config) { + log.info("Creating Kafka admin client"); try (AdminClient adminClient = AdminClient.create(config.originals())) { return lookupKafkaClusterId(adminClient); } @@ -53,13 +54,15 @@ static String lookupKafkaClusterId(AdminClient adminClient) { log.info("Kafka cluster version is too old to return cluster ID"); return null; } + log.debug("Fetching Kafka cluster ID"); String kafkaClusterId = clusterIdFuture.get(); log.info("Kafka cluster ID: {}", kafkaClusterId); return kafkaClusterId; } catch (InterruptedException e) { throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e); } catch (ExecutionException e) { - throw new ConnectException("Failed to connect to and describe Kafka cluster", e); + throw new ConnectException("Failed to connect to and describe Kafka cluster. " + + "Check worker's broker connection and security properties.", e); } } } diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 9436119f886..37538763337 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -91,7 +91,7 @@ def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.Jso self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE]) self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE]) self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC, - consumer_timeout_ms=1000) + consumer_timeout_ms=10000) self.zk.start() self.kafka.start() diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties index 6660e6c0e34..a1d3de29d51 100644 --- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties +++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties @@ -52,3 +52,6 @@ rest.advertised.host.name = {{ node.account.hostname }} # Reduce session timeouts so tests that kill workers don't need to wait as long to recover session.timeout.ms=10000 consumer.session.timeout.ms=10000 + +# Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client +request.timeout.ms=30000 diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties index 09c648720c7..5f079f7a396 100644 --- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties +++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties @@ -14,6 +14,7 @@ # limitations under the License. bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }} +{{ kafka.security_config.client_config().props() }} {{ kafka.security_config.client_config().props("producer.") }} {{ kafka.security_config.client_config().props("consumer.") }} @@ -32,3 +33,6 @@ internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename={{ OFFSETS_FILE }} + +# Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client +request.timeout.ms=30000 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connect standalone SASL file source and sink test fails without explanation > --------------------------------------------------------------------------- > > Key: KAFKA-6577 > URL: https://issues.apache.org/jira/browse/KAFKA-6577 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, system tests > Affects Versions: 1.1.0 > Reporter: Randall Hauch > Assignee: Randall Hauch > Priority: Blocker > Fix For: 1.1.0, 1.2.0 > > > The > {{tests/kafkatest/tests/connect/connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink}} > test is failing with the SASL configuration without a sufficient > explanation. During the test, the Connect worker fails to start, but the > Connect log contains no useful information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)