PatrickRen commented on a change in pull request #16860:
URL: https://github.com/apache/flink/pull/16860#discussion_r690247105
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
##########
@@ -27,44 +27,101 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
/** Base class for Kafka Table IT Cases. */
public abstract class KafkaTableTestBase extends AbstractTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaTableTestBase.class);
+
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
private static final int zkTimeoutMills = 30000;
- @ClassRule
- public static final KafkaContainer KAFKA_CONTAINER =
- new
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
- .withEmbeddedZookeeper()
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+ public static KafkaContainer kafkaContainer;
protected StreamExecutionEnvironment env;
protected StreamTableEnvironment tEnv;
+ // Timer for scheduling logging task if the test hangs
+ private final Timer loggingTimer = new Timer("Debug Logging Timer");
+
+ @BeforeClass
+ public static void startTestEnv() {
+ kafkaContainer =
+ new
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+ .withEmbeddedZookeeper()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+ kafkaContainer.start();
+ if (LOG.isInfoEnabled()) {
+ kafkaContainer.followOutput(new Slf4jLogConsumer(LOG));
+ }
+ }
+
+ @AfterClass
+ public static void tearDownTestEnv() {
+ kafkaContainer.stop();
+ }
Review comment:
Fixed by d6bfdd9
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
##########
@@ -27,44 +27,101 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
/** Base class for Kafka Table IT Cases. */
public abstract class KafkaTableTestBase extends AbstractTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaTableTestBase.class);
+
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
private static final int zkTimeoutMills = 30000;
- @ClassRule
- public static final KafkaContainer KAFKA_CONTAINER =
- new
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
- .withEmbeddedZookeeper()
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+ public static KafkaContainer kafkaContainer;
protected StreamExecutionEnvironment env;
protected StreamTableEnvironment tEnv;
+ // Timer for scheduling logging task if the test hangs
+ private final Timer loggingTimer = new Timer("Debug Logging Timer");
+
+ @BeforeClass
+ public static void startTestEnv() {
+ kafkaContainer =
+ new
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+ .withEmbeddedZookeeper()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+ kafkaContainer.start();
+ if (LOG.isInfoEnabled()) {
+ kafkaContainer.followOutput(new Slf4jLogConsumer(LOG));
+ }
+ }
+
+ @AfterClass
+ public static void tearDownTestEnv() {
+ kafkaContainer.stop();
+ }
+
@Before
public void setup() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+
+ // Print debug log if the test has been hanging for 5 minutes
+ scheduleTimeoutLogger(
+ Duration.ofMinutes(5),
+ () -> {
+ // List all non-internal topics
+ final Map<String, TopicDescription> topicDescriptions =
+ describeExternalTopics();
+ LOG.info("Current existing topics: {}",
topicDescriptions.keySet());
+
+ // Log status in topics
+ logTopicPartitionStatus(topicDescriptions);
+
+ // Cancel the timer
+ loggingTimer.cancel();
+ });
Review comment:
Fixed by d6bfdd9
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]