This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a619a580802 fix rrio teardown executor cleanup path (#38417)
a619a580802 is described below
commit a619a5808022c69417d0178b1ed777d523160ad8
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Fri May 8 19:51:18 2026 +0200
fix rrio teardown executor cleanup path (#38417)
---
.../org/apache/beam/io/requestresponse/Call.java | 55 +++++++++++-----------
1 file changed, 28 insertions(+), 27 deletions(-)
diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
index b515957459b..b318cac1737 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
@@ -270,34 +270,35 @@ class Call<RequestT, ResponseT> extends PTransform<PCollection<RequestT>, Result
Sleeper sleeper = configuration.getSleeperSupplier().get();
backoffIfNeeded(backOff, sleeper);
-
- if (!configuration.getShouldRepeat()) {
- incIfPresent(teardownCounter);
- setupTeardown.teardown();
- return;
- }
-
- Repeater<Void, Void> repeater =
- Repeater.<Void, Void>builder()
- .setBackOff(backOff)
- .setSleeper(sleeper)
- .setThrowableFunction(
- ignored -> {
- incIfPresent(teardownCounter);
- setupTeardown.teardown();
- return null;
- })
- .build()
- .withBackoffCounter(backoffCounter)
- .withSleeperCounter(sleeperCounter);
-
- repeater.apply(null);
-
- checkStateNotNull(executor).shutdown();
try {
- boolean ignored = executor.awaitTermination(3L, TimeUnit.SECONDS);
- } catch (InterruptedException ignored) {
- // Ignore the interrupt during teardown.
+ if (!configuration.getShouldRepeat()) {
+ incIfPresent(teardownCounter);
+ setupTeardown.teardown();
+ return;
+ }
+
+ Repeater<Void, Void> repeater =
+ Repeater.<Void, Void>builder()
+ .setBackOff(backOff)
+ .setSleeper(sleeper)
+ .setThrowableFunction(
+ ignored -> {
+ incIfPresent(teardownCounter);
+ setupTeardown.teardown();
+ return null;
+ })
+ .build()
+ .withBackoffCounter(backoffCounter)
+ .withSleeperCounter(sleeperCounter);
+
+ repeater.apply(null);
+ } finally {
+ checkStateNotNull(executor).shutdown();
+ try {
+ boolean ignored = executor.awaitTermination(3L, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ // Ignore the interrupt during teardown.
+ }
}
}