XComp commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1016457270
##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##
@@ -183,37 +191,58 @@ public List getExpectedRecords() {
private static class StopSignal implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
-private final String topic;
private final int desiredCounts;
// This is a thread-safe list.
private final List consumedRecords;
private final AtomicLong deadline;
private final ExecutorService executor;
+private final Consumer consumer;
+private final AtomicReference throwableException;
public StopSignal(
PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-this.topic = topic;
this.desiredCounts = messageCounts;
this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
this.executor = Executors.newSingleThreadExecutor();
+ConsumerBuilder consumerBuilder =
Review Comment:
```suggestion
final ConsumerBuilder consumerBuilder =
```
nit
##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
}
}
+public void createSchema(String topic, Schema schema) {
Review Comment:
```suggestion
@VisibleForTesting
public void createSchema(String topic, Schema schema) {
```
##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
}
}
+public void createSchema(String topic, Schema schema) {
+sneakyAdmin(() -> admin().schemas().createSchema(topic, schema.getSchemaInfo()));
Review Comment:
Why do we use these `sneaky` utility functions everywhere? Hiding checked
exceptions feels a bit fishy. Either we document the reason properly, or we
get rid of them and do proper exception handling. WDYT?
I understand that such a change would be out of scope for this PR. We would
need to create a follow-up ticket.
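To make the trade-off concrete, here is a minimal sketch of the two styles. It is not the connector's actual `sneakyAdmin` implementation; `ThrowingSupplier`, `sneaky`, `wrapped`, and `demo` are hypothetical names chosen for illustration. The sneaky variant lets a checked exception escape without the caller ever seeing it in a signature, while the wrapping variant rethrows it as an unchecked exception that names the failed action and keeps the cause.

```java
import java.io.IOException;

public class CheckedExceptionSketch {

    // Hypothetical functional interface for a call that may throw a checked exception.
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    // Generic trick that rethrows any Throwable without declaring it.
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    // "Sneaky" style: the checked exception escapes as-is, invisible in the signature.
    static <T> T sneaky(ThrowingSupplier<T> supplier) {
        try {
            return supplier.get();
        } catch (Exception e) {
            CheckedExceptionSketch.<RuntimeException>sneakyThrow(e);
            return null; // never reached; only satisfies the compiler
        }
    }

    // Explicit style: wrap the checked exception and say which action failed.
    static <T> T wrapped(String action, ThrowingSupplier<T> supplier) {
        try {
            return supplier.get();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to " + action, e);
        }
    }

    // Returns the simple class name of whatever the chosen style throws.
    static String demo(boolean useSneaky) {
        ThrowingSupplier<String> failing =
                () -> {
                    throw new IOException("schema upload failed");
                };
        try {
            if (useSneaky) {
                sneaky(failing);
            } else {
                wrapped("create schema", failing);
            }
            return "no exception";
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // IOException
        System.out.println(demo(false)); // IllegalStateException
    }
}
```

With the sneaky variant, a caller has to catch `Throwable` (or `Exception`) to handle the `IOException`, because the compiler rejects a `catch (IOException e)` around a call that does not declare it.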
##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##
@@ -183,37 +191,58 @@ public List getExpectedRecords() {
private static class StopSignal implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
-private final String topic;
private final int desiredCounts;
// This is a thread-safe list.
private final List consumedRecords;
private final AtomicLong deadline;
private final ExecutorService executor;
+private final Consumer consumer;
+private final AtomicReference throwableException;
public StopSignal(
PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-this.topic = topic;
this.desiredCounts = messageCounts;
this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
this.executor = Executors.newSingleThreadExecutor();
+ConsumerBuilder consumerBuilder =
+operator.client()
+.newConsumer(Schema.STRING)
+.topic(topic)
+.subscriptionName(randomAlphanumeric(10))
+.subscriptionMode(Durable)
+.subscriptionType(Exclusive)
+.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+this.consumer = sneakyClient(consumerBuilder::subscribe);
+this.throwableException = new AtomicReference<>();
Review Comment:
What about working with a `CompletableFuture` here? It feels more natural
than using an `AtomicReference`.
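As a rough illustration of the idea, the sketch below lets a `CompletableFuture` carry both the result and any failure of a background task, so no separate `AtomicReference` is needed to hand the exception back to the waiting thread. This is not the PR's code: `consumeAsync` and `runAndDescribe` are hypothetical stand-ins for the consuming loop in `StopSignal`, and the Pulsar consumer is replaced by a trivial supplier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StopSignalSketch {

    // Hypothetical stand-in for the background consuming loop.
    static CompletableFuture<Integer> consumeAsync(
            ExecutorService executor, int desiredCounts, boolean fail) {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (fail) {
                        throw new IllegalStateException("consumer failed");
                    }
                    return desiredCounts;
                },
                executor);
    }

    // Waits for the task; a failure in the background thread surfaces on join().
    static String runAndDescribe(boolean fail) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            int consumed = consumeAsync(executor, 3, fail).join();
            return "completed " + consumed;
        } catch (CompletionException e) {
            // join() wraps the original exception; the cause is the real failure.
            return "failed: " + e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndDescribe(false)); // completed 3
        System.out.println(runAndDescribe(true));  // failed: consumer failed
    }
}
```

The future also composes with a deadline via `get(timeout, unit)`, which might replace part of the manual deadline bookkeeping, though that depends on how `StopSignal` is used elsewhere.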
##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##
@@ -183,37 +191,58 @@ public List getExpectedRecords() {
private static class StopSignal implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(StopSignal.class);
-pr