This is an automated email from the ASF dual-hosted git repository. johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 67e2008c0ee Add retry to test connections (#23757) 67e2008c0ee is described below commit 67e2008c0ee6878e6a95a361e41d02734fba4ad1 Author: johnjcasey <95318300+johnjca...@users.noreply.github.com> AuthorDate: Tue Nov 22 13:38:18 2022 -0500 Add retry to test connections (#23757) * Move connection setup logic for JDBCIO WriteFn to @startBundle to limit parallel calls to datasource.getConnection() * move connection setup logic back to processElement. Put a retry into the DatabaseTestHelper * run spotless * use fluent backoff instead of manual implementation * refactor to manual resource management * run spotless --- .../beam/sdk/io/common/DatabaseTestHelper.java | 42 ++++++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java index 3cfe08ba4d1..1204fa7ada6 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java @@ -34,8 +34,13 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import javax.sql.DataSource; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.joda.time.Duration; import org.postgresql.ds.PGSimpleDataSource; import org.testcontainers.containers.JdbcDatabaseContainer; @@ -86,9 +91,40 @@ public class DatabaseTestHelper { fieldsAndTypes.stream() .map(kv -> kv.getKey() + " " + kv.getValue()) .collect(Collectors.joining(", ")); - try (Connection connection = dataSource.getConnection()) { - try (Statement statement = connection.createStatement()) { - statement.execute(String.format("create table %s (%s)", tableName, fieldsList)); + SQLException exception = null; + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.standardSeconds(1)) + .withMaxCumulativeBackoff(Duration.standardMinutes(5)) + .withMaxRetries(4) + .backoff(); + while (true) { + // This is not implemented as try-with-resources because it appears that try-with-resources is + // not correctly catching the PSQLException thrown by dataSource.getConnection() + Connection connection = null; + try { + connection = dataSource.getConnection(); + try (Statement statement = connection.createStatement()) { + statement.execute(String.format("create table %s (%s)", tableName, fieldsList)); + return; + } + } catch (SQLException e) { + exception = e; + } finally { + if (connection != null) { + connection.close(); + } + } + boolean hasNext; + try { + hasNext = BackOffUtils.next(sleeper, backoff); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (!hasNext) { + // we tried the max number of times + throw exception; } } }