KYLIN-2803 Pushdown non-select query
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dbecb9b8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dbecb9b8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dbecb9b8 Branch: refs/heads/2622-2764 Commit: dbecb9b84bc90049acb3ef314128ac71f3516731 Parents: 480592d Author: shi shaofeng <shishaofeng@shis-MacBook-Pro.local> Authored: Fri Aug 25 16:18:21 2017 +0800 Committer: SHAOFENG SHI <shaofeng...@gmail.com> Committed: Mon Aug 28 16:02:43 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 ++ .../adhocquery/HivePushDownConverter.java | 4 ++ .../source/adhocquery/IPushDownRunner.java | 13 ++++++ .../query/adhoc/PushDownRunnerJdbcImpl.java | 36 ++++++++++++----- .../apache/kylin/query/util/PushDownUtil.java | 32 ++++++++++----- .../org/apache/kylin/query/util/QueryUtil.java | 28 +++++++++++++ .../apache/kylin/query/util/QueryUtilTest.java | 39 ++++++++++++++++++ .../apache/kylin/rest/service/QueryService.java | 42 +++++++++++++++++--- 8 files changed, 172 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index bca00e7..a113327 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1088,6 +1088,10 @@ abstract public class KylinConfigBase implements Serializable { new String[] { "org.apache.kylin.source.adhocquery.HivePushDownConverter" }); } + public boolean isPushdownQueryCacheEnabled() { + 
return Boolean.parseBoolean(this.getOptional("kylin.query.pushdown.cache-enabled", "false")); + } + public String getJdbcUrl() { return getOptional("kylin.query.pushdown.jdbc.url", ""); } http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java index bcd8608..eef4594 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java @@ -262,6 +262,10 @@ public class HivePushDownConverter implements IPushDownConverter { // Step7.Add quote for interval in timestampadd convertedSql = timestampaddReplace(convertedSql); + // Step8.Replace the INTEGER type name with INT, in any letter case but only as a whole word, + // so identifiers such as INTEGER_COL or MY_INTEGER are left untouched + convertedSql = convertedSql.replaceAll("(?i)\\bINTEGER\\b", "INT"); + return convertedSql; } http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java index c8d18aa..0336bfb 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java @@ -36,4 +36,17 @@ public interface IPushDownRunner { * @throws Exception if running pushdown query fails */ void executeQuery(String query, List<List<String>> returnRows,
List<SelectedColumnMeta> returnColumnMeta) throws Exception; + + + /** + * Run a non-query (non-SELECT) SQL statement through the pushdown engine + * + * @param sql the sql statement + * + * @return whether the SQL is executed successfully + * + * @throws Exception if running pushdown fails + */ + boolean executeUpdate(String sql) throws Exception; + } http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java index 751e6b0..713629a 100644 --- a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java +++ b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java @@ -29,6 +29,7 @@ import java.util.List; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.source.adhocquery.IPushDownRunner; @@ -62,18 +63,15 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner { Connection connection = this.getConnection(); ResultSet resultSet = null; + //extract column metadata + ResultSetMetaData metaData = null; + int columnCount = 0; + try { statement = connection.createStatement(); resultSet = statement.executeQuery(query); extractResults(resultSet, results); - } catch (SQLException sqlException) { - throw sqlException; - } - //extract column metadata - ResultSetMetaData metaData = null; - int columnCount = 0; - try { metaData = resultSet.getMetaData(); columnCount = metaData.getColumnCount(); @@ -85,12 +83,32 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner { metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i), metaData.getColumnTypeName(i),
metaData.isReadOnly(i), false, false)); } - } catch (SQLException sqlException) { throw sqlException; + } finally { + DBUtils.closeQuietly(resultSet); + DBUtils.closeQuietly(statement); + closeConnection(connection); } + } + + @Override + public boolean executeUpdate(String sql) throws Exception { + Statement statement = null; + Connection connection = this.getConnection(); - closeConnection(connection); + boolean success; + try { + statement = connection.createStatement(); + statement.execute(sql); + success = true; + } catch (SQLException sqlException) { + throw sqlException; + } finally { + DBUtils.closeQuietly(statement); + closeConnection(connection); + } + return success; } private Connection getConnection() { http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java index ff88738..82321a4 100644 --- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java +++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java @@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.util.SqlVisitor; +import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.commons.lang.text.StrBuilder; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kylin.common.KylinConfig; @@ -61,10 +62,14 @@ public class PushDownUtil { if (!kylinConfig.isPushDownEnabled()) { return false; } - - Throwable rootCause = ExceptionUtils.getRootCause(sqlException); - boolean isExpectedCause = rootCause != null && (rootCause.getClass().equals(NoRealizationFoundException.class)); - + boolean isSelect =
QueryUtil.isSelectStatement(sql); + boolean isExpectedCause = true; + + if (sqlException != null) { + Throwable rootCause = ExceptionUtils.getRootCause(sqlException); + isExpectedCause = rootCause != null && ((rootCause.getClass().equals(NoRealizationFoundException.class)) || (rootCause.getClass().equals(SqlValidatorException.class))); + } + if (isExpectedCause) { logger.info("Query failed to utilize pre-calculation, routing to other engines", sqlException); @@ -72,15 +77,16 @@ public class PushDownUtil { runner.init(kylinConfig); logger.debug("Query Pushdown runner {}", runner); - // String expandCC = restoreComputedColumnToExpr(sql, project); - // if (!StringUtils.equals(expandCC, sql)) { - // logger.info("computed column in sql is expanded to: " + expandCC); - // } - // default schema in calcite does not apply to other engines. // since this is a universql requirement, it's not implemented as a converter if (defaultSchema != null && !defaultSchema.equals("DEFAULT")) { - String completed = schemaCompletion(sql, defaultSchema); + String completed = sql; + try { + completed = schemaCompletion(sql, defaultSchema); + } catch (SqlParseException e) { + // the pushdown sql cannot be parsed; skip schema completion and push it down unchanged + logger.debug("fail to do schema completion on the pushdown sql, ignore it. {}", e.getMessage()); + } if (!sql.equals(completed)) { logger.info("the query is converted to {} after schema completion", completed); sql = completed; @@ -96,7 +102,11 @@ public class PushDownUtil { } } - runner.executeQuery(sql, results, columnMetas); + if (isSelect) { + runner.executeQuery(sql, results, columnMetas); + } else { + runner.executeUpdate(sql); + } return true; } else { return false; http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java index 3796d44..2dff07c 100644 --- a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java +++ b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java @@ -197,4 +197,32 @@ public class QueryUtil { } } + public static boolean isSelectStatement(String sql) { + String sql1 = removeCommentInSql(sql); + return sql1.startsWith("select") || sql1.startsWith("with") && sql1.contains("select"); + } + // Scanned left to right: a quoted literal or identifier (kept as-is), a "-- comment" up to the + // end of the line, or a block comment (an unterminated one runs to the end of the text). + private static final Pattern SQL_QUOTED_OR_COMMENT = Pattern.compile( + "'(?:[^']|'')*'|\"(?:[^\"]|\"\")*\"|--[^\r\n]*|/\\*.*?(?:\\*/|\\z)", Pattern.DOTALL); + /** + * Lower-cases the sql and strips its "--" line comments and block comments. + * @param sql the sql text + * @return the lower-cased, trimmed sql; comment markers inside quotes are kept as text + */ + public static String removeCommentInSql(String sql) { + String sql1 = sql.toLowerCase(java.util.Locale.ROOT); + Matcher matcher = SQL_QUOTED_OR_COMMENT.matcher(sql1); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String token = matcher.group(); + boolean quoted = token.startsWith("'") || token.startsWith("\""); + // keep quoted text verbatim; replace a comment with a space so the tokens around it stay apart + matcher.appendReplacement(sb, quoted ? Matcher.quoteReplacement(token) : " "); + } + matcher.appendTail(sb); + return sb.toString().trim(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java index 942ef0b..46f5df4 100644 --- a/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java +++ b/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java @@ -60,4 +60,43 @@ public class QueryUtilTest extends LocalFileMetadataTestCase { Assert.assertEquals("select * from
\"DEFAULT\".TEST_KYLIN_FACT", s); } } + + @Test + public void testRemoveCommentInSql() { + String originSql = "select count(*) from test_kylin_fact where price > 10.0"; + + { + String sqlWithComment = "-- comment \n" + originSql; + Assert.assertEquals(originSql, QueryUtil.removeCommentInSql(sqlWithComment)); + } + + { + String sqlWithComment = "-- comment \n -- comment\n" + originSql; + Assert.assertEquals(originSql, QueryUtil.removeCommentInSql(sqlWithComment)); + } + + { + String sqlWithComment = "-- \n -- comment \n" + originSql; + Assert.assertEquals(originSql, QueryUtil.removeCommentInSql(sqlWithComment)); + } + + { + String sqlWithComment = originSql + "-- \n -- comment \n"; + Assert.assertEquals(originSql, QueryUtil.removeCommentInSql(sqlWithComment)); + } + + { + String sqlWithComment = "-- \n -- comment \n" + originSql + "-- \n -- comment \n"; + Assert.assertEquals(originSql, QueryUtil.removeCommentInSql(sqlWithComment)); + } + + { + String sqlWithComment = "/* comment */ " + originSql + "-- \n -- comment \n"; + Assert.assertEquals(originSql, QueryUtil.removeCommentInSql(sqlWithComment)); + } + + // comment markers inside quotes, comments containing '*' or '/', and unterminated comments + Assert.assertEquals("select '--x' from t", QueryUtil.removeCommentInSql("select '--x' from t -- c")); + Assert.assertEquals("select 1", QueryUtil.removeCommentInSql("/* a*b/c */select 1")); + Assert.assertEquals("select 1", QueryUtil.removeCommentInSql("select 1 /* oops")); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 533b93d..c227d71 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -182,6 +182,29 @@ public class QueryService extends BasicService { } } + + public SQLResponse update(SQLRequest sqlRequest) throws Exception { + // runs a non-SELECT statement through query pushdown; callers must check pushdown is enabled + logger.debug("Query pushdown enabled, redirect the query to alternative engine. 
"); Connection conn = null; List<List<String>> results = Lists.newArrayList(); boolean isPushDown; try { conn = QueryConnection.getConnection(sqlRequest.getProject()); isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), sqlRequest.getSql(), conn.getSchema(), null, null, null); } catch (Exception e) { logger.error("failed to do pushdown, error is " + e.getMessage(), e); throw new InternalErrorException(e); } finally { close(null, null, conn); } List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, false, Integer.MAX_VALUE, "c0", "c0", null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, false, false)); SQLResponse sqlResponse = getSqlResponse(isPushDown, results, columnMetas); return sqlResponse; } public void saveQuery(final String creator, final Query query) throws IOException { List<Query> queries = getQueries(creator); queries.add(query); @@ -367,11 +390,7 @@ public class QueryService extends BasicService { logger.info("Using project: " + project); logger.info("The original query: " + sql); - if (!sql.toLowerCase().contains("select") - && KylinConfig.getInstanceFromEnv().isPushDownEnabled() == false) { - logger.debug("Directly return exception as not supported"); - throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); - } + final boolean isSelect = QueryUtil.isSelectStatement(sql); long startTime = System.currentTimeMillis(); @@ -386,7 +405,15 @@ public class QueryService extends BasicService { try { if (null == sqlResponse) { - sqlResponse = query(sqlRequest); + if (isSelect) { + sqlResponse = query(sqlRequest); + } else if (kylinConfig.isPushDownEnabled()) { + sqlResponse = update(sqlRequest); + } else { + logger.debug( + "Directly return exception as the sql is unsupported, and query pushdown is disabled"); + throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); + } long durationThreshold =
kylinConfig.getQueryDurationCacheThreshold(); long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); @@ -397,6 +424,9 @@ public class QueryService extends BasicService { String.valueOf(sqlResponse.getTotalScanCount())); if (checkCondition(queryCacheEnabled, "query cache is disabled") // && checkCondition(!sqlResponse.getIsException(), "query has exception") // + && checkCondition(!(sqlResponse.isPushDown() + && (!isSelect || !kylinConfig.isPushdownQueryCacheEnabled())), + "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // && checkCondition( sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scanCountThreshold