This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit a6ba99792ea4916f73cca5bee4389dcbec2bf180
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Nov 11 15:14:22 2022 +0800

    [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase. (#21252)
---
 .../connector/pulsar/sink/PulsarSinkITCase.java    |  2 ++
 .../pulsar/testutils/function/ControlSource.java   | 41 ++++++++++++++++++----
 .../testutils/runtime/PulsarRuntimeOperator.java   |  4 +++
 3 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 71e778c..0e5a173 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
+import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for using PulsarSink writing to a Pulsar cluster. */
@@ -104,6 +105,7 @@ class PulsarSinkITCase {
             // A random topic with partition 4.
             String topic = randomAlphabetic(8);
             operator().createTopic(topic, 4);
+            operator().createSchema(topic, STRING);
             int counts = ThreadLocalRandom.current().nextInt(100, 200);
 
             ControlSource source =
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
index 3684167..127e267 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
@@ -31,8 +31,12 @@ import org.apache.flink.testutils.junit.SharedReference;
 
 import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +49,12 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
 
 /**
  * This source is used for testing in Pulsar sink. We would generate a fix 
number of records by the
@@ -183,20 +191,30 @@ public class ControlSource extends AbstractRichFunction
     private static class StopSignal implements Closeable {
         private static final Logger LOG = 
LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;
+        private final AtomicReference<PulsarClientException> 
throwableException;
 
         public StopSignal(
                 PulsarRuntimeOperator operator, String topic, int 
messageCounts, Duration timeout) {
-            this.topic = topic;
             this.desiredCounts = messageCounts;
             this.consumedRecords = Collections.synchronizedList(new 
ArrayList<>(messageCounts));
             this.deadline = new AtomicLong(timeout.toMillis() + 
System.currentTimeMillis());
             this.executor = Executors.newSingleThreadExecutor();
+            ConsumerBuilder<String> consumerBuilder =
+                    operator.client()
+                            .newConsumer(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName(randomAlphanumeric(10))
+                            .subscriptionMode(Durable)
+                            .subscriptionType(Exclusive)
+                            
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+            this.consumer = sneakyClient(consumerBuilder::subscribe);
+            this.throwableException = new AtomicReference<>();
 
             // Start consuming.
             executor.execute(
@@ -204,16 +222,27 @@ public class ControlSource extends AbstractRichFunction
                         while (consumedRecords.size() < desiredCounts) {
                             // This method would block until we consumed a 
message.
                             int counts = desiredCounts - 
consumedRecords.size();
-                            List<Message<String>> messages =
-                                    operator.receiveMessages(this.topic, 
Schema.STRING, counts);
-                            for (Message<String> message : messages) {
-                                consumedRecords.add(message.getValue());
+                            for (int i = 0; i < counts; i++) {
+                                try {
+                                    Message<String> message = 
consumer.receive();
+                                    consumedRecords.add(message.getValue());
+                                } catch (PulsarClientException e) {
+                                    throwableException.set(e);
+                                    break;
+                                }
                             }
                         }
                     });
         }
 
         public boolean canStop() {
+            PulsarClientException exception = throwableException.get();
+            if (exception != null) {
+                LOG.error("Error in consuming messages from Pulsar.");
+                LOG.error("", exception);
+                return true;
+            }
+
             if (deadline.get() < System.currentTimeMillis()) {
                 String errorMsg =
                         String.format(
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index 5d14d7e..c76bb2f 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -180,6 +180,10 @@ public class PulsarRuntimeOperator implements Closeable {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {
+        sneakyAdmin(() -> admin().schemas().createSchema(topic, 
schema.getSchemaInfo()));
+    }
+
     /**
      * Increase the partition number of the topic.
      *

Reply via email to