syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528


##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##########
@@ -231,7 +232,11 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         }
 
         // Create pulsar consumer.
-        this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        try {
+            this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        } catch (PulsarClientException e) {

Review Comment:
   This comes down to a difference between the Kafka client and the Pulsar
client. Kafka uses a polling API, and its client is created before the split
is handled. Pulsar shares consumers within the same client instance, and each
consumer supports only one split, so we have to create the consumer here, and
the exception has to be wrapped in a runtime exception.
   
   I think we should expose exceptions in `SplitReader.handleSplitsChanges` on 
the Flink side.
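
To illustrate the wrapping described above, here is a minimal, self-contained sketch. It does not use the Flink or Pulsar APIs: `SplitHandler`, `ClientException`, `Reader`, and `createConsumer` are hypothetical stand-ins for `SplitReader`, `PulsarClientException`, the split reader, and `createPulsarConsumer`. The choice of `IllegalStateException` as the wrapper is also an assumption, not necessarily what the PR uses.

```java
import java.io.IOException;

public class WrapCheckedExceptionSketch {

    /** Stand-in for an interface whose method declares no checked exceptions. */
    public interface SplitHandler {
        void handleSplit(String partition);
    }

    /** Hypothetical stand-in for the client's checked exception. */
    public static class ClientException extends IOException {
        public ClientException(String message) {
            super(message);
        }
    }

    public static class Reader implements SplitHandler {
        private String consumer;

        // Creating the consumer can fail with a checked exception.
        private String createConsumer(String partition) throws ClientException {
            if (partition.isEmpty()) {
                throw new ClientException("no partition given");
            }
            return "consumer-for-" + partition;
        }

        @Override
        public void handleSplit(String partition) {
            // The interface method cannot declare ClientException, so the
            // checked exception is rethrown as an unchecked one with the
            // original kept as the cause.
            try {
                this.consumer = createConsumer(partition);
            } catch (ClientException e) {
                throw new IllegalStateException("Failed to create consumer", e);
            }
        }

        public String consumer() {
            return consumer;
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        reader.handleSplit("topic-partition-0");
        System.out.println(reader.consumer());
    }
}
```

If the interface method declared the checked exception itself, the `try`/`catch` in `handleSplit` would not be needed and callers could handle the original exception type directly.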



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

