This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 15bae6e [CARBONDATA-3397]Remove SparkUnknown Expression to Index Server 15bae6e is described below commit 15bae6e5848bc83d4a6f65499fe7dacf88f5a67a Author: BJangir <babulaljangir...@gmail.com> AuthorDate: Mon May 27 14:55:39 2019 +0530 [CARBONDATA-3397]Remove SparkUnknown Expression to Index Server Problem: If a query has a UDF that is registered only with the main driver, the UDF function will not be available on the Index Server, so the query will fail on the Index Server (with NoClassDefFoundError). Solution 1: UDFs are SparkUnknownFilter (RowLevelFilterExecuterImpl), so remove the SparkUnknown expression, because for pruning we select all blocks anyway (see org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl#isScanRequired). Solution 2: Supply all the UDF functions and their related lambda expressions to the Index Server as well. But this has the following issues: a. Spark FunctionRegistry is not writable. b. Sending all functions from the main server to the Index Server would be costly (in size), and there is no way to distinguish implicit functions from explicit user-created functions. So Solution 1 is adopted. 
This closes #3238 --- .../core/datamap/DistributableDataMapFormat.java | 8 ++++ .../scan/filter/FilterExpressionProcessor.java | 43 ++++++++++++++++++++++ .../carbondata/indexserver/DataMapJobs.scala | 39 ++++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java index f76cfec..57540e4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java @@ -334,4 +334,12 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl public boolean isJobToClearDataMaps() { return isJobToClearDataMaps; } + + public FilterResolverIntf getFilterResolverIntf() { + return filterResolverIntf; + } + + public void setFilterResolverIntf(FilterResolverIntf filterResolverIntf) { + this.filterResolverIntf = filterResolverIntf; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java index fd75496..493e7e7 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java @@ -42,6 +42,8 @@ import org.apache.carbondata.core.scan.expression.conditional.ListExpression; import org.apache.carbondata.core.scan.expression.conditional.StartsWithExpression; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.expression.logical.AndExpression; +import org.apache.carbondata.core.scan.expression.logical.OrExpression; +import org.apache.carbondata.core.scan.expression.logical.TrueExpression; import 
org.apache.carbondata.core.scan.filter.executer.FilterExecuter; import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; import org.apache.carbondata.core.scan.filter.intf.ExpressionType; @@ -487,4 +489,45 @@ public class FilterExpressionProcessor implements FilterProcessor { return !bitSet.isEmpty(); } } + + /** + * Remove UnknownExpression and change to TrueExpression + * + * @param expressionTree + * @return expressionTree without UnknownExpression + */ + public Expression removeUnknownExpression(Expression expressionTree) { + ExpressionType filterExpressionType = expressionTree.getFilterExpressionType(); + BinaryExpression currentExpression = null; + switch (filterExpressionType) { + case OR: + currentExpression = (BinaryExpression) expressionTree; + return new OrExpression( + removeUnknownExpression(currentExpression.getLeft()), + removeUnknownExpression(currentExpression.getRight()) + ); + case AND: + currentExpression = (BinaryExpression) expressionTree; + return new AndExpression( + removeUnknownExpression(currentExpression.getLeft()), + removeUnknownExpression(currentExpression.getRight()) + ); + case UNKNOWN: + return new TrueExpression(null); + default: + return expressionTree; + } + } + + /** + * Change UnknownReslover to TrueExpression Reslover. 
+ * + * @param tableIdentifier + * @return + */ + public FilterResolverIntf changeUnknownResloverToTrue(AbsoluteTableIdentifier tableIdentifier) { + return getFilterResolverBasedOnExpressionType(ExpressionType.TRUE, false, + new TrueExpression(null), tableIdentifier, new TrueExpression(null)); + + } } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala index 57bdf34..0ee4ebb 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala @@ -26,6 +26,11 @@ import org.apache.spark.util.SizeEstimator import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat} import org.apache.carbondata.core.indexstore.ExtendedBlocklet +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.scan.expression.BinaryExpression +import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import org.apache.carbondata.core.scan.filter.resolver.{FilterResolverIntf, LogicalFilterResolverImpl, RowLevelFilterResolverImpl} import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime /** @@ -42,11 +47,45 @@ class DistributedDataMapJob extends AbstractDataMapJob { LOGGER.debug(s"Size of message sent to Index Server: $messageSize") } val (resonse, time) = logTime { + var filterInf = dataMapFormat.getFilterResolverIntf + val filterProcessor = new FilterExpressionProcessor + filterInf = removeSparkUnknown(filterInf, + dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor) + dataMapFormat.setFilterResolverIntf(filterInf) IndexServer.getClient.getSplits(dataMapFormat).toList.asJava } LOGGER.info(s"Time 
taken to get response from server: $time ms") resonse } + + /** + * Iterate over FiltersReslover, + * a. Change only RowLevelFilterResolverImpl because SparkUnkown is part of it + * and others FilterReslover like ConditionalFilterResolverImpl so directly return. + * b. Change SparkUnkownExpression to TrueExpression so that isScanRequired + * selects block/blocklet. + * + * @param filterInf FiltersReslover to be changed + * @param tableIdentifer AbsoluteTableIdentifier object + * @param filterProcessor changed FiltersReslover. + * @return + */ + def removeSparkUnknown(filterInf: FilterResolverIntf, + tableIdentifer: AbsoluteTableIdentifier, + filterProcessor: FilterExpressionProcessor): FilterResolverIntf = { + if (filterInf.isInstanceOf[LogicalFilterResolverImpl]) { + return new LogicalFilterResolverImpl( + removeSparkUnknown(filterInf.getLeft, tableIdentifer, filterProcessor), + removeSparkUnknown(filterInf.getRight, tableIdentifer, filterProcessor), + filterProcessor.removeUnknownExpression(filterInf.getFilterExpression). + asInstanceOf[BinaryExpression]) + } + if (filterInf.isInstanceOf[RowLevelFilterResolverImpl] && + filterInf.getFilterExecuterType == ExpressionType.UNKNOWN) { + return filterProcessor.changeUnknownResloverToTrue(tableIdentifer) + } + return filterInf; + } } /**