This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bc9aa73  [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
     new 702df1b  Merge pull request #8257: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
bc9aa73 is described below

commit bc9aa730009909d9c632fce669bff5ce25d9d81a
Author: Jean-Baptiste Onofré <jbono...@apache.org>
AuthorDate: Tue Apr 9 17:15:21 2019 +0200

    [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 45 ++++++++++++++--------
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java    | 14 +++++++
 2 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index e6f2699..8c824a8 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -242,6 +242,8 @@ public class JdbcIO {
     @Nullable
     abstract DataSource getDataSource();
 
+    abstract boolean isPoolingDataSource();
+
     abstract Builder builder();
 
     @AutoValue.Builder
@@ -258,14 +260,22 @@ public class JdbcIO {
 
       abstract Builder setDataSource(DataSource dataSource);
 
+      abstract Builder setPoolingDataSource(boolean poolingDataSource);
+
       abstract DataSourceConfiguration build();
     }
 
     public static DataSourceConfiguration create(DataSource dataSource) {
+      return create(dataSource, true);
+    }
+
+    public static DataSourceConfiguration create(
+        DataSource dataSource, boolean isPoolingDataSource) {
       checkArgument(dataSource != null, "dataSource can not be null");
       checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
           .setDataSource(dataSource)
+          .setPoolingDataSource(isPoolingDataSource)
           .build();
     }
 
@@ -284,6 +294,7 @@ public class JdbcIO {
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
           .setDriverClassName(driverClassName)
           .setUrl(url)
+          .setPoolingDataSource(true)
           .build();
     }
 
@@ -356,21 +367,25 @@ public class JdbcIO {
         current = basicDataSource;
       }
 
-      // wrapping the datasource as a pooling datasource
-      DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
-      PoolableConnectionFactory poolableConnectionFactory =
-          new PoolableConnectionFactory(connectionFactory, null);
-      GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
-      poolConfig.setMaxTotal(1);
-      poolConfig.setMinIdle(0);
-      poolConfig.setMinEvictableIdleTimeMillis(10000);
-      poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
-      GenericObjectPool connectionPool =
-          new GenericObjectPool(poolableConnectionFactory, poolConfig);
-      poolableConnectionFactory.setPool(connectionPool);
-      poolableConnectionFactory.setDefaultAutoCommit(false);
-      poolableConnectionFactory.setDefaultReadOnly(false);
-      return new PoolingDataSource(connectionPool);
+      if (isPoolingDataSource()) {
+        // wrapping the datasource as a pooling datasource
+        DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
+        PoolableConnectionFactory poolableConnectionFactory =
+            new PoolableConnectionFactory(connectionFactory, null);
+        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+        poolConfig.setMaxTotal(1);
+        poolConfig.setMinIdle(0);
+        poolConfig.setMinEvictableIdleTimeMillis(10000);
+        poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
+        GenericObjectPool connectionPool =
+            new GenericObjectPool(poolableConnectionFactory, poolConfig);
+        poolableConnectionFactory.setPool(connectionPool);
+        poolableConnectionFactory.setDefaultAutoCommit(false);
+        poolableConnectionFactory.setDefaultReadOnly(false);
+        return new PoolingDataSource(connectionPool);
+      } else {
+        return current;
+      }
     }
   }
 
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 0e9127a..3e45363 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.dbcp2.PoolingDataSource;
 import org.apache.derby.drda.NetworkServerControl;
 import org.apache.derby.jdbc.ClientDataSource;
 import org.junit.After;
@@ -143,6 +144,19 @@ public class JdbcIOTest implements Serializable {
   }
 
   @Test
+  public void testDataSourceConfigurationDataSourceWithoutPool() throws Exception {
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(dataSource, false);
+    assertTrue(config.buildDatasource() instanceof ClientDataSource);
+  }
+
+  @Test
+  public void testDataSourceConfigurationDataSourceWithPool() throws Exception {
+    JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource, true);
+    assertTrue(config.buildDatasource() instanceof PoolingDataSource);
+  }
+
+  @Test
   public void testDataSourceConfigurationDriverAndUrl() throws Exception {
     JdbcIO.DataSourceConfiguration config =
         JdbcIO.DataSourceConfiguration.create(

Reply via email to