This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8b73a96 [BEAM-7230] Make PoolableDataSourceProvider a static singleton And simplifies all the inner method construction that was error-prone. new a37ba1a Merge pull request #8635 from iemejia/BEAM-7230-jdbc-fix-pool-instantiation 8b73a96 is described below commit 8b73a962190907c561ab9c9895fb0911632daac5 Author: Ismaël Mejía <ieme...@gmail.com> AuthorDate: Tue May 21 15:17:47 2019 +0200 [BEAM-7230] Make PoolableDataSourceProvider a static singleton And simplifies all the inner method construction that was error-prone. --- .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 122 ++++++++------------- 1 file changed, 47 insertions(+), 75 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 8bd4f7e..6d6b5e0 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -382,46 +382,6 @@ public class JdbcIO { } } - /** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. 
*/ - public static class PoolableDataSourceProvider extends BaseDataSourceProvider { - private static SerializableFunction<Void, DataSource> instance = null; - - private PoolableDataSourceProvider( - SerializableFunction<Void, DataSource> dataSourceProviderFn) { - super(dataSourceProviderFn); - } - - public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) { - if (instance == null) { - instance = - MemoizedDataSourceProvider.of( - new PoolableDataSourceProvider( - DataSourceProviderFromDataSourceConfiguration.of(config))); - } - return instance; - } - - @Override - public DataSource apply(Void input) { - DataSource current = super.dataSourceProviderFn.apply(input); - // wrapping the datasource as a pooling datasource - DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current); - PoolableConnectionFactory poolableConnectionFactory = - new PoolableConnectionFactory(connectionFactory, null); - GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); - poolConfig.setMaxTotal(1); - poolConfig.setMinIdle(0); - poolConfig.setMinEvictableIdleTimeMillis(10000); - poolConfig.setSoftMinEvictableIdleTimeMillis(30000); - GenericObjectPool connectionPool = - new GenericObjectPool(poolableConnectionFactory, poolConfig); - poolableConnectionFactory.setPool(connectionPool); - poolableConnectionFactory.setDefaultAutoCommit(false); - poolableConnectionFactory.setDefaultReadOnly(false); - return new PoolingDataSource(connectionPool); - } - } - /** * An interface used by the JdbcIO Write to set the parameters of the {@link PreparedStatement} * used to setParameters into the database. @@ -1112,39 +1072,50 @@ public class JdbcIO { } } - private static class DataSourceProviderFromDataSourceConfiguration + /** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. 
*/ + public static class PoolableDataSourceProvider implements SerializableFunction<Void, DataSource>, HasDisplayData { - private final DataSourceConfiguration config; - private static DataSourceProviderFromDataSourceConfiguration instance; + private static PoolableDataSourceProvider instance; + private static transient DataSource source; + private static SerializableFunction<Void, DataSource> dataSourceProviderFn; - private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) { - this.config = config; + private PoolableDataSourceProvider(DataSourceConfiguration config) { + dataSourceProviderFn = DataSourceProviderFromDataSourceConfiguration.of(config); } - public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) { + public static synchronized SerializableFunction<Void, DataSource> of( + DataSourceConfiguration config) { if (instance == null) { - instance = new DataSourceProviderFromDataSourceConfiguration(config); + instance = new PoolableDataSourceProvider(config); } return instance; } @Override public DataSource apply(Void input) { - return config.buildDatasource(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - config.populateDisplayData(builder); - } - } - - private abstract static class BaseDataSourceProvider - implements SerializableFunction<Void, DataSource>, HasDisplayData { - private final SerializableFunction<Void, DataSource> dataSourceProviderFn; - - BaseDataSourceProvider(SerializableFunction<Void, DataSource> dataSourceProviderFn) { - this.dataSourceProviderFn = dataSourceProviderFn; + return buildDataSource(input); + } + + static synchronized DataSource buildDataSource(Void input) { + if (source == null) { + DataSource basicSource = dataSourceProviderFn.apply(input); + DataSourceConnectionFactory connectionFactory = + new DataSourceConnectionFactory(basicSource); + PoolableConnectionFactory poolableConnectionFactory = + new 
PoolableConnectionFactory(connectionFactory, null); + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxTotal(1); + poolConfig.setMinIdle(0); + poolConfig.setMinEvictableIdleTimeMillis(10000); + poolConfig.setSoftMinEvictableIdleTimeMillis(30000); + GenericObjectPool connectionPool = + new GenericObjectPool(poolableConnectionFactory, poolConfig); + poolableConnectionFactory.setPool(connectionPool); + poolableConnectionFactory.setDefaultAutoCommit(false); + poolableConnectionFactory.setDefaultReadOnly(false); + source = new PoolingDataSource(connectionPool); + } + return source; } @Override @@ -1155,29 +1126,30 @@ public class JdbcIO { } } - private static class MemoizedDataSourceProvider extends BaseDataSourceProvider { - private static MemoizedDataSourceProvider instance = null; - @Nullable private static DataSource datasource = null; + private static class DataSourceProviderFromDataSourceConfiguration + implements SerializableFunction<Void, DataSource>, HasDisplayData { + private final DataSourceConfiguration config; + private static DataSourceProviderFromDataSourceConfiguration instance; - private MemoizedDataSourceProvider( - SerializableFunction<Void, DataSource> dataSourceProviderFn) { - super(dataSourceProviderFn); + private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) { + this.config = config; } - public static MemoizedDataSourceProvider of( - SerializableFunction<Void, DataSource> dataSourceProviderFn) { + public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) { if (instance == null) { - instance = new MemoizedDataSourceProvider(dataSourceProviderFn); + instance = new DataSourceProviderFromDataSourceConfiguration(config); } return instance; } @Override public DataSource apply(Void input) { - if (datasource == null) { - datasource = super.dataSourceProviderFn.apply(null); - } - return datasource; + return config.buildDatasource(); + } + + @Override + 
public void populateDisplayData(DisplayData.Builder builder) { + config.populateDisplayData(builder); } } }