[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528

## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java: ##
@@ -231,7 +232,11 @@ public void handleSplitsChanges(SplitsChange splitsChanges
 }
 // Create pulsar consumer.
-this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+try {
+    this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+} catch (PulsarClientException e) {

Review Comment:
This comes down to a difference between the Kafka client and the Pulsar client. Kafka uses a polling API, and its client is created before the splits are handled. In Pulsar, consumers share a single client instance, and each consumer supports only one split, so we have to create the consumer here, and the exception has to be wrapped in a runtime exception. I think we should expose exceptions in `SplitReader.handleSplitsChanges` on the Flink side.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
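The constraint described in this comment can be sketched in plain Java: when an interface method declares no checked exceptions, an implementation that calls a throwing method has to wrap the checked exception in an unchecked one, keeping the original as the cause. This is only a minimal illustration. `SplitHandler`, `ClientException`, and `createConsumer` are hypothetical stand-ins, not the real Flink or Pulsar types.

```java
// Hypothetical stand-ins; none of these types are the real Flink or Pulsar classes.
class WrapCheckedSketch {

    /** Stand-in for a checked client exception such as PulsarClientException. */
    static class ClientException extends Exception {
        ClientException(String message) {
            super(message);
        }
    }

    /** Stand-in for an interface whose method declares no checked exceptions. */
    interface SplitHandler {
        void handleSplitsChanges(String partition);
    }

    /** Stand-in for consumer creation that can fail with a checked exception. */
    static String createConsumer(String partition) throws ClientException {
        if (partition.isEmpty()) {
            throw new ClientException("no partition given");
        }
        return "consumer-for-" + partition;
    }

    static class Handler implements SplitHandler {
        String consumer;

        @Override
        public void handleSplitsChanges(String partition) {
            try {
                consumer = createConsumer(partition);
            } catch (ClientException e) {
                // The interface method cannot declare ClientException, so wrap it
                // in an unchecked exception and keep the original as the cause.
                throw new IllegalStateException("Failed to create consumer", e);
            }
        }
    }
}
```

If the interface method declared a checked exception, the `try`/`catch` in `Handler` would be unnecessary, which is the change the comment proposes for the Flink side.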
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103523033

## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java: ##
@@ -220,11 +223,14 @@ private void createSubscription(List newPartitions) {
 CursorPosition position = startCursor.position(partition.getTopic(), partition.getPartitionId());
-if (sourceConfiguration.isResetSubscriptionCursor()) {
-    sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, subscriptionName));
-} else {
-    sneakyAdmin(
-        () -> position.createInitialPosition(pulsarAdmin, topic, subscriptionName));
+try {
+    if (sourceConfiguration.isResetSubscriptionCursor()) {
+        position.seekPosition(pulsarAdmin, topic, subscriptionName);
+    } else {
+        position.createInitialPosition(pulsarAdmin, topic, subscriptionName);
+    }
+} catch (PulsarAdminException e) {
+    throw new FlinkRuntimeException(e);

Review Comment:
My bad, we can still throw this exception with a little more refactoring work.
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522744

## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java: ##
@@ -220,11 +223,14 @@ private void createSubscription(List newPartitions) {
 CursorPosition position = startCursor.position(partition.getTopic(), partition.getPartitionId());
-if (sourceConfiguration.isResetSubscriptionCursor()) {
-    sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, subscriptionName));
-} else {
-    sneakyAdmin(
-        () -> position.createInitialPosition(pulsarAdmin, topic, subscriptionName));
+try {
+    if (sourceConfiguration.isResetSubscriptionCursor()) {
+        position.seekPosition(pulsarAdmin, topic, subscriptionName);
+    } else {
+        position.createInitialPosition(pulsarAdmin, topic, subscriptionName);
+    }
+} catch (PulsarAdminException e) {
+    throw new FlinkRuntimeException(e);

Review Comment:
This method is called from `context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);`, so it's hard to throw the exception here.
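To illustrate the constraint raised in this comment: with an API shaped like `callAsync(Callable, BiConsumer)`, the callable may throw checked exceptions, which arrive at the handler as a `Throwable`, but the handler is a `BiConsumer` and cannot declare any. The sketch below uses a simplified, synchronous stand-in for `callAsync`. The class, the method bodies, and the failure messages are hypothetical, and nothing here uses Flink's real enumerator context.

```java
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;

// Hypothetical sketch; callAsync below is a synchronous stand-in, not Flink's implementation.
class CallAsyncSketch {

    /** Runs the callable and hands either its result or its failure to the handler. */
    static <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        T result = null;
        Throwable failure = null;
        try {
            result = callable.call(); // Callable.call() may throw checked exceptions.
        } catch (Exception e) {
            failure = e;
        }
        handler.accept(result, failure);
    }

    /** Stand-in for an admin call that fails with a checked exception. */
    static void createSubscription(String partition) throws Exception {
        throw new Exception("admin call failed for " + partition);
    }

    /** Handler side: BiConsumer.accept declares no checked exceptions, so wrap. */
    static void checkPartitionChanges(String partition, Throwable failure) {
        if (failure != null) {
            throw new IllegalStateException("Partition discovery failed", failure);
        }
        try {
            createSubscription(partition);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to create subscription", e);
        }
    }

    /** Wires both halves together the way a method-reference call site would. */
    static void discover() {
        callAsync(() -> "topic-partition-0", CallAsyncSketch::checkPartitionChanges);
    }
}
```

One possible refactoring, under the same assumptions, is to move the throwing work into the callable half, where checked exceptions are allowed and are delivered to the handler as the `Throwable` argument.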
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522426

## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java: ##
@@ -619,7 +619,7 @@ private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
 private void ensureSchemaTypeIsValid(Schema schema) {
 SchemaInfo info = schema.getSchemaInfo();
-if (info.getType() == SchemaType.AUTO_CONSUME || info.getType() == SchemaType.AUTO) {
+if (info.getType() == SchemaType.AUTO_CONSUME) {

Review Comment:
I just noticed that this calls a deprecated Pulsar API and that the case can never happen. Since this PR is a code refactoring, I included the change here instead of submitting a new PR.
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103481149

## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java: ##
@@ -46,17 +45,18 @@ private PulsarTransactionUtils() {
 /** Create transaction with given timeout millis. */
 public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) {
 try {
-    CompletableFuture future =
-        sneakyClient(pulsarClient::newTransaction)
-            .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
-            .build();
-
-    return future.get();
+    return pulsarClient
+        .newTransaction()
+        .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+        .build()
+        .get();
 } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new IllegalStateException(e);
 } catch (ExecutionException e) {
     throw new FlinkRuntimeException(unwrap(e));
+} catch (PulsarClientException e) {

Review Comment:
You are right. I changed it to use Pulsar's built-in exception handling method.

```java
public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs)
        throws PulsarClientException {
    try {
        return pulsarClient
                .newTransaction()
                .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .build()
                .get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new PulsarClientException(e);
    } catch (Exception e) {
        throw PulsarClientException.unwrap(e);
    }
}
```
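For readers unfamiliar with the pattern in the snippet above: `Future.get()` reports a failure of the asynchronous computation as an `ExecutionException` whose cause is the original error, so the caller usually unwraps it before rethrowing. The sketch below shows this with JDK types only. `ClientException` and its `unwrap` helper are hypothetical stand-ins written for this illustration; they are not Pulsar's `PulsarClientException` and may differ from how Pulsar's own method behaves.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch using JDK types only; ClientException is not a Pulsar class.
class UnwrapSketch {

    /** Stand-in for a checked client exception. */
    static class ClientException extends Exception {
        ClientException(Throwable cause) {
            super(cause);
        }

        /** Strips an ExecutionException wrapper, then converts to ClientException. */
        static ClientException unwrap(Throwable t) {
            Throwable root =
                    (t instanceof ExecutionException && t.getCause() != null) ? t.getCause() : t;
            return (root instanceof ClientException)
                    ? (ClientException) root
                    : new ClientException(root);
        }
    }

    /** Waits for the future and surfaces every failure as ClientException. */
    static <T> T await(CompletableFuture<T> future) throws ClientException {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new ClientException(e);
        } catch (ExecutionException e) {
            throw ClientException.unwrap(e);
        }
    }
}
```

Declaring the checked exception on `await` leaves the decision about wrapping to the caller, which mirrors the signature change in the comment.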
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103451369

## flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicsConsumingContext.java: ##
@@ -56,7 +57,11 @@ protected String subscriptionName() {
 @Override
 protected String generatePartitionName() {
 String topic = topicPrefix + index;
-operator.createTopic(topic, 1);
+try {
+    operator.createTopic(topic, 1);
+} catch (Exception e) {

Review Comment:
I'll leave this unfixed in this PR until it gets fixed in [FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).

## flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java: ##
@@ -55,7 +56,11 @@ public Sink createSink(TestingSinkSettings sinkSettings) {
 // Create the topic if it needs.
 if (creatTopic()) {
 for (String topic : topics) {
-operator.createTopic(topic, 4);
+try {
+    operator.createTopic(topic, 4);
+} catch (Exception e) {

Review Comment:
I'll leave this unfixed in this PR until it gets fixed in [FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103449916

## flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java: ##
@@ -80,7 +72,7 @@ import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 /** A pulsar cluster operator used for operating pulsar instance. */
-public class PulsarRuntimeOperator implements Closeable {
+public class PulsarRuntimeOperator {

Review Comment:
Yep, this is a mistake on my side.