Repository: beam
Updated Branches:
  refs/heads/master 946778c5b -> 588a4d00e
[BEAM-1922] Close datasource in JdbcIO when possible

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc846268
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc846268
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc846268

Branch: refs/heads/master
Commit: dc84626877a0e7183ed660df167a1d02d1589f90
Parents: 946778c
Author: mingmxu <[email protected]>
Authored: Mon Apr 10 11:19:02 2017 -0700
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Mon Apr 17 18:07:16 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 40 +++++++++++---------
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 10 ++---
 2 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/dc846268/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 05a30a4..b26a47d 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;

 import com.google.auto.value.AutoValue;
-
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -55,15 +54,13 @@ import org.apache.commons.dbcp2.BasicDataSource;
  * <p>JdbcIO source returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is the
  * type returned by the provided {@link RowMapper}.
  *
- * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using
- * {@link DataSourceConfiguration#create(DataSource)} or
- * {@link DataSourceConfiguration#create(String, String)} with either a
- * {@link DataSource} (which must be {@link Serializable}) or the parameters needed to create it
- * (driver class name and url). Optionally, {@link DataSourceConfiguration#withUsername(String)} and
- * {@link DataSourceConfiguration#withPassword(String)} allows you to define DataSource username
- * and password.
- * For example:
+ * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using<br>
+ * 1. {@link DataSourceConfiguration#create(DataSource)}(which must be {@link Serializable});<br>
+ * 2. or {@link DataSourceConfiguration#create(String, String)}(driver class name and url).
+ * Optionally, {@link DataSourceConfiguration#withUsername(String)} and
+ * {@link DataSourceConfiguration#withPassword(String)} allows you to define username and password.
  *
+ * <p>For example:
  * <pre>{@code
  * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
  *   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
@@ -245,11 +242,9 @@ public class JdbcIO {
       }
     }

-    Connection getConnection() throws Exception {
+    DataSource buildDatasource() throws Exception{
       if (getDataSource() != null) {
-        return (getUsername() != null)
-            ? getDataSource().getConnection(getUsername(), getPassword())
-            : getDataSource().getConnection();
+        return getDataSource();
       } else {
         BasicDataSource basicDataSource = new BasicDataSource();
         basicDataSource.setDriverClassName(getDriverClassName());
@@ -259,9 +254,10 @@ public class JdbcIO {
         if (getConnectionProperties() != null) {
           basicDataSource.setConnectionProperties(getConnectionProperties());
         }
-        return basicDataSource.getConnection();
+        return basicDataSource;
       }
     }
+
   }

   /**
@@ -368,6 +364,7 @@ public class JdbcIO {
   /** A {@link DoFn} executing the SQL query to read from the database. */
   static class ReadFn<T> extends DoFn<String, T> {
     private JdbcIO.Read<T> spec;
+    private DataSource dataSource;
     private Connection connection;

     private ReadFn(Read<T> spec) {
@@ -376,7 +373,8 @@ public class JdbcIO {

     @Setup
     public void setup() throws Exception {
-      connection = spec.getDataSourceConfiguration().getConnection();
+      dataSource = spec.getDataSourceConfiguration().buildDatasource();
+      connection = dataSource.getConnection();
     }

     @ProcessElement
@@ -396,8 +394,9 @@ public class JdbcIO {

     @Teardown
     public void teardown() throws Exception {
-      if (connection != null) {
-        connection.close();
+      connection.close();
+      if (dataSource instanceof AutoCloseable) {
+        ((AutoCloseable) dataSource).close();
       }
     }
   }
@@ -462,6 +461,7 @@ public class JdbcIO {

       private final Write<T> spec;

+      private DataSource dataSource;
       private Connection connection;
       private PreparedStatement preparedStatement;
       private int batchCount;
@@ -472,7 +472,8 @@ public class JdbcIO {

       @Setup
       public void setup() throws Exception {
-        connection = spec.getDataSourceConfiguration().getConnection();
+        dataSource = spec.getDataSourceConfiguration().buildDatasource();
+        connection = dataSource.getConnection();
         connection.setAutoCommit(false);
         preparedStatement = connection.prepareStatement(spec.getStatement());
       }
@@ -516,6 +517,9 @@ public class JdbcIO {
         if (connection != null) {
           connection.close();
         }
+        if (dataSource instanceof AutoCloseable) {
+          ((AutoCloseable) dataSource).close();
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/dc846268/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 4e82338..984ce1a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -126,7 +126,7 @@ public class JdbcIOTest implements Serializable {
   @Test
   public void testDataSourceConfigurationDataSource() throws Exception {
     JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource);
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -136,7 +136,7 @@ public class JdbcIOTest implements Serializable {
     JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
         "org.apache.derby.jdbc.ClientDriver",
         "jdbc:derby://localhost:" + port + "/target/beam");
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -148,7 +148,7 @@ public class JdbcIOTest implements Serializable {
         "jdbc:derby://localhost:" + port + "/target/beam")
         .withUsername("sa")
         .withPassword("sa");
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -160,7 +160,7 @@ public class JdbcIOTest implements Serializable {
         "jdbc:derby://localhost:" + port + "/target/beam")
         .withUsername("sa")
         .withPassword(null);
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -172,7 +172,7 @@ public class JdbcIOTest implements Serializable {
         "jdbc:derby://localhost:" + port + "/target/beam")
         .withUsername(null)
         .withPassword(null);
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
