[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##
@@ -231,7 +232,11 @@ public void 
handleSplitsChanges(SplitsChange splitsChanges
 }
 
 // Create pulsar consumer.
-this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+try {
+this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+} catch (PulsarClientException e) {

Review Comment:
   This should be the difference between Kafka client and Pulsar client. Kafka 
use polling API, and the client is created before handling the split. Pulsar 
share the consumers in a same client instance, every consumer will support only 
one split. So we have to create the consumer here. And the exception have to be 
wrapped into a runtime exception.
   
   I think we should expose exceptions in SplitReader.handleSplitsChanges` on 
the Flink side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103523033


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##
@@ -220,11 +223,14 @@ private void createSubscription(List 
newPartitions) {
 CursorPosition position =
 startCursor.position(partition.getTopic(), 
partition.getPartitionId());
 
-if (sourceConfiguration.isResetSubscriptionCursor()) {
-sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, 
subscriptionName));
-} else {
-sneakyAdmin(
-() -> position.createInitialPosition(pulsarAdmin, 
topic, subscriptionName));
+try {
+if (sourceConfiguration.isResetSubscriptionCursor()) {
+position.seekPosition(pulsarAdmin, topic, 
subscriptionName);
+} else {
+position.createInitialPosition(pulsarAdmin, topic, 
subscriptionName);
+}
+} catch (PulsarAdminException e) {
+throw new FlinkRuntimeException(e);

Review Comment:
   My bad, we can still throw this exception with a little more works on code 
refactoring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103523033


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##
@@ -220,11 +223,14 @@ private void createSubscription(List 
newPartitions) {
 CursorPosition position =
 startCursor.position(partition.getTopic(), 
partition.getPartitionId());
 
-if (sourceConfiguration.isResetSubscriptionCursor()) {
-sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, 
subscriptionName));
-} else {
-sneakyAdmin(
-() -> position.createInitialPosition(pulsarAdmin, 
topic, subscriptionName));
+try {
+if (sourceConfiguration.isResetSubscriptionCursor()) {
+position.seekPosition(pulsarAdmin, topic, 
subscriptionName);
+} else {
+position.createInitialPosition(pulsarAdmin, topic, 
subscriptionName);
+}
+} catch (PulsarAdminException e) {
+throw new FlinkRuntimeException(e);

Review Comment:
   My bad, we can still throw this exception will a little more works on code 
refactoring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522744


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##
@@ -220,11 +223,14 @@ private void createSubscription(List 
newPartitions) {
 CursorPosition position =
 startCursor.position(partition.getTopic(), 
partition.getPartitionId());
 
-if (sourceConfiguration.isResetSubscriptionCursor()) {
-sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, 
subscriptionName));
-} else {
-sneakyAdmin(
-() -> position.createInitialPosition(pulsarAdmin, 
topic, subscriptionName));
+try {
+if (sourceConfiguration.isResetSubscriptionCursor()) {
+position.seekPosition(pulsarAdmin, topic, 
subscriptionName);
+} else {
+position.createInitialPosition(pulsarAdmin, topic, 
subscriptionName);
+}
+} catch (PulsarAdminException e) {
+throw new FlinkRuntimeException(e);

Review Comment:
   This method is call in 
`context.callAsync(this::getSubscribedTopicPartitions, 
this::checkPartitionChanges);`. It's hard to throw the exception here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522426


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java:
##
@@ -619,7 +619,7 @@ private void ensureSubscriberIsNull(String 
attemptingSubscribeMode) {
 
 private void ensureSchemaTypeIsValid(Schema schema) {
 SchemaInfo info = schema.getSchemaInfo();
-if (info.getType() == SchemaType.AUTO_CONSUME || info.getType() == 
SchemaType.AUTO) {
+if (info.getType() == SchemaType.AUTO_CONSUME) {

Review Comment:
   I just notice this is a deprecated API calling in Pulsar and could never 
happen. Since this is a code refactor PR, I add it here and didn't submit a new 
PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##
@@ -231,7 +232,11 @@ public void 
handleSplitsChanges(SplitsChange splitsChanges
 }
 
 // Create pulsar consumer.
-this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+try {
+this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+} catch (PulsarClientException e) {

Review Comment:
   This should be the difference between Kafka client and Pulsar client. Kafka 
use polling API, and the client is created before handling the split. Pulsar 
share the consumer in same client instance, every consumer will support only 
one split. So we have to create the consumer here. And the exception have to be 
wrapped into a runtime exception.
   
   I think we should expose exceptions in SplitReader.handleSplitsChanges` on 
the Flink side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##
@@ -231,7 +232,11 @@ public void 
handleSplitsChanges(SplitsChange splitsChanges
 }
 
 // Create pulsar consumer.
-this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+try {
+this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+} catch (PulsarClientException e) {

Review Comment:
   This should be the difference between Kafka client and Pulsar client. Kafka 
use polling API, and the client is created before handling the split. Pulsar 
share the consumer in same client instance, every consumer will support only 
one split. So we have to create the consumer here. And the exception have to 
wrap into a runtime exception.
   
   I think we should expose exceptions in SplitReader.handleSplitsChanges` on 
the Flink side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103481149


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java:
##
@@ -46,17 +45,18 @@ private PulsarTransactionUtils() {
 /** Create transaction with given timeout millis. */
 public static Transaction createTransaction(PulsarClient pulsarClient, 
long timeoutMs) {
 try {
-CompletableFuture future =
-sneakyClient(pulsarClient::newTransaction)
-.withTransactionTimeout(timeoutMs, 
TimeUnit.MILLISECONDS)
-.build();
-
-return future.get();
+return pulsarClient
+.newTransaction()
+.withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+.build()
+.get();
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new IllegalStateException(e);
 } catch (ExecutionException e) {
 throw new FlinkRuntimeException(unwrap(e));
+} catch (PulsarClientException e) {

Review Comment:
   You are right. I changed it to Pulsar's built-in handle method.
   
   ```java
   public static Transaction createTransaction(PulsarClient pulsarClient, long 
timeoutMs)
   throws PulsarClientException {
   try {
   return pulsarClient
   .newTransaction()
   .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
   .build()
   .get();
   } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
   throw new PulsarClientException(e);
   } catch (Exception e) {
   throw PulsarClientException.unwrap(e);
   }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103481149


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java:
##
@@ -46,17 +45,18 @@ private PulsarTransactionUtils() {
 /** Create transaction with given timeout millis. */
 public static Transaction createTransaction(PulsarClient pulsarClient, 
long timeoutMs) {
 try {
-CompletableFuture future =
-sneakyClient(pulsarClient::newTransaction)
-.withTransactionTimeout(timeoutMs, 
TimeUnit.MILLISECONDS)
-.build();
-
-return future.get();
+return pulsarClient
+.newTransaction()
+.withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+.build()
+.get();
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new IllegalStateException(e);
 } catch (ExecutionException e) {
 throw new FlinkRuntimeException(unwrap(e));
+} catch (PulsarClientException e) {

Review Comment:
   You are right. I changed it to Pulsar's built-in handle method.
   
   ```
   public static Transaction createTransaction(PulsarClient pulsarClient, 
long timeoutMs)
   throws PulsarClientException {
   try {
   return pulsarClient
   .newTransaction()
   .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
   .build()
   .get();
   } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
   throw new PulsarClientException(e);
   } catch (Exception e) {
   throw PulsarClientException.unwrap(e);
   }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103451369


##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicsConsumingContext.java:
##
@@ -56,7 +57,11 @@ protected String subscriptionName() {
 @Override
 protected String generatePartitionName() {
 String topic = topicPrefix + index;
-operator.createTopic(topic, 1);
+try {
+operator.createTopic(topic, 1);
+} catch (Exception e) {

Review Comment:
   I'll keep it unfixed in this PR until it get fixed in 
[FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).



##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java:
##
@@ -55,7 +56,11 @@ public Sink createSink(TestingSinkSettings 
sinkSettings) {
 // Create the topic if it needs.
 if (creatTopic()) {
 for (String topic : topics) {
-operator.createTopic(topic, 4);
+try {
+operator.createTopic(topic, 4);
+} catch (Exception e) {

Review Comment:
   I'll keep it unfixed in this PR until it get fixed in 
[FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103449916


##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##
@@ -80,7 +72,7 @@
 import static 
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
 /** A pulsar cluster operator used for operating pulsar instance. */
-public class PulsarRuntimeOperator implements Closeable {
+public class PulsarRuntimeOperator {

Review Comment:
   Yep, this is a mistake from my side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org