[GitHub] [flink] XComp commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

2022-11-20 Thread GitBox


XComp commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1027592579


##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##
@@ -180,6 +180,10 @@ public void createTopic(String topic, int 
numberOfPartitions) {
 }
 }
 
+public void createSchema(String topic, Schema schema) {
+sneakyAdmin(() -> admin().schemas().createSchema(topic, 
schema.getSchemaInfo()));

Review Comment:
   Thanks for the brief clarification, @syhily . I created FLINK-30109 to cover 
the issue. We either should add more documentation clarifying why we do this or 
removing it as it's a workaround that might raise questions to readers. Let's 
move the discussion into FLINK-30109. I'm curious about your view on that and 
why the Pulsar API is forcing us to do this exception transformation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

2022-11-09 Thread GitBox


XComp commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1016457270


##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##
@@ -183,37 +191,58 @@ public List getExpectedRecords() {
 private static class StopSignal implements Closeable {
 private static final Logger LOG = 
LoggerFactory.getLogger(StopSignal.class);
 
-private final String topic;
 private final int desiredCounts;
 // This is a thread-safe list.
 private final List consumedRecords;
 private final AtomicLong deadline;
 private final ExecutorService executor;
+private final Consumer consumer;
+private final AtomicReference 
throwableException;
 
 public StopSignal(
 PulsarRuntimeOperator operator, String topic, int 
messageCounts, Duration timeout) {
-this.topic = topic;
 this.desiredCounts = messageCounts;
 this.consumedRecords = Collections.synchronizedList(new 
ArrayList<>(messageCounts));
 this.deadline = new AtomicLong(timeout.toMillis() + 
System.currentTimeMillis());
 this.executor = Executors.newSingleThreadExecutor();
+ConsumerBuilder consumerBuilder =

Review Comment:
   ```suggestion
   final ConsumerBuilder consumerBuilder =
   ```
   nit



##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##
@@ -180,6 +180,10 @@ public void createTopic(String topic, int 
numberOfPartitions) {
 }
 }
 
+public void createSchema(String topic, Schema schema) {

Review Comment:
   ```suggestion
   @VisibleForTesting
   public void createSchema(String topic, Schema schema) {
   ```



##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##
@@ -180,6 +180,10 @@ public void createTopic(String topic, int 
numberOfPartitions) {
 }
 }
 
+public void createSchema(String topic, Schema schema) {
+sneakyAdmin(() -> admin().schemas().createSchema(topic, 
schema.getSchemaInfo()));

Review Comment:
   Why do we use these `sneaky` utility functions everywhere? It feels a bit 
fishy to hide checked exceptions. Either we have to document the reason 
properly or we should get rid of it and do proper exception handling. WDYT?
   I understand that such a change would be out-of-scope of this PR. We would 
need to create a follow-up ticket.



##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##
@@ -183,37 +191,58 @@ public List getExpectedRecords() {
 private static class StopSignal implements Closeable {
 private static final Logger LOG = 
LoggerFactory.getLogger(StopSignal.class);
 
-private final String topic;
 private final int desiredCounts;
 // This is a thread-safe list.
 private final List consumedRecords;
 private final AtomicLong deadline;
 private final ExecutorService executor;
+private final Consumer consumer;
+private final AtomicReference 
throwableException;
 
 public StopSignal(
 PulsarRuntimeOperator operator, String topic, int 
messageCounts, Duration timeout) {
-this.topic = topic;
 this.desiredCounts = messageCounts;
 this.consumedRecords = Collections.synchronizedList(new 
ArrayList<>(messageCounts));
 this.deadline = new AtomicLong(timeout.toMillis() + 
System.currentTimeMillis());
 this.executor = Executors.newSingleThreadExecutor();
+ConsumerBuilder consumerBuilder =
+operator.client()
+.newConsumer(Schema.STRING)
+.topic(topic)
+.subscriptionName(randomAlphanumeric(10))
+.subscriptionMode(Durable)
+.subscriptionType(Exclusive)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+this.consumer = sneakyClient(consumerBuilder::subscribe);
+this.throwableException = new AtomicReference<>();

Review Comment:
   What about working with `CompletableFutures` here? It feels more natural to 
use rather than utilizing an `AtomicReference` here.



##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##
@@ -183,37 +191,58 @@ public List getExpectedRecords() {
 private static class StopSignal implements Closeable {
 private static final Logger LOG = 
LoggerFactory.getLogger(StopSignal.class);
 
-pr