Use DBCP as the JDBC connection pool instead of Commons-Pool to avoid running pushdown queries on invalid connections
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aae24e35 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aae24e35 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aae24e35 Branch: refs/heads/master Commit: aae24e351540930f09c681e6532df048f37f195d Parents: 8e6c60f Author: nichunen <chunen...@kyligence.io> Authored: Mon Sep 25 18:37:19 2017 +0800 Committer: Shaofeng Shi <shaofeng...@gmail.com> Committed: Tue Sep 26 13:19:22 2017 +0800 ---------------------------------------------------------------------- .../query/adhoc/JdbcConnectionFactory.java | 95 -------------------- .../kylin/query/adhoc/JdbcConnectionPool.java | 73 --------------- .../adhoc/JdbcPushDownConnectionManager.java | 86 ++++++++++++++++++ .../query/adhoc/PushDownRunnerJdbcImpl.java | 63 ++++--------- 4 files changed, 103 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/aae24e35/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionFactory.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionFactory.java b/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionFactory.java deleted file mode 100644 index e83306c..0000000 --- a/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionFactory.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.query.adhoc; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -import org.apache.commons.pool.PoolableObjectFactory; - -@SuppressWarnings("unused") -class JdbcConnectionFactory implements PoolableObjectFactory { - - private final String jdbcUrl; - - private final String driverClass; - - private final String username; - - private final String password; - - public JdbcConnectionFactory(String jdbcUrl, String driverClass, String username, String password) { - this.jdbcUrl = jdbcUrl; - this.driverClass = driverClass; - this.username = username; - this.password = password; - - try { - Class.forName(driverClass); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - @Override - public Connection makeObject() throws Exception { - Connection connection = DriverManager.getConnection(jdbcUrl, username, password); - return connection; - } - - @Override - public void activateObject(Object o) throws Exception { - - } - - @Override - public void passivateObject(Object o) throws Exception { - - } - - @Override - public void destroyObject(Object pooledObject) throws Exception { - - if (pooledObject instanceof Connection) { - Connection connection = (Connection) pooledObject; - - if (connection != null) - connection.close(); - } - - } - - @Override - public boolean validateObject(Object pooledObject) { - if (pooledObject instanceof Connection) { - Connection connection = (Connection) pooledObject; - - if (connection != null) { - try { - return 
((!connection.isClosed()) && (connection.isValid(1))); - } catch (SQLException e) { - throw new RuntimeException(e.getMessage(), e); - } - } - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/aae24e35/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionPool.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionPool.java b/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionPool.java deleted file mode 100644 index 7c4df90..0000000 --- a/query/src/main/java/org/apache/kylin/query/adhoc/JdbcConnectionPool.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.query.adhoc; - -import java.io.Closeable; -import java.io.IOException; -import java.sql.Connection; - -import org.apache.commons.pool.impl.GenericObjectPool; - -public class JdbcConnectionPool implements Closeable { - - private GenericObjectPool internalPool = null; - - public void createPool(JdbcConnectionFactory factory, GenericObjectPool.Config poolConfig) throws IOException { - if (this.internalPool != null) - this.close(); - this.internalPool = new GenericObjectPool(factory, poolConfig); - } - - public Connection getConnection() { - - try { - return (Connection) internalPool.borrowObject(); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - public void returnConnection(Connection conn) { - if (conn != null) { - try { - internalPool.returnObject(conn); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - } - - public void invalidateConnection(Connection conn) { - if (conn != null) - try { - internalPool.invalidateObject(conn); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - @Override - public void close() throws IOException { - try { - this.internalPool.close(); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/aae24e35/query/src/main/java/org/apache/kylin/query/adhoc/JdbcPushDownConnectionManager.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/adhoc/JdbcPushDownConnectionManager.java b/query/src/main/java/org/apache/kylin/query/adhoc/JdbcPushDownConnectionManager.java new file mode 100644 index 0000000..f5cdbb6 --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/adhoc/JdbcPushDownConnectionManager.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.query.adhoc; + +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.kylin.common.KylinConfig; + +public class JdbcPushDownConnectionManager { + + private static JdbcPushDownConnectionManager manager = null; + + static JdbcPushDownConnectionManager getConnectionManager() throws ClassNotFoundException { + if (manager == null) { + synchronized (JdbcPushDownConnectionManager.class) { + if (manager == null) { + manager = new JdbcPushDownConnectionManager(KylinConfig.getInstanceFromEnv()); + } + } + } + return manager; + } + + private final BasicDataSource dataSource; + + private JdbcPushDownConnectionManager(KylinConfig config) throws ClassNotFoundException { + dataSource = new BasicDataSource(); + + Class.forName(config.getJdbcDriverClass()); + dataSource.setDriverClassName(config.getJdbcDriverClass()); + dataSource.setUrl(config.getJdbcUrl()); + dataSource.setUsername(config.getJdbcUsername()); + dataSource.setPassword(config.getJdbcPassword()); + dataSource.setMaxActive(config.getPoolMaxTotal()); + dataSource.setMaxIdle(config.getPoolMaxIdle()); + dataSource.setMinIdle(config.getPoolMinIdle()); + + // Default settings + dataSource.setTestOnBorrow(true); + 
dataSource.setValidationQuery("select 1"); + dataSource.setRemoveAbandoned(true); + dataSource.setRemoveAbandonedTimeout(300); + } + + public void close() { + try { + dataSource.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void close(Connection conn) { + try { + conn.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public synchronized Connection getConnection() { + try { + return dataSource.getConnection(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/aae24e35/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java index 1dc82f2..503e273 100644 --- a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java +++ b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java @@ -18,7 +18,6 @@ package org.apache.kylin.query.adhoc; -import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -27,7 +26,6 @@ import java.sql.Statement; import java.util.LinkedList; import java.util.List; -import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; @@ -35,24 +33,14 @@ import org.apache.kylin.source.adhocquery.IPushDownRunner; public class PushDownRunnerJdbcImpl implements IPushDownRunner { - private static org.apache.kylin.query.adhoc.JdbcConnectionPool pool = null; + private JdbcPushDownConnectionManager manager = null; @Override public void init(KylinConfig config) { - if (pool == null) { - pool = new JdbcConnectionPool(); - 
JdbcConnectionFactory factory = new JdbcConnectionFactory(config.getJdbcUrl(), config.getJdbcDriverClass(), - config.getJdbcUsername(), config.getJdbcPassword()); - GenericObjectPool.Config poolConfig = new GenericObjectPool.Config(); - poolConfig.maxActive = config.getPoolMaxTotal(); - poolConfig.maxIdle = config.getPoolMaxIdle(); - poolConfig.minIdle = config.getPoolMinIdle(); - - try { - pool.createPool(factory, poolConfig); - } catch (IOException e) { - throw new RuntimeException(e.getMessage(), e); - } + try { + manager = JdbcPushDownConnectionManager.getConnectionManager(); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); } } @@ -60,7 +48,7 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner { public void executeQuery(String query, List<List<String>> results, List<SelectedColumnMeta> columnMetas) throws Exception { Statement statement = null; - Connection connection = this.getConnection(); + Connection connection = manager.getConnection(); ResultSet resultSet = null; //extract column metadata @@ -81,55 +69,38 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner { metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i), metaData.getColumnTypeName(i), metaData.isReadOnly(i), false, false)); } - } catch (SQLException sqlException) { - throw sqlException; } finally { DBUtils.closeQuietly(resultSet); DBUtils.closeQuietly(statement); - closeConnection(connection); + manager.close(connection); } } @Override public void executeUpdate(String sql) throws Exception { Statement statement = null; - Connection connection = this.getConnection(); + Connection connection = manager.getConnection(); try { statement = connection.createStatement(); statement.execute(sql); - } catch (SQLException sqlException) { - throw sqlException; } finally { DBUtils.closeQuietly(statement); - closeConnection(connection); + manager.close(connection); } } - private Connection getConnection() { - return pool.getConnection(); - 
} - - private void closeConnection(Connection connection) { - pool.returnConnection(connection); - } - - static void extractResults(ResultSet resultSet, List<List<String>> results) throws SQLException { + private void extractResults(ResultSet resultSet, List<List<String>> results) throws SQLException { List<String> oneRow = new LinkedList<String>(); - try { - while (resultSet.next()) { - //logger.debug("resultSet value: " + resultSet.getString(1)); - for (int i = 0; i < resultSet.getMetaData().getColumnCount(); i++) { - oneRow.add((resultSet.getString(i + 1))); - } - - results.add(new LinkedList<String>(oneRow)); - oneRow.clear(); + while (resultSet.next()) { + //logger.debug("resultSet value: " + resultSet.getString(1)); + for (int i = 0; i < resultSet.getMetaData().getColumnCount(); i++) { + oneRow.add((resultSet.getString(i + 1))); } - } catch (SQLException sqlException) { - throw sqlException; + + results.add(new LinkedList<String>(oneRow)); + oneRow.clear(); } } - }