This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit a6ba99792ea4916f73cca5bee4389dcbec2bf180 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Fri Nov 11 15:14:22 2022 +0800 [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase. (#21252) --- .../connector/pulsar/sink/PulsarSinkITCase.java | 2 ++ .../pulsar/testutils/function/ControlSource.java | 41 ++++++++++++++++++---- .../testutils/runtime/PulsarRuntimeOperator.java | 4 +++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java index 71e778c..0e5a173 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java @@ -51,6 +51,7 @@ import java.util.concurrent.ThreadLocalRandom; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema; +import static org.apache.pulsar.client.api.Schema.STRING; import static org.assertj.core.api.Assertions.assertThat; /** Tests for using PulsarSink writing to a Pulsar cluster. */ @@ -104,6 +105,7 @@ class PulsarSinkITCase { // A random topic with partition 4. String topic = randomAlphabetic(8); operator().createTopic(topic, 4); + operator().createSchema(topic, STRING); int counts = ThreadLocalRandom.current().nextInt(100, 200); ControlSource source = diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java index 3684167..127e267 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java @@ -31,8 +31,12 @@ import org.apache.flink.testutils.junit.SharedReference; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +49,12 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; +import static org.apache.pulsar.client.api.SubscriptionMode.Durable; +import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; /** * This source is used for testing in Pulsar sink. We would generate a fix number of records by the @@ -183,20 +191,30 @@ public class ControlSource extends AbstractRichFunction private static class StopSignal implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class); - private final String topic; private final int desiredCounts; // This is a thread-safe list. private final List<String> consumedRecords; private final AtomicLong deadline; private final ExecutorService executor; + private final Consumer<String> consumer; + private final AtomicReference<PulsarClientException> throwableException; public StopSignal( PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) { - this.topic = topic; this.desiredCounts = messageCounts; this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts)); this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis()); this.executor = Executors.newSingleThreadExecutor(); + ConsumerBuilder<String> consumerBuilder = + operator.client() + .newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(randomAlphanumeric(10)) + .subscriptionMode(Durable) + .subscriptionType(Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + this.consumer = sneakyClient(consumerBuilder::subscribe); + this.throwableException = new AtomicReference<>(); // Start consuming. executor.execute( @@ -204,16 +222,27 @@ public class ControlSource extends AbstractRichFunction while (consumedRecords.size() < desiredCounts) { // This method would block until we consumed a message. int counts = desiredCounts - consumedRecords.size(); - List<Message<String>> messages = - operator.receiveMessages(this.topic, Schema.STRING, counts); - for (Message<String> message : messages) { - consumedRecords.add(message.getValue()); + for (int i = 0; i < counts; i++) { + try { + Message<String> message = consumer.receive(); + consumedRecords.add(message.getValue()); + } catch (PulsarClientException e) { + throwableException.set(e); + break; + } } } }); } public boolean canStop() { + PulsarClientException exception = throwableException.get(); + if (exception != null) { + LOG.error("Error in consuming messages from Pulsar."); + LOG.error("", exception); + return true; + } + if (deadline.get() < System.currentTimeMillis()) { String errorMsg = String.format( diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index 5d14d7e..c76bb2f 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -180,6 +180,10 @@ public class PulsarRuntimeOperator implements Closeable { } } + public void createSchema(String topic, Schema<?> schema) { + sneakyAdmin(() -> admin().schemas().createSchema(topic, schema.getSchemaInfo())); + } + /** * Increase the partition number of the topic. *