[ 
https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372606#comment-16372606
 ] 

ASF GitHub Bot commented on KAFKA-6577:
---------------------------------------

dguy closed pull request #4610: KAFKA-6577: Fix Connect system tests and add 
debug messages
URL: https://github.com/apache/kafka/pull/4610
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 4afa47dda1a..3b7ec87f644 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -74,6 +74,7 @@ public static void main(String[] args) throws Exception {
         DistributedConfig config = new DistributedConfig(workerProps);
 
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 17699053541..413cb46cf28 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -78,6 +78,7 @@ public static void main(String[] args) throws Exception {
         StandaloneConfig config = new StandaloneConfig(workerProps);
 
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b34e48390e1..e51b365cec6 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -432,6 +432,7 @@ public void putTargetState(String connector, TargetState 
state) {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal 
config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index f29f3c23d03..fb8ad97b48d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -94,6 +94,7 @@ public void configure(final WorkerConfig config) {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal 
offset topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 8ca21ebb350..6710808f9a9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -157,6 +157,7 @@ public void onCompletion(Throwable error, 
ConsumerRecord<String, byte[]> record)
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal 
status topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 19452047fcb..9f30236fdee 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -40,6 +40,7 @@ else if (timestamp == RecordBatch.NO_TIMESTAMP)
     }
 
     public static String lookupKafkaClusterId(WorkerConfig config) {
+        log.info("Creating Kafka admin client");
         try (AdminClient adminClient = AdminClient.create(config.originals())) 
{
             return lookupKafkaClusterId(adminClient);
         }
@@ -53,13 +54,15 @@ static String lookupKafkaClusterId(AdminClient adminClient) 
{
                 log.info("Kafka cluster version is too old to return cluster 
ID");
                 return null;
             }
+            log.debug("Fetching Kafka cluster ID");
             String kafkaClusterId = clusterIdFuture.get();
             log.info("Kafka cluster ID: {}", kafkaClusterId);
             return kafkaClusterId;
         } catch (InterruptedException e) {
             throw new ConnectException("Unexpectedly interrupted when looking 
up Kafka cluster info", e);
         } catch (ExecutionException e) {
-            throw new ConnectException("Failed to connect to and describe 
Kafka cluster", e);
+            throw new ConnectException("Failed to connect to and describe 
Kafka cluster. "
+                                       + "Check worker's broker connection and 
security properties.", e);
         }
     }
 }
diff --git a/tests/kafkatest/tests/connect/connect_test.py 
b/tests/kafkatest/tests/connect/connect_test.py
index 9436119f886..37538763337 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -91,7 +91,7 @@ def test_file_source_and_sink(self, 
converter="org.apache.kafka.connect.json.Jso
         self.source = ConnectStandaloneService(self.test_context, self.kafka, 
[self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, 
[self.OUTPUT_FILE, self.OFFSETS_FILE])
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, 
self.kafka, self.TOPIC,
-                                                  consumer_timeout_ms=1000)
+                                                  consumer_timeout_ms=10000)
 
         self.zk.start()
         self.kafka.start()
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-distributed.properties 
b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 6660e6c0e34..a1d3de29d51 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -52,3 +52,6 @@ rest.advertised.host.name = {{ node.account.hostname }}
 # Reduce session timeouts so tests that kill workers don't need to wait as 
long to recover
 session.timeout.ms=10000
 consumer.session.timeout.ms=10000
+
+# Reduce the admin client request timeouts so that we don't wait the default 
120 sec before failing to connect the admin client
+request.timeout.ms=30000
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-standalone.properties 
b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index 09c648720c7..5f079f7a396 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 bootstrap.servers={{ 
kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props() }}
 {{ kafka.security_config.client_config().props("producer.") }}
 {{ kafka.security_config.client_config().props("consumer.") }}
 
@@ -32,3 +33,6 @@ internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
 offset.storage.file.filename={{ OFFSETS_FILE }}
+
+# Reduce the admin client request timeouts so that we don't wait the default 
120 sec before failing to connect the admin client
+request.timeout.ms=30000


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect standalone SASL file source and sink test fails without explanation
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6577
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6577
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect, system tests
>    Affects Versions: 1.1.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Blocker
>             Fix For: 1.1.0, 1.2.0
>
>
> The 
> {{tests/kafkatest/tests/connect/connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink}}
>  test is failing with the SASL configuration without a sufficient 
> explanation. During the test, the Connect worker fails to start, but the 
> Connect log contains no useful information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to