xintongsong commented on a change in pull request #16860:
URL: https://github.com/apache/flink/pull/16860#discussion_r690207576



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
##########
@@ -27,44 +27,101 @@
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
 
 /** Base class for Kafka Table IT Cases. */
 public abstract class KafkaTableTestBase extends AbstractTestBase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTableTestBase.class);
+
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final Network NETWORK = Network.newNetwork();
     private static final int zkTimeoutMills = 30000;
 
-    @ClassRule
-    public static final KafkaContainer KAFKA_CONTAINER =
-            new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
-                    .withEmbeddedZookeeper()
-                    .withNetwork(NETWORK)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+    public static KafkaContainer kafkaContainer;
 
     protected StreamExecutionEnvironment env;
     protected StreamTableEnvironment tEnv;
 
+    // Timer for scheduling logging task if the test hangs
+    private final Timer loggingTimer = new Timer("Debug Logging Timer");
+
+    @BeforeClass
+    public static void startTestEnv() {
+        kafkaContainer =
+                new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                        .withEmbeddedZookeeper()
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+        kafkaContainer.start();
+        if (LOG.isInfoEnabled()) {
+            kafkaContainer.followOutput(new Slf4jLogConsumer(LOG));
+        }
+    }
+
+    @AfterClass
+    public static void tearDownTestEnv() {
+        kafkaContainer.stop();
+    }
+
     @Before
     public void setup() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env);
         env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+
+        // Print debug log if the test has been hanging for 5 minutes
+        scheduleTimeoutLogger(
+                Duration.ofMinutes(5),
+                () -> {
+                    // List all non-internal topics
+                    final Map<String, TopicDescription> topicDescriptions =
+                            describeExternalTopics();
+                    LOG.info("Current existing topics: {}", 
topicDescriptions.keySet());
+
+                    // Log status in topics
+                    logTopicPartitionStatus(topicDescriptions);
+
+                    // Cancel the timer
+                    loggingTimer.cancel();
+                });

Review comment:
       What is the rationale behind printing the debug logs only once, at the 
5min timeout?
   - If the test case is expected to finish within 5min, we should also fail it 
on timeout.
   - If there's no assumption that the case should finish within 5min, we may 
print the logs before the test case actually gets stuck.
   
   I'd suggest simply probing the topics periodically, to decouple the logging 
logic from the testing logic.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
##########
@@ -27,44 +27,101 @@
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
 
 /** Base class for Kafka Table IT Cases. */
 public abstract class KafkaTableTestBase extends AbstractTestBase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTableTestBase.class);
+
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final Network NETWORK = Network.newNetwork();
     private static final int zkTimeoutMills = 30000;
 
-    @ClassRule
-    public static final KafkaContainer KAFKA_CONTAINER =
-            new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
-                    .withEmbeddedZookeeper()
-                    .withNetwork(NETWORK)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+    public static KafkaContainer kafkaContainer;
 
     protected StreamExecutionEnvironment env;
     protected StreamTableEnvironment tEnv;
 
+    // Timer for scheduling logging task if the test hangs
+    private final Timer loggingTimer = new Timer("Debug Logging Timer");
+
+    @BeforeClass
+    public static void startTestEnv() {
+        kafkaContainer =
+                new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                        .withEmbeddedZookeeper()
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+        kafkaContainer.start();
+        if (LOG.isInfoEnabled()) {
+            kafkaContainer.followOutput(new Slf4jLogConsumer(LOG));
+        }
+    }
+
+    @AfterClass
+    public static void tearDownTestEnv() {
+        kafkaContainer.stop();
+    }

Review comment:
       Is it possible to achieve this by overriding `KafkaContainer#doStart()`?
   What makes me uncomfortable about the current approach:
   - It makes `kafkaContainer` non-final, which can lead to potential problems 
such as NPEs.
   - As an abstract class, `startTestEnv` and `tearDownTestEnv` could be 
overridden.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to