aws-nageshvh commented on code in PR #208:
URL:
https://github.com/apache/flink-connector-aws/pull/208#discussion_r2112973263
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -334,4 +368,87 @@ public void onComplete() {
activateSubscription();
}
}
+
+ /**
+ * Submits an event processing task to the executor service.
+ * This method encapsulates the task submission logic and error handling.
+ *
+ * @param event The subscription event to process
+ */
+ private void submitEventProcessingTask(SubscribeToShardEvent event) {
+ try {
+ subscriptionEventProcessingExecutor.execute(() -> {
+ synchronized (subscriptionEventProcessingLock) {
+ try {
+ processSubscriptionEvent(event);
+ } catch (Exception e) {
+ // For critical path operations, propagate exceptions to cause a Flink job restart
+ LOG.error("Error processing subscription event", e);
+ // Propagate the exception to the subscription exception handler
+ terminateSubscription(new KinesisStreamsSourceException(
+ "Error processing subscription event", e));
+ }
+ }
+ });
+ } catch (Exception e) {
Review Comment:
I have updated the code and pushed some more improvements. Please check.
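
For reference, the pattern in the quoted hunk (submit to an executor, serialize processing under a lock, route failures to a termination handler) can be sketched in isolation as below. This is only an illustration under stated assumptions, not the connector's code: `EventSubmissionSketch`, `submit`, `terminate`, and `failure` are hypothetical names, events are plain strings, and `RuntimeException` stands in for `KinesisStreamsSourceException`. The hunk is cut off at the outer `catch`, so the rejected-submission handling shown here is an assumption as well.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for the subscription class; not the connector's real code. */
class EventSubmissionSketch {
    private final ExecutorService executor;
    private final Consumer<String> processor;
    private final Object lock = new Object();
    // Holds the first failure only, mimicking a one-shot termination handler.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    EventSubmissionSketch(ExecutorService executor, Consumer<String> processor) {
        this.executor = executor;
        this.processor = processor;
    }

    /** Submits one event for processing; failures are recorded, never thrown to the caller. */
    void submit(String event) {
        try {
            executor.execute(() -> {
                // The lock keeps event processing strictly one at a time.
                synchronized (lock) {
                    try {
                        processor.accept(event);
                    } catch (Exception e) {
                        terminate(new RuntimeException("Error processing subscription event", e));
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // Assumption: a rejected submission (e.g. executor already shut down)
            // is treated like a processing failure.
            terminate(new RuntimeException("Error submitting subscription event task", e));
        }
    }

    private void terminate(Throwable t) {
        failure.compareAndSet(null, t);
    }

    Throwable failure() {
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        EventSubmissionSketch sketch = new EventSubmissionSketch(executor, event -> {
            throw new IllegalStateException("cannot process " + event);
        });
        sketch.submit("event-1");
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(sketch.failure().getMessage());
    }
}
```

Catching `RejectedExecutionException` rather than a bare `Exception` around `execute` keeps the submission-failure path distinct from the processing-failure path, which may make the two cases easier to tell apart in logs.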
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.