aws-nageshvh commented on code in PR #208:
URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2112973263


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -334,4 +368,87 @@ public void onComplete() {
             activateSubscription();
         }
     }
+
+    /**
+     * Submits an event processing task to the executor service.
+     * This method encapsulates the task submission logic and error handling.
+     *
+     * @param event The subscription event to process
+     */
+    private void submitEventProcessingTask(SubscribeToShardEvent event) {
+        try {
+            subscriptionEventProcessingExecutor.execute(() -> {
+                synchronized (subscriptionEventProcessingLock) {
+                    try {
+                        processSubscriptionEvent(event);
+                    } catch (Exception e) {
+                        // For critical path operations, propagate exceptions to cause a Flink job restart
+                        LOG.error("Error processing subscription event", e);
+                        // Propagate the exception to the subscription exception handler
+                        terminateSubscription(new KinesisStreamsSourceException(
+                            "Error processing subscription event", e));
+                    }
+                }
+            });
+        } catch (Exception e) {

Review Comment:
   I have updated the code and pushed some further improvements. Please take another look.
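
   For readers without the full file, here is a minimal sketch of the pattern the hunk above follows: run the event under a lock on an executor, route any processing failure to a single failure handler, and treat a rejected submission the same way. It is not the code pushed to the PR. The class `EventSubmitSketch`, its `terminate` and `getFailure` methods, the `String` event type, and the use of `RuntimeException` in place of `KinesisStreamsSourceException` are all assumptions made for illustration, and catching `RejectedExecutionException` in the outer block is one plausible choice rather than what the truncated `catch (Exception e)` actually does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for the subscription class; not the connector's code. */
public class EventSubmitSketch {
    private final ExecutorService executor;
    private final Consumer<String> processor;
    // Serializes event processing, mirroring subscriptionEventProcessingLock.
    private final Object lock = new Object();
    // Holds the first failure; stands in for terminateSubscription(...).
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    public EventSubmitSketch(ExecutorService executor, Consumer<String> processor) {
        this.executor = executor;
        this.processor = processor;
    }

    /** Submits one event; failures are recorded rather than thrown to the caller. */
    public void submit(String event) {
        try {
            executor.execute(() -> {
                synchronized (lock) {
                    try {
                        processor.accept(event);
                    } catch (Exception e) {
                        // Failure inside the task: hand it to the failure handler.
                        terminate(new RuntimeException(
                                "Error processing subscription event", e));
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // The executor refused the task, e.g. because it was shut down.
            terminate(new RuntimeException(
                    "Executor rejected subscription event", e));
        }
    }

    private void terminate(Throwable t) {
        // Keep only the first failure so later errors do not mask the root cause.
        failure.compareAndSet(null, t);
    }

    public Throwable getFailure() {
        return failure.get();
    }
}
```

   Recording only the first failure is a design choice of this sketch: once a subscription is being torn down, follow-on errors are usually noise.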



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
