This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new bc9aa73 [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not new 702df1b Merge pull request #8257: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not bc9aa73 is described below commit bc9aa730009909d9c632fce669bff5ce25d9d81a Author: Jean-Baptiste Onofré <jbono...@apache.org> AuthorDate: Tue Apr 9 17:15:21 2019 +0200 [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not --- .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 45 ++++++++++++++-------- .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 14 +++++++ 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index e6f2699..8c824a8 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -242,6 +242,8 @@ public class JdbcIO { @Nullable abstract DataSource getDataSource(); + abstract boolean isPoolingDataSource(); + abstract Builder builder(); @AutoValue.Builder @@ -258,14 +260,22 @@ public class JdbcIO { abstract Builder setDataSource(DataSource dataSource); + abstract Builder setPoolingDataSource(boolean poolingDataSource); + abstract DataSourceConfiguration build(); } public static DataSourceConfiguration create(DataSource dataSource) { + return create(dataSource, true); + } + + public static DataSourceConfiguration create( + DataSource dataSource, boolean isPoolingDataSource) { checkArgument(dataSource != null, "dataSource can not be null"); checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable"); return new AutoValue_JdbcIO_DataSourceConfiguration.Builder() .setDataSource(dataSource) + .setPoolingDataSource(isPoolingDataSource) .build(); } @@ -284,6 +294,7 @@ public class JdbcIO { return new AutoValue_JdbcIO_DataSourceConfiguration.Builder() .setDriverClassName(driverClassName) .setUrl(url) + .setPoolingDataSource(true) .build(); } @@ -356,21 +367,25 @@ public class JdbcIO { current = basicDataSource; } - // wrapping the datasource as a pooling datasource - DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current); - PoolableConnectionFactory poolableConnectionFactory = - new PoolableConnectionFactory(connectionFactory, null); - GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); - poolConfig.setMaxTotal(1); - poolConfig.setMinIdle(0); - poolConfig.setMinEvictableIdleTimeMillis(10000); - poolConfig.setSoftMinEvictableIdleTimeMillis(30000); - GenericObjectPool connectionPool = - new GenericObjectPool(poolableConnectionFactory, poolConfig); - poolableConnectionFactory.setPool(connectionPool); - poolableConnectionFactory.setDefaultAutoCommit(false); - poolableConnectionFactory.setDefaultReadOnly(false); - return new PoolingDataSource(connectionPool); + if (isPoolingDataSource()) { + // wrapping the datasource as a pooling datasource + DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current); + PoolableConnectionFactory poolableConnectionFactory = + new PoolableConnectionFactory(connectionFactory, null); + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxTotal(1); + poolConfig.setMinIdle(0); + poolConfig.setMinEvictableIdleTimeMillis(10000); + poolConfig.setSoftMinEvictableIdleTimeMillis(30000); + GenericObjectPool connectionPool = + new GenericObjectPool(poolableConnectionFactory, poolConfig); + poolableConnectionFactory.setPool(connectionPool); + poolableConnectionFactory.setDefaultAutoCommit(false); + poolableConnectionFactory.setDefaultReadOnly(false); + return new PoolingDataSource(connectionPool); + } else { + return current; + } } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 0e9127a..3e45363 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.dbcp2.PoolingDataSource; import org.apache.derby.drda.NetworkServerControl; import org.apache.derby.jdbc.ClientDataSource; import org.junit.After; @@ -143,6 +144,19 @@ public class JdbcIOTest implements Serializable { } @Test + public void testDataSourceConfigurationDataSourceWithoutPool() throws Exception { + JdbcIO.DataSourceConfiguration config = + JdbcIO.DataSourceConfiguration.create(dataSource, false); + assertTrue(config.buildDatasource() instanceof ClientDataSource); + } + + @Test + public void testDataSourceConfigurationDataSourceWithPool() throws Exception { + JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource, true); + assertTrue(config.buildDatasource() instanceof PoolingDataSource); + } + + @Test public void testDataSourceConfigurationDriverAndUrl() throws Exception { JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(