minor, refactor pushdown code

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/068baf6b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/068baf6b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/068baf6b

Branch: refs/heads/master
Commit: 068baf6b3ec13fe6e68e9a41593dcd139b4a8597
Parents: a4aeb77
Author: Li Yang <liy...@apache.org>
Authored: Wed Aug 30 21:18:19 2017 +0800
Committer: Roger Shi <rogershijich...@gmail.com>
Committed: Thu Aug 31 10:37:32 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/debug/BackdoorToggles.java     |  19 +++-
 .../source/adhocquery/IPushDownRunner.java      |   2 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  11 +-
 .../query/adhoc/PushDownRunnerJdbcImpl.java     |   5 +-
 .../apache/kylin/query/relnode/OLAPContext.java |  11 +-
 .../apache/kylin/query/relnode/OLAPJoinRel.java |   1 +
 .../relnode/OLAPToEnumerableConverter.java      |  61 ++---------
 .../routing/RoutingIndicatorException.java      |  35 +++++++
 .../apache/kylin/query/util/PushDownUtil.java   | 104 ++++++++++---------
 .../apache/kylin/rest/service/QueryService.java |  37 ++++---
 10 files changed, 145 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java 
b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 8cb48b6..88c7c16 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -40,6 +40,14 @@ public class BackdoorToggles {
         _backdoorToggles.set(toggles);
     }
 
+    public static void addToggle(String key, String value) {
+        Map<String, String> map = _backdoorToggles.get();
+        if (map == null) {
+            setToggles(Maps.<String, String> newHashMap());
+        }
+        _backdoorToggles.get().put(key, value);
+    }
+    
     public static void addToggles(Map<String, String> toggles) {
         Map<String, String> map = _backdoorToggles.get();
         if (map == null) {
@@ -47,6 +55,15 @@ public class BackdoorToggles {
         }
         _backdoorToggles.get().putAll(toggles);
     }
+    
+    // try avoid using this generic method
+    public static String getToggle(String key) {
+        Map<String, String> map = _backdoorToggles.get();
+        if (map == null)
+            return null;
+        
+        return map.get(key);
+    }
 
     public static String getCoprocessorBehavior() {
         return getString(DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR);
@@ -109,7 +126,7 @@ public class BackdoorToggles {
     public static boolean getPrepareOnly() {
         return getBoolean(DEBUG_TOGGLE_PREPARE_ONLY);
     }
-
+    
     private static String getString(String key) {
         Map<String, String> toggles = _backdoorToggles.get();
         if (toggles == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
 
b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
index 0336bfb..9983f5c 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
@@ -47,6 +47,6 @@ public interface IPushDownRunner {
      *
      * @throws Exception if running pushdown fails
      */
-    boolean executeUpdate(String sql) throws Exception;
+    void executeUpdate(String sql) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java 
b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 8f143c7..d3db995 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -70,7 +70,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 /**
@@ -261,14 +260,12 @@ public class KylinTestBase {
 
             return output(resultSet, needDisplay);
         } catch (SQLException sqlException) {
-            List<List<String>> results = Lists.newArrayList();
-            List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
-            boolean b = 
PushDownUtil.doPushDownQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, 
"DEFAULT", results,
-                    columnMetas, sqlException);
-            if (!b) {
+            Pair<List<List<String>>, List<SelectedColumnMeta>> result = 
PushDownUtil
+                    .tryPushDownQuery(ProjectInstance.DEFAULT_PROJECT_NAME, 
sql, "DEFAULT", sqlException);
+            if (result == null) {
                 throw sqlException;
             }
-            return results.size();
+            return result.getFirst().size();
         } finally {
             if (resultSet != null) {
                 try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java 
b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
index 713629a..8c701c1 100644
--- 
a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
+++ 
b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
@@ -93,22 +93,19 @@ public class PushDownRunnerJdbcImpl implements 
IPushDownRunner {
     }
 
     @Override
-    public boolean executeUpdate(String sql) throws Exception {
+    public void executeUpdate(String sql) throws Exception {
         Statement statement = null;
         Connection connection = this.getConnection();
 
-        boolean success;
         try {
             statement = connection.createStatement();
             statement.execute(sql);
-            success = true;
         } catch (SQLException sqlException) {
             throw sqlException;
         } finally {
             DBUtils.closeQuietly(statement);
             closeConnection(connection);
         }
-        return success;
     }
 
     private Connection getConnection() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java 
b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 3a42ddb..d1608e9 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -29,6 +29,7 @@ import java.util.Set;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -111,6 +112,7 @@ public class OLAPContext {
     public OLAPSchema olapSchema = null;
     public OLAPTableScan firstTableScan = null; // to be fact table scan 
except "select * from lookupTable"
     public Set<OLAPTableScan> allTableScans = new HashSet<>();
+    public Set<OLAPJoinRel> allOlapJoins = new HashSet<>();
     public Set<MeasureDesc> involvedMeasure = new HashSet<>();
     public TupleInfo returnTupleInfo = null;
     public boolean afterAggregate = false;
@@ -203,14 +205,7 @@ public class OLAPContext {
     // 
============================================================================
 
     public interface IAccessController {
-        /*
-        * @return {TupleFilter} if the filter condition exists
-        * @OLAPAuthentication the authentication info
-        * @columns required columns from logic query plan
-        * @realization the cube used in this query
-        * @OLAPInsufficientException no rights exception
-        */
-        public TupleFilter check(OLAPAuthentication olapAuthentication, 
Collection<TblColRef> columns, IRealization realization) throws 
IllegalArgumentException;
+        public void check(List<OLAPContext> contexts, KylinConfig config) 
throws IllegalStateException;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java 
b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
index a27cf76..3b5c3cf 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
@@ -124,6 +124,7 @@ public class OLAPJoinRel extends EnumerableJoin implements 
OLAPRel {
         }
 
         this.context = implementor.getContext();
+        this.context.allOlapJoins.add(this);
         this.isTopJoin = !this.context.hasJoin;
         this.context.hasJoin = true;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
 
b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
index 7ac86b2..c7b0fe2 100644
--- 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
+++ 
b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
@@ -19,36 +19,24 @@
 package org.apache.kylin.query.relnode;
 
 import java.util.List;
-import java.util.Set;
 
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
-import org.apache.calcite.adapter.enumerable.PhysType;
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
-import org.apache.calcite.linq4j.tree.Blocks;
-import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterImpl;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.routing.RealizationChooser;
-import org.apache.kylin.query.schema.OLAPTable;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  */
@@ -81,14 +69,11 @@ public class OLAPToEnumerableConverter extends 
ConverterImpl implements Enumerab
         OLAPRel.OLAPImplementor olapImplementor = new 
OLAPRel.OLAPImplementor();
         olapImplementor.visitChild(getInput(), this);
 
-        // identify model
+        // identify model & realization
         List<OLAPContext> contexts = listContextsHavingScan();
         RealizationChooser.selectRealization(contexts);
 
-        // identify realization for each context
-        for (OLAPContext context : contexts) {
-            doAccessControl(context);
-        }
+        doAccessControl(contexts);
 
         // rewrite query if necessary
         OLAPRel.RewriteImplementor rewriteImplementor = new 
OLAPRel.RewriteImplementor();
@@ -122,47 +107,13 @@ public class OLAPToEnumerableConverter extends 
ConverterImpl implements Enumerab
         return result;
     }
 
-    private void doAccessControl(OLAPContext context) {
-        String controllerCls = 
KylinConfig.getInstanceFromEnv().getQueryAccessController();
+    private void doAccessControl(List<OLAPContext> contexts) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        String controllerCls = config.getQueryAccessController();
         if (null != controllerCls && !controllerCls.isEmpty()) {
             OLAPContext.IAccessController accessController = 
(OLAPContext.IAccessController) ClassUtil.newInstance(controllerCls);
-            TupleFilter tupleFilter = 
accessController.check(context.olapAuthen, context.allColumns, 
context.realization);
-            if (null != tupleFilter) {
-                context.filterColumns.addAll(collectColumns(tupleFilter));
-                context.allColumns.addAll(collectColumns(tupleFilter));
-                context.filter = TupleFilter.and(context.filter, tupleFilter);
-            }
-        }
-    }
-
-    private Set<TblColRef> collectColumns(TupleFilter filter) {
-        Set<TblColRef> ret = Sets.newHashSet();
-        collectColumnsRecursively(filter, ret);
-        return ret;
-    }
-
-    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> 
collector) {
-        if (filter == null)
-            return;
-
-        if (filter instanceof ColumnTupleFilter) {
-            collector.add(((ColumnTupleFilter) filter).getColumn());
+            accessController.check(contexts, config);
         }
-        for (TupleFilter child : filter.getChildren()) {
-            collectColumnsRecursively(child, collector);
-        }
-    }
-
-    @SuppressWarnings("unused")
-    private Result buildHiveResult(EnumerableRelImplementor enumImplementor, 
Prefer pref, OLAPContext context) {
-        RelDataType hiveRowType = getRowType();
-
-        context.setReturnTupleInfo(hiveRowType, null);
-        PhysType physType = PhysTypeImpl.of(enumImplementor.getTypeFactory(), 
hiveRowType, pref.preferArray());
-
-        RelOptTable factTable = context.firstTableScan.getTable();
-        Result result = enumImplementor.result(physType, 
Blocks.toBlock(Expressions.call(factTable.getExpression(OLAPTable.class), 
"executeHiveQuery", enumImplementor.getRootExpression())));
-        return result;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java
 
b/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java
new file mode 100644
index 0000000..a655455
--- /dev/null
+++ 
b/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing;
+
+/**
+ * A special exception serves as a routing indicator.
+ */
+public class RoutingIndicatorException extends RuntimeException {
+
+    private static final long serialVersionUID = 7631508437415520091L;
+
+    public RoutingIndicatorException(String message, Throwable t) {
+        super(message, t);
+    }
+
+    public RoutingIndicatorException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java 
b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 82321a4..9262b20 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -47,70 +47,78 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.query.routing.NoRealizationFoundException;
+import org.apache.kylin.query.routing.RoutingIndicatorException;
 import org.apache.kylin.source.adhocquery.IPushDownConverter;
 import org.apache.kylin.source.adhocquery.IPushDownRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 public class PushDownUtil {
     private static final Logger logger = 
LoggerFactory.getLogger(PushDownUtil.class);
 
-    public static boolean doPushDownQuery(String project, String sql, String 
defaultSchema, List<List<String>> results,
-            List<SelectedColumnMeta> columnMetas, SQLException sqlException) 
throws Exception {
+    public static Pair<List<List<String>>, List<SelectedColumnMeta>> 
tryPushDownQuery(String project, String sql,
+            String defaultSchema, SQLException sqlException) throws Exception {
 
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        if (!kylinConfig.isPushDownEnabled()) {
-            return false;
-        }
-        boolean isSelect = QueryUtil.isSelectStatement(sql);
-        boolean isExpectedCause = true;
-        
-        if (sqlException != null) {
-            Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
-            isExpectedCause = rootCause != null && 
((rootCause.getClass().equals(NoRealizationFoundException.class)) || 
(rootCause.getClass().equals(SqlValidatorException.class)));
-        }
-        
-        if (isExpectedCause) {
-
-            logger.info("Query failed to utilize pre-calculation, routing to 
other engines", sqlException);
-            IPushDownRunner runner = (IPushDownRunner) 
ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName());
-            runner.init(kylinConfig);
-            logger.debug("Query Pushdown runner {}", runner);
-
-            // default schema in calcite does not apply to other engines.
-            // since this is a universql requirement, it's not implemented as 
a converter
-            if (defaultSchema != null && !defaultSchema.equals("DEFAULT")) {
-                String completed = sql;
-                try {
-                    completed = schemaCompletion(sql, defaultSchema);
-                } catch (SqlParseException e) {
-                    // fail to parse the pushdown sql, ignore
-                    logger.debug("fail to do schema completion on the pushdown 
sql, ignore it.", e.getMessage());
-                }
-                if (!sql.equals(completed)) {
-                    logger.info("the query is converted to {} after schema 
completion", completed);
-                    sql = completed;
-                }
-            }
 
-            for (String converterName : 
kylinConfig.getPushDownConverterClassNames()) {
-                IPushDownConverter converter = (IPushDownConverter) 
ClassUtil.newInstance(converterName);
-                String converted = converter.convert(sql, project, 
defaultSchema);
-                if (!sql.equals(converted)) {
-                    logger.info("the query is converted to {} after applying 
converter {}", converted, converterName);
-                    sql = converted;
-                }
+        if (!kylinConfig.isPushDownEnabled())
+            return null;
+
+        if (!isExpectedCause(sqlException))
+            return null;
+
+        logger.info("Query failed to utilize pre-calculation, routing to other 
engines", sqlException);
+        IPushDownRunner runner = (IPushDownRunner) 
ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName());
+        runner.init(kylinConfig);
+        logger.debug("Query Pushdown runner {}", runner);
+
+        // default schema in calcite does not apply to other engines.
+        // since this is a universql requirement, it's not implemented as a 
converter
+        if (defaultSchema != null && !defaultSchema.equals("DEFAULT")) {
+            String completed = sql;
+            try {
+                completed = schemaCompletion(sql, defaultSchema);
+            } catch (SqlParseException e) {
+                // fail to parse the pushdown sql, ignore
+                logger.debug("fail to do schema completion on the pushdown 
sql, ignore it.", e.getMessage());
             }
+            if (!sql.equals(completed)) {
+                logger.info("the query is converted to {} after schema 
completion", completed);
+                sql = completed;
+            }
+        }
 
-            if (isSelect == true) {
-                runner.executeQuery(sql, results, columnMetas);
-            } else {
-                runner.executeUpdate(sql);
+        for (String converterName : 
kylinConfig.getPushDownConverterClassNames()) {
+            IPushDownConverter converter = (IPushDownConverter) 
ClassUtil.newInstance(converterName);
+            String converted = converter.convert(sql, project, defaultSchema);
+            if (!sql.equals(converted)) {
+                logger.info("the query is converted to {} after applying 
converter {}", converted, converterName);
+                sql = converted;
             }
-            return true;
+        }
+
+        List<List<String>> returnRows = Lists.newArrayList();
+        List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList();
+
+        if (QueryUtil.isSelectStatement(sql)) {
+            runner.executeQuery(sql, returnRows, returnColumnMeta);
         } else {
-            return false;
+            runner.executeUpdate(sql);
         }
+        return Pair.newPair(returnRows, returnColumnMeta);
+    }
+
+    private static boolean isExpectedCause(SQLException sqlException) {
+        Preconditions.checkArgument(sqlException != null);
+
+        Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
+        return rootCause != null && //
+                (rootCause instanceof NoRealizationFoundException //
+                        || rootCause instanceof SqlValidatorException // 
+                        || rootCause instanceof RoutingIndicatorException);
     }
 
     static String schemaCompletion(String inputSql, String schema) throws 
SqlParseException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index c227d71..5ac595f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -63,6 +63,7 @@ import 
org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.DBUtils;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -187,22 +188,18 @@ public class QueryService extends BasicService {
         // non select operations, only supported when enable pushdown
         logger.debug("Query pushdown enabled, redirect the query to 
alternative engine. ");
         Connection conn = null;
-        List<List<String>> results = Lists.newArrayList();
-        boolean isPushDown;
         try {
             conn = QueryConnection.getConnection(sqlRequest.getProject());
-            isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), 
sqlRequest.getSql(), conn.getSchema(), null, null, null);
+            Pair<List<List<String>>, List<SelectedColumnMeta>> r = PushDownUtil
+                    .tryPushDownQuery(sqlRequest.getProject(), 
sqlRequest.getSql(), conn.getSchema(), null);
+            return buildSqlResponse(true, r.getFirst(), r.getSecond());
+            
         } catch (Exception e) {
             logger.error("failed to do pushdown, error is " + e.getMessage(), 
e);
             throw new InternalErrorException(e);
         } finally {
             close(null, null, conn);
         }
-        List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
-        columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, 
false, Integer.MAX_VALUE, "c0", "c0",
-                null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, 
false, false));
-        SQLResponse sqlResponse = getSqlResponse(isPushDown, results, 
columnMetas);
-        return sqlResponse;
     }
 
     public void saveQuery(final String creator, final Query query) throws 
IOException {
@@ -791,7 +788,7 @@ public class QueryService extends BasicService {
     private SQLResponse execute(String correctedSql, SQLRequest sqlRequest, 
Connection conn) throws Exception {
         Statement stat = null;
         ResultSet resultSet = null;
-        Boolean isPushDown = false;
+        boolean isPushDown = false;
 
         List<List<String>> results = Lists.newArrayList();
         List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
@@ -830,17 +827,22 @@ public class QueryService extends BasicService {
 
                 results.add(oneRow);
             }
+            
         } catch (SQLException sqlException) {
-            isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), 
correctedSql, conn.getSchema(), results,
-                    columnMetas, sqlException);
-            if (!isPushDown) {
+            Pair<List<List<String>>, List<SelectedColumnMeta>> r = PushDownUtil
+                    .tryPushDownQuery(sqlRequest.getProject(), correctedSql, 
conn.getSchema(), sqlException);
+            if (r == null)
                 throw sqlException;
-            }
+            
+            isPushDown = true;
+            results = r.getFirst();
+            columnMetas = r.getSecond();
+            
         } finally {
-            close(resultSet, stat, null);//conn is passed in, not my duty to 
close
+            close(resultSet, stat, null); //conn is passed in, not my duty to 
close
         }
 
-        return getSqlResponse(isPushDown, results, columnMetas);
+        return buildSqlResponse(isPushDown, results, columnMetas);
     }
 
     private SQLResponse getPrepareOnlySqlResponse(String correctedSql, 
Connection conn, Boolean isPushDown,
@@ -883,10 +885,10 @@ public class QueryService extends BasicService {
             CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(false);
         }
 
-        return getSqlResponse(isPushDown, results, columnMetas);
+        return buildSqlResponse(isPushDown, results, columnMetas);
     }
 
-    private SQLResponse getSqlResponse(Boolean isPushDown, List<List<String>> 
results,
+    private SQLResponse buildSqlResponse(Boolean isPushDown, 
List<List<String>> results,
             List<SelectedColumnMeta> columnMetas) {
 
         boolean isPartialResult = false;
@@ -918,6 +920,7 @@ public class QueryService extends BasicService {
      * @param param
      * @throws SQLException
      */
+    @SuppressWarnings("unused")
     private void setParam(PreparedStatement preparedState, int index, 
PrepareSqlRequest.StateParam param)
             throws SQLException {
         boolean isNull = (null == param.getValue());

Reply via email to