http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java index 124413d..747b064 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java +++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java @@ -18,17 +18,14 @@ package org.apache.carbondata.stream; import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; @@ -78,12 +75,11 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; - import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.CarbonVectorProxy; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.*; /** * Stream record reader @@ -117,7 +113,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { // vectorized reader private StructType outputSchema; - private Object vectorProxy; + private CarbonVectorProxy vectorProxy; private boolean isFinished = false; // filter @@ -143,9 +139,6 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { // InputMetricsStats private InputMetricsStats inputMetricsStats; - private static final LogService LOGGER = - LogServiceFactory.getLogService(CarbonStreamRecordReader.class.getName()); - public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats, QueryModel mdl, boolean useRawRow) { this.isVectorReader = isVectorReader; @@ -400,21 +393,15 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { return null; } - @Override public Object getCurrentValue() throws IOException, InterruptedException { - if (isVectorReader) { - Method method = null; - try { - method = vectorProxy.getClass().getMethod("numRows"); - int value = (int) method.invoke(vectorProxy); - if (inputMetricsStats != null) { - inputMetricsStats.incrementRecordRead((long) value); + @Override public Object getCurrentValue() throws IOException, InterruptedException { + if (isVectorReader) { + int value = vectorProxy.numRows(); + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead((long) value); + } + + return vectorProxy.getColumnarBatch(); } - method = vectorProxy.getClass().getMethod("getColumnarBatch"); - return method.invoke(vectorProxy); - } catch (Exception e) { - throw new IOException(e); - } - } if (inputMetricsStats != null) { 
inputMetricsStats.incrementRecordRead(1L); @@ -440,50 +427,39 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { return true; } - private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException { - Constructor cons = null; - // if filter is null and output projection is empty, use the row number of blocklet header - int rowNum = 0; - String methodName = "setNumRows"; - try { - String vectorReaderClassName = "org.apache.spark.sql.CarbonVectorProxy"; - cons = CarbonStreamUtils.getConstructorWithReflection(vectorReaderClassName, MemoryMode.class, - StructType.class, int.class); - if (skipScanData) { - - int rowNums = header.getBlocklet_info().getNum_rows(); - vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP, outputSchema, rowNums); - Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class); - setNumRowsMethod.invoke(vectorProxy, rowNums); - input.skipBlockletData(true); - return rowNums > 0; - } - input.readBlockletData(header); - vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP,outputSchema, input.getRowNums()); - if (null == filter) { - while (input.hasNext()) { - readRowFromStream(); - putRowToColumnBatch(rowNum++); + private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException { + // if filter is null and output projection is empty, use the row number of blocklet header + if (skipScanData) { + int rowNums = header.getBlocklet_info().getNum_rows(); + vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,rowNums); + vectorProxy.setNumRows(rowNums); + input.skipBlockletData(true); + return rowNums > 0; } - } else { - try { - while (input.hasNext()) { - readRowFromStream(); - if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) { - putRowToColumnBatch(rowNum++); + + input.readBlockletData(header); + vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,input.getRowNums()); + int rowNum = 0; + if (null == filter) { + while (input.hasNext()) { + readRowFromStream(); + putRowToColumnBatch(rowNum++); + } + } else { + try { + while (input.hasNext()) { + readRowFromStream(); + if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) { + putRowToColumnBatch(rowNum++); + } + } + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in vector reader", e); } - } - } catch (FilterUnsupportedException e) { - throw new IOException("Failed to filter row in vector reader", e); } - } - Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class); - setNumRowsMethod.invoke(vectorProxy, rowNum); - } catch (Exception e) { - throw new IOException("Failed to fill row in vector reader", e); + vectorProxy.setNumRows(rowNum); + return rowNum > 0; } - return rowNum > 0; - } private void readRowFromStream() { input.nextRow(); @@ -719,43 +695,24 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { } } - private void putRowToColumnBatch(int rowId) { - Class<?>[] paramTypes = {int.class, Object.class, int.class}; - Method putRowToColumnBatch = null; - try { - putRowToColumnBatch = vectorProxy.getClass().getMethod("putRowToColumnBatch", paramTypes); + private void putRowToColumnBatch(int rowId) { + for (int i = 0; i < projection.length; i++) { + Object value = outputValues[i]; + vectorProxy.putRowToColumnBatch(rowId,value,i); - } catch (Exception e) { - LOGGER.error( - "Unable to put the row in the vector" + "rowid: " + rowId + e); - } - for (int i = 0; i < projection.length; i++) { - 
Object value = outputValues[i]; - try { - putRowToColumnBatch.invoke(vectorProxy, rowId, value, i); - } catch (Exception e) { - LOGGER.error( - "Unable to put the row in the vector" + "rowid: " + rowId + e); - } + } } - } - @Override public float getProgress() throws IOException, InterruptedException { - return 0; - } - - @Override public void close() throws IOException { - if (null != input) { - input.close(); + @Override public float getProgress() throws IOException, InterruptedException { + return 0; } - if (null != vectorProxy) { - try { - Method closeMethod = vectorProxy.getClass().getMethod("close"); - closeMethod.invoke(vectorProxy); - } catch (Exception e) { - LOGGER.error( - "Unable to close the stream vector reader" + e); - } + + @Override public void close() throws IOException { + if (null != input) { + input.close(); + } + if (null != vectorProxy) { + vectorProxy.close(); + } } - } }
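The hunks above are representative of the whole commit: calls that previously went through java.lang.reflect (looking up CarbonVectorProxy's constructor and methods by name and wrapping failures in IOException) become ordinary compiled calls, since a version-specific CarbonVectorProxy now lives in each spark2.x source folder and is selected at build time. A minimal Scala sketch of the difference follows; the Proxy class is a hypothetical stand-in for CarbonVectorProxy, not project code.

import java.io.IOException

// Stand-in for the version-specific proxy; illustrative only.
class Proxy(rowNum: Int) {
  def numRows(): Int = rowNum
}

object ReflectionVsDirect {
  // Old style: resolve the method by name at runtime; any failure is boxed
  // into an IOException, as the old reader did.
  def numRowsViaReflection(proxy: AnyRef): Int =
    try {
      proxy.getClass.getMethod("numRows").invoke(proxy).asInstanceOf[Int]
    } catch {
      case e: Exception => throw new IOException(e)
    }

  // New style: the proxy type is known at compile time, so the call is
  // checked by the compiler and needs no reflection or exception wrapping.
  def numRowsDirect(proxy: Proxy): Int = proxy.numRows()

  def main(args: Array[String]): Unit = {
    val p = new Proxy(4)
    assert(numRowsViaReflection(p) == numRowsDirect(p))
  }
}

The direct form is what lets getCurrentValue(), scanBlockletAndFillVector(), putRowToColumnBatch() and close() shrink to the few lines shown above, with no reflective error handling left in the hot path.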
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala index 6f038eb..41fc013 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala @@ -20,7 +20,6 @@ import java.lang.Long import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.InputMetrics -import org.apache.spark.sql.execution.vectorized.ColumnarBatch import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.util.TaskMetricsMap http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala index d433470..450ead1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala @@ -21,16 +21,11 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.Count +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count} import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _} import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.HiveSessionCatalog import org.apache.spark.sql.optimizer.CarbonDecoderRelation -import org.apache.spark.sql.types.{StringType, TimestampType} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.CarbonAliasDecoderRelation case class CarbonDictionaryCatalystDecoder( @@ -125,22 +120,40 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation, object CountStarPlan { type ReturnType = (mutable.MutableList[Attribute], LogicalPlan) - /** - * It fill count star query attribute. - */ - private def fillCountStarAttribute( - expr: Expression, - outputColumns: mutable.MutableList[Attribute]) { - expr match { - case par@Alias(_, _) => - val head = par.children.head.children.head - head match { - case count: Count if count.children.head.isInstanceOf[Literal] => - outputColumns += par.toAttribute - case _ => - } - } - } + /** + * It fills the count star query attribute.
+ * 2.2.1 plan + * Aggregate [count(1) AS count(1)#30L] + * +- Project + * + *2.3.0 plan + * Aggregate [cast(count(1) as string) AS count(1)#29] + * +- Project + */ + private def fillCountStarAttribute( + expr: Expression, + outputColumns: mutable.MutableList[Attribute]) { + expr match { + case par@Alias(cast: Cast, _) => + if (cast.child.isInstanceOf[AggregateExpression]) { + val head = cast.child.children.head + head match { + case count: Count if count.children.head.isInstanceOf[Literal] => + outputColumns += par.toAttribute + case _ => + } + } + case par@Alias(child, _) => + if (child.isInstanceOf[AggregateExpression]) { + val head = child.children.head + head match { + case count: Count if count.children.head.isInstanceOf[Literal] => + outputColumns += par.toAttribute + case _ => + } + } + } + } def unapply(plan: LogicalPlan): Option[ReturnType] = { plan match { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala index 9b78db0..ac8eb64 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier @@ -64,7 +66,13 @@ case class CarbonCountStar( carbonTable.getTableName, Some(carbonTable.getDatabaseName))).map(_.asJava).orNull), carbonTable) - val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]) + val valueRaw = + attributesRaw.head.dataType match { + case StringType => Seq(UTF8String.fromString(Long.box(rowCount).toString)).toArray + .asInstanceOf[Array[Any]] + case _ => Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]] + } + val value = new GenericInternalRow(valueRaw) val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray) val row = if (outUnsafeRows) unsafeProjection(value) else value sparkContext.parallelize(Seq(row)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 96a8162..d6117de 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -356,12 +356,7 @@ object CarbonSession { // Register a successfully instantiated context to the singleton. This should be at the // end of the class definition so that the singleton is updated only if there is no // exception in the construction of the instance. 
- sparkContext.addSparkListener(new SparkListener { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - SparkSession.setDefaultSession(null) - SparkSession.sqlListener.set(null) - } - }) + CarbonToSparkAdapater.addSparkListener(sparkContext) session.streams.addListener(new CarbonStreamingQueryListener(session)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala index 6312746..1ff9bc3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala @@ -34,8 +34,6 @@ case class CustomDeterministicExpression(nonDt: Expression ) extends Expression override def children: Seq[Expression] = Seq() - override def deterministic: Boolean = true - def childexp : Expression = nonDt override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("") http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 516f9af..52801b1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -884,17 +884,21 @@ case class CarbonLoadDataCommand( // datatype is always int val column = table.getColumnByName(table.getTableName, attr.name) if (column.hasEncoding(Encoding.DICTIONARY)) { - AttributeReference( - attr.name, + CarbonToSparkAdapater.createAttributeReference(attr.name, IntegerType, attr.nullable, - attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + attr.metadata, + attr.exprId, + attr.qualifier, + attr) } else if (attr.dataType == TimestampType || attr.dataType == DateType) { - AttributeReference( - attr.name, + CarbonToSparkAdapater.createAttributeReference(attr.name, LongType, attr.nullable, - attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + attr.metadata, + attr.exprId, + attr.qualifier, + attr) } else { attr } @@ -1095,7 +1099,8 @@ case class CarbonLoadDataCommand( CarbonReflectionUtils.getLogicalRelation(hdfsRelation, hdfsRelation.schema.toAttributes, - Some(catalogTable)) + Some(catalogTable), + false) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 1fcba3e..8f128fe 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types._ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil +import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -48,6 +49,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CarbonScalaUtil + /** * Carbon specific optimization for late decode (convert dictionary key to value as late as * possible), which can improve the aggregation performance and reduce memory usage @@ -55,17 +57,33 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { val PUSHED_FILTERS = "PushedFilters" + /* + In a Spark 2.3.1 plan there can be a case of multiple projections, like below + Project [substring(name, 1, 2)#124, name#123, tupleId#117, cast(rand(-6778822102499951904)#125 + as string) AS rand(-6778822102499951904)#137] + +- Project [substring(name#123, 1, 2) AS substring(name, 1, 2)#124, name#123, UDF:getTupleId() + AS tupleId#117, + customdeterministicexpression(rand(-6778822102499951904)) AS rand(-6778822102499951904)#125] + +- Relation[imei#118,age#119,task#120L,num#121,level#122,name#123] + CarbonDatasourceHadoopRelation [] + */ def apply(plan: LogicalPlan): Seq[SparkPlan] = { plan match { case PhysicalOperation(projects, filters, l: LogicalRelation) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation] - pruneFilterProject( - l, - projects, - filters, - (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan( - a.map(_.name).toArray, filters, projects, f, p), needDecoder)) :: Nil + // In Spark 2.3.1 there can be a case of multiple projections like below; + // if one projection fails, we need to continue with the others + try { + pruneFilterProject( + l, + projects, + filters, + (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan( + a.map(_.name).toArray, filters, projects, f, p), needDecoder)) :: Nil + } catch { + case e: CarbonPhysicalPlanException => Nil + } case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) || !CarbonDictionaryDecoder. @@ -157,17 +175,20 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { if (names.nonEmpty) { val partitionSet = AttributeSet(names .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get)) - val partitionKeyFilters = - ExpressionSet(ExpressionSet(filterPredicates).filter(_.references.subsetOf(partitionSet))) + val partitionKeyFilters = CarbonToSparkAdapater + .getPartitionKeyFilter(partitionSet, filterPredicates) // Update the name with lower case as it is case sensitive while getting partition info.
val updatedPartitionFilters = partitionKeyFilters.map { exp => exp.transform { case attr: AttributeReference => - AttributeReference( + CarbonToSparkAdapater.createAttributeReference( attr.name.toLowerCase, attr.dataType, attr.nullable, - attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + attr.metadata, + attr.exprId, + attr.qualifier, + attr) } } partitions = @@ -224,7 +245,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } } - val (unhandledPredicates, pushedFilters) = + val (unhandledPredicates, pushedFilters, handledFilters ) = selectFilters(relation.relation, candidatePredicates) // A set of column attributes that are only referenced by pushed down filters. We can eliminate @@ -232,10 +253,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { val handledSet = { val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains) val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references)) - AttributeSet(handledPredicates.flatMap(_.references)) -- - (projectSet ++ unhandledSet).map(relation.attributeMap) + try { + AttributeSet(handledPredicates.flatMap(_.references)) -- + (projectSet ++ unhandledSet).map(relation.attributeMap) + } catch { + case e => throw new CarbonPhysicalPlanException + } } - // Combines all Catalyst filter `Expression`s that are either not convertible to data source // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) @@ -309,6 +333,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { scanBuilder, candidatePredicates, pushedFilters, + handledFilters, metadata, needDecoder, updateRequestedColumns.asInstanceOf[Seq[Attribute]]) @@ -345,6 +370,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { scanBuilder, candidatePredicates, pushedFilters, + handledFilters, metadata, needDecoder, updateRequestedColumns.asInstanceOf[Seq[Attribute]]) @@ -359,9 +385,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { output: Seq[Attribute], partitions: Seq[PartitionSpec], scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], - ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow], + ArrayBuffer[AttributeReference], Seq[PartitionSpec]) + => RDD[InternalRow], candidatePredicates: Seq[Expression], - pushedFilters: Seq[Filter], + pushedFilters: Seq[Filter], handledFilters: Seq[Filter], metadata: Map[String, String], needDecoder: ArrayBuffer[AttributeReference], updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = { @@ -380,19 +407,16 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { metadata, relation.catalogTable.map(_.identifier), relation) } else { - RowDataSourceScanExec(output, - scanBuilder(updateRequestedColumns, - candidatePredicates, - pushedFilters, - needDecoder, - partitions), - relation.relation, - getPartitioning(table.carbonTable, updateRequestedColumns), - metadata, - relation.catalogTable.map(_.identifier)) + val partition = getPartitioning(table.carbonTable, updateRequestedColumns) + val rdd = scanBuilder(updateRequestedColumns, candidatePredicates, + pushedFilters, needDecoder, partitions) + CarbonReflectionUtils.getRowDataSourceScanExecObj(relation, output, + pushedFilters, handledFilters, + rdd, partition, metadata) } } + def updateRequestedColumnsFunc(requestedColumns: Seq[Expression], relation: CarbonDatasourceHadoopRelation, needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = { @@ 
-471,7 +495,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { protected[sql] def selectFilters( relation: BaseRelation, - predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = { + predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Seq[Filter]) = { // In case of ComplexType dataTypes no filters should be pushed down. IsNotNull is being // explicitly added by spark and pushed. That also has to be handled and pushed back to @@ -536,7 +560,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { // a filter to every row or not. val (_, translatedFilters) = translated.unzip - (unrecognizedPredicates ++ unhandledPredicates, translatedFilters) + (unrecognizedPredicates ++ unhandledPredicates, translatedFilters, handledFilters) } /** @@ -701,3 +725,5 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } } } + +class CarbonPhysicalPlanException extends Exception http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 0c9490d..4499b19 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.strategy import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ @@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} -/** - * Carbon strategies for ddl commands - */ + /** + * Carbon strategies for ddl commands + * CreateDataSourceTableAsSelectCommand class has extra argument in + * 2.3, so need to add wrapper to match the case + */ +object MatchCreateDataSourceTable { + def unapply(plan: LogicalPlan): Option[(CatalogTable, SaveMode, LogicalPlan)] = plan match { + case t: CreateDataSourceTableAsSelectCommand => Some(t.table, t.mode, t.query) + case _ => None + } +} class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { val LOGGER: LogService = @@ -231,7 +240,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil - case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) + case MatchCreateDataSourceTable(tableDesc, mode, query) if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") || tableDesc.provider.get.equalsIgnoreCase("carbondata")) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index 97c37df..62e2d85 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCom import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CarbonException -import org.apache.spark.util.CarbonReflectionUtils +import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -74,9 +74,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica "Update operation is not supported for table which has index datamaps") } } - val tableRelation = if (SPARK_VERSION.startsWith("2.1")) { + val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) { relation - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { alias match { case Some(_) => CarbonReflectionUtils.getSubqueryAlias( @@ -206,9 +206,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica } } // include tuple id in subquery - if (SPARK_VERSION.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { Project(projList, relation) - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { alias match { case Some(_) => val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias( @@ -277,14 +277,13 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi case attr => attr } } - val version = SPARK_VERSION val newChild: LogicalPlan = if (newChildOutput == child.output) { - if (version.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan] - } else if (version.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionEqualTo("2.2")) { CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan] } else { - throw new UnsupportedOperationException(s"Spark version $version is not supported") + throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported") } } else { Project(newChildOutput, child) http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 70e61bc..1840c5d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -23,7 +23,6 @@ import java.net.URI import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.permission.{FsAction, FsPermission} -import 
org.apache.spark.SPARK_VERSION import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession} import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias} import org.apache.spark.sql.catalyst.TableIdentifier @@ -31,7 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.util.CarbonReflectionUtils +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree @@ -43,9 +43,8 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema -import org.apache.carbondata.core.metadata.schema.table +import org.apache.carbondata.core.metadata.schema.{table, SchemaReader} import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.metadata.schema.SchemaReader import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.ThriftWriter @@ -72,6 +71,13 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { } } +object MatchLogicalRelation { + def unapply(logicalPlan: LogicalPlan): Option[(BaseRelation, Any, Any)] = logicalPlan match { + case l: LogicalRelation => Some(l.relation, l.output, l.catalogTable) + case _ => None + } +} + class CarbonFileMetastore extends CarbonMetaStore { @transient @@ -143,13 +149,13 @@ class CarbonFileMetastore extends CarbonMetaStore { sparkSession.catalog.currentDatabase) val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => + MatchLogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => carbonDatasourceHadoopRelation.carbonRelation - case LogicalRelation( + case MatchLogicalRelation( carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => carbonDatasourceHadoopRelation.carbonRelation case SubqueryAlias(_, c) - if SPARK_VERSION.startsWith("2.2") && + if (SparkUtil.isSparkVersionXandAbove("2.2")) && (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || c.getClass.getName.equals( @@ -598,13 +604,13 @@ class CarbonFileMetastore extends CarbonMetaStore { val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) relation match { case SubqueryAlias(_, - LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => + MatchLogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => carbonDataSourceHadoopRelation - case LogicalRelation( + case MatchLogicalRelation( carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => carbonDataSourceHadoopRelation case SubqueryAlias(_, c) - if 
SPARK_VERSION.startsWith("2.2") && + if (SparkUtil.isSparkVersionXandAbove("2.2")) && (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || c.getClass.getName .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index c59246d..76ff41a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -273,11 +273,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case attr: AttributeReference => updatedExpression.find { p => p._1.sameRef(attr) } match { case Some((_, childAttr)) => - AttributeReference( + CarbonToSparkAdapater.createAttributeReference( childAttr.name, childAttr.dataType, childAttr.nullable, - childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated) + childAttr.metadata, + childAttr.exprId, + attr.qualifier, + attr) case None => attr } @@ -296,11 +299,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case attr: AttributeReference => updatedExpression.find { p => p._1.sameRef(attr) } match { case Some((_, childAttr)) => - AttributeReference( + CarbonToSparkAdapater.createAttributeReference( childAttr.name, childAttr.dataType, childAttr.nullable, - childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated) + childAttr.metadata, + childAttr.exprId, + attr.qualifier, + attr) case None => attr } @@ -777,24 +783,36 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule val factAlias = factPlanExpForStreaming(name) // create attribute reference object for each expression val attrs = factAlias.map { factAlias => - AttributeReference( + CarbonToSparkAdapater.createAttributeReference( name, alias.dataType, - alias.nullable) (factAlias.exprId, alias.qualifier, alias.isGenerated) + alias.nullable, + Metadata.empty, + factAlias.exprId, + alias.qualifier, + alias) } // add aggregate function in Aggregate node added for handling streaming // to aggregate results from fact and aggregate table val updatedAggExp = getAggregateExpressionForAggregation(aggExp, attrs) // same reference id will be used as it can be used by above nodes in the plan like // sort, project, join - Alias( + CarbonToSparkAdapater.createAliasRef( updatedAggExp.head, - name)(alias.exprId, alias.qualifier, Option(alias.metadata), alias.isGenerated) + name, + alias.exprId, + alias.qualifier, + Option(alias.metadata), + Some(alias)) case alias@Alias(expression, name) => - AttributeReference( + CarbonToSparkAdapater.createAttributeReference( name, alias.dataType, - alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated) + alias.nullable, + Metadata.empty, + alias.exprId, + alias.qualifier, + alias) } updatedExp } @@ -897,9 +915,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case attr: AttributeReference => newAggExp += attr case exp: Expression => - newAggExp += Alias( + newAggExp += CarbonToSparkAdapater.createAliasRef( exp, - "dummy_" + 
counter)(NamedExpression.newExprId, None, None, false) + "dummy_" + counter, + NamedExpression.newExprId) counter = counter + 1 } } @@ -923,12 +942,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // get the new aggregate expression val newAggExp = getAggFunctionForFactStreaming(aggExp) val updatedExp = newAggExp.map { exp => - Alias(exp, - name)( - NamedExpression.newExprId, - alias.qualifier, + CarbonToSparkAdapater.createAliasRef(exp, + name, + NamedExpression.newExprId, + alias.qualifier, Some(alias.metadata), - alias.isGenerated) + Some(alias)) } // adding to map which will be used while adding an Aggregate node for handling streaming // table plan change @@ -936,10 +955,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule updatedExp case alias@Alias(exp: Expression, name) => val newAlias = Seq(alias) - val attr = AttributeReference(name, - alias.dataType, - alias.nullable, - alias.metadata) (alias.exprId, alias.qualifier, alias.isGenerated) + val attr = CarbonToSparkAdapater.createAttributeReference(name, + alias.dataType, + alias.nullable, + alias.metadata, + alias.exprId, + alias.qualifier, + alias) factPlanGrpExpForStreaming.put( AggExpToColumnMappingModel( removeQualifiers(PreAggregateUtil.normalizeExprId(exp, plan.allAttributes))), @@ -1093,11 +1115,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule private def removeQualifiers(expression: Expression) : Expression = { expression.transform { case attr: AttributeReference => - AttributeReference( + CarbonToSparkAdapater.createAttributeReference( attr.name, attr.dataType, attr.nullable, - attr.metadata)(attr.exprId, None, attr.isGenerated) + attr.metadata, + attr.exprId, + None, + attr) } } @@ -1363,10 +1388,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule attr, attributes) val newExpressionId = NamedExpression.newExprId - val childTableAttr = AttributeReference(attr.name, + val childTableAttr = CarbonToSparkAdapater.createAttributeReference(attr.name, childAttr.dataType, childAttr.nullable, - childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated) + childAttr.metadata, + newExpressionId, + childAttr.qualifier, + attr) updatedExpression.put(attr, childTableAttr) // returning the alias to show proper column name in output Seq(Alias(childAttr, @@ -1378,12 +1406,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule attr, attributes) val newExpressionId = NamedExpression.newExprId - val parentTableAttr = AttributeReference(name, + val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name, alias.dataType, - alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated) - val childTableAttr = AttributeReference(name, + alias.nullable, + Metadata.empty, + alias.exprId, + alias.qualifier, + alias) + val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name, alias.dataType, - alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated) + alias.nullable, + Metadata.empty, + newExpressionId, + alias.qualifier, + alias) updatedExpression.put(parentTableAttr, childTableAttr) // returning alias with child attribute reference Seq(Alias(childAttr, @@ -1409,13 +1445,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule val newExpressionId = NamedExpression.newExprId // create a parent attribute reference which will be replaced on a node which may be referred // to by a node like sort or join - val parentTableAttr = AttributeReference(name, + val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name, alias.dataType, - alias.nullable)(alias.exprId, alias.qualifier, alias.isGenerated) + alias.nullable, + Metadata.empty, + alias.exprId, + alias.qualifier, + alias) // creating a child attribute reference which will be replaced - val childTableAttr = AttributeReference(name, + val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name, alias.dataType, - alias.nullable)(newExpressionId, alias.qualifier, alias.isGenerated) + alias.nullable, + Metadata.empty, + newExpressionId, + alias.qualifier, + alias) // adding to map, will be used during other node updates like sort, join, project updatedExpression.put(parentTableAttr, childTableAttr) // returning alias with child attribute reference @@ -1426,12 +1470,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // for streaming table // create alias for aggregate table val aggExpForStreaming = aggExp.map{ exp => - Alias(exp, - name)( + CarbonToSparkAdapater.createAliasRef(exp, + name, NamedExpression.newExprId, alias.qualifier, Some(alias.metadata), - alias.isGenerated).asInstanceOf[NamedExpression] + Some(alias)).asInstanceOf[NamedExpression] } aggExpForStreaming } @@ -1460,12 +1504,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } val newExpressionId = NamedExpression.newExprId - val parentTableAttr = AttributeReference(name, + val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name, alias.dataType, - alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated) - val childTableAttr = AttributeReference(name, + alias.nullable, + Metadata.empty, + alias.exprId, + alias.qualifier, + alias) + val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name, alias.dataType, - alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated) + alias.nullable, + Metadata.empty, + newExpressionId, + alias.qualifier, + alias) updatedExpression.put(parentTableAttr, childTableAttr) Seq(Alias(updatedExp, name)(newExpressionId, alias.qualifier).asInstanceOf[NamedExpression]) @@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession) // named expression list otherwise update the list and add it to set if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) { namedExpressionList += - Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId, + CarbonToSparkAdapater.createAliasRef(expressions.head, + name + "_ sum", + NamedExpression.newExprId, alias.qualifier, Some(alias.metadata), - alias.isGenerated) + Some(alias)) validExpressionsMap += AggExpToColumnMappingModel(sumExp) } // check with same expression already count is present then do not add to // named expression list otherwise update the list and add it to set if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) { namedExpressionList += - Alias(expressions.last, name + "_ count")(NamedExpression.newExprId, - alias.qualifier, - Some(alias.metadata), - alias.isGenerated) + CarbonToSparkAdapater.createAliasRef(expressions.last, name + "_ count", + NamedExpression.newExprId, + alias.qualifier, + Some(alias.metadata), + Some(alias)) validExpressionsMap += AggExpToColumnMappingModel(countExp) } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index 450902a..b96b6a7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment @@ -49,6 +50,7 @@ import org.apache.carbondata.datamap.{TextMatch, TextMatchLimit} import org.apache.carbondata.spark.CarbonAliasDecoderRelation + /** * All filter conversions are done here. */ @@ -297,6 +299,27 @@ object CarbonFilters { filters.flatMap(translate(_, false)).toArray } + /** + * This API checks whether a StringTrim object is compatible with + * carbon; carbon only deals with the space character, any other symbol should + * be ignored. So the condition is Spark version < 2.3. + * If it is 2.3 then the trimStr field should be empty. + * + * @param stringTrim + * @return + */ + def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = { + var isCompatible = true + if (SparkUtil.isSparkVersionXandAbove("2.3")) { + val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim) + .asInstanceOf[Option[Expression]] + if (trimStr.isDefined) { + isCompatible = false + } + } + isCompatible + } + def transformExpression(expr: Expression): CarbonExpression = { expr match { case Or(left, right) @@ -385,7 +408,8 @@ object CarbonFilters { new CarbonLiteralExpression(maxValueLimit, CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType))) new AndExpression(l, r) - case StringTrim(child) => transformExpression(child) + case strTrim: StringTrim if isStringTrimCompatibleWithCarbon(strTrim) => + transformExpression(strTrim) case s: ScalaUDF => new MatchExpression(s.children.head.toString()) case _ => http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index b81d0a1..48c6377 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -786,8 +786,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { p.transformAllExpressions { case a@Alias(exp, _) if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => - Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier, - a.explicitMetadata, a.isGenerated) + CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp), + a.name, + a.exprId, + a.qualifier, + a.explicitMetadata, + Some(a)) case exp: NamedExpression if !exp.deterministic &&
!exp.isInstanceOf[CustomDeterministicExpression] => CustomDeterministicExpression(exp) @@ -800,8 +804,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { f.transformAllExpressions { case a@Alias(exp, _) if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => - Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier, - a.explicitMetadata, a.isGenerated) + CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp), + a.name, + a.exprId, + a.qualifier, + a.explicitMetadata, + Some(a)) case exp: NamedExpression if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => CustomDeterministicExpression(exp) http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 8eb47fc..1622724 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.parser import scala.collection.mutable import scala.language.implicitConversions -import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable} +import org.apache.spark.sql.{CarbonToSparkAdapater, DeleteRecords, UpdateTable} import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -479,7 +479,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { logicalPlan match { case _: CarbonCreateTableCommand => ExplainCommand(logicalPlan, extended = isExtended.isDefined) - case _ => ExplainCommand(OneRowRelation) + case _ => CarbonToSparkAdapater.getExplainCommandObj } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala new file mode 100644 index 0000000..d5fe6a4 --- /dev/null +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation +import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.types.{DataType, Metadata} + +object CarbonToSparkAdapater { + + def addSparkListener(sparkContext: SparkContext) = { + sparkContext.addSparkListener(new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + SparkSession.setDefaultSession(null) + SparkSession.sqlListener.set(null) + } + }) + } + + def createAttributeReference(name: String, dataType: DataType, nullable: Boolean, + metadata: Metadata,exprId: ExprId, qualifier: Option[String], + attrRef : NamedExpression): AttributeReference = { + AttributeReference( + name, + dataType, + nullable, + metadata)(exprId, qualifier,attrRef.isGenerated) + } + + def createAliasRef(child: Expression, + name: String, + exprId: ExprId = NamedExpression.newExprId, + qualifier: Option[String] = None, + explicitMetadata: Option[Metadata] = None, + namedExpr: Option[NamedExpression] = None): Alias = { + val isGenerated:Boolean = if (namedExpr.isDefined) { + namedExpr.get.isGenerated + } else { + false + } + Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated) + } + + def getExplainCommandObj() : ExplainCommand = { + ExplainCommand(OneRowRelation) + } + + def getPartitionKeyFilter( + partitionSet: AttributeSet, + filterPredicates: Seq[Expression]): ExpressionSet = { + ExpressionSet( + ExpressionSet(filterPredicates) + .filter(_.references.subsetOf(partitionSet))) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java deleted file mode 100644 index 7fa01e9..0000000 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql; - -import java.math.BigInteger; - -import org.apache.spark.memory.MemoryMode; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.CalendarIntervalType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.unsafe.types.CalendarInterval; -import org.apache.spark.unsafe.types.UTF8String; - -/** - * Adapter class which handles the columnar vector reading of the carbondata - * based on the spark ColumnVector and ColumnarBatch API. This proxy class - * handles the complexity of spark 2.1 version related api changes since - * spark ColumnVector and ColumnarBatch interfaces are still evolving. - */ -public class CarbonVectorProxy { - - private ColumnarBatch columnarBatch; - - /** - * Adapter class which handles the columnar vector reading of the carbondata - * based on the spark ColumnVector and ColumnarBatch API. This proxy class - * handles the complexity of spark 2.3 version related api changes since - * spark ColumnVector and ColumnarBatch interfaces are still evolving. - * - * @param memMode which represent the type onheap or offheap vector. - * @param rowNum rows number for vector reading - * @param structFileds, metadata related to current schema of table. - */ - public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) { - columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum); - } - - public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) { - columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum); - } - - /** - * Sets the number of rows in this batch. - */ - public void setNumRows(int numRows) { - columnarBatch.setNumRows(numRows); - } - - /** - * Returns the number of rows for read, including filtered rows. - */ - public int numRows() { - return columnarBatch.capacity(); - } - - /** - * Called to close all the columns in this batch. It is not valid to access the data after - * calling this. This must be called at the end to clean up memory allocations. - */ - public void close() { - columnarBatch.close(); - } - - /** - * Returns the row in this batch at `rowId`. Returned row is reused across calls. - */ - public InternalRow getRow(int rowId) { - return columnarBatch.getRow(rowId); - } - - /** - * Returns the row in this batch at `rowId`. Returned row is reused across calls. - */ - public Object getColumnarBatch() { - return columnarBatch; - } - - public void resetDictionaryIds(int ordinal) { - columnarBatch.column(ordinal).getDictionaryIds().reset(); - } - - /** - * Resets this column for writing. The currently stored values are no longer accessible. 
-  /**
-   * Resets this column for writing. The currently stored values are no longer accessible.
-   */
-  public void reset() {
-    columnarBatch.reset();
-  }
-
-  public void putRowToColumnBatch(int rowId, Object value, int offset) {
-    org.apache.spark.sql.types.DataType t = dataType(offset);
-    if (null == value) {
-      putNull(rowId, offset);
-    } else {
-      if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-        putBoolean(rowId, (boolean) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-        putByte(rowId, (byte) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-        putShort(rowId, (short) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-        putInt(rowId, (int) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-        putLong(rowId, (long) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-        putFloat(rowId, (float) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-        putDouble(rowId, (double) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-        UTF8String v = (UTF8String) value;
-        putByteArray(rowId, v.getBytes(), offset);
-      } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-        DecimalType dt = (DecimalType) t;
-        Decimal d = Decimal.fromDecimal(value);
-        if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-          putInt(rowId, (int) d.toUnscaledLong(), offset);
-        } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-          putLong(rowId, d.toUnscaledLong(), offset);
-        } else {
-          final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-          byte[] bytes = integer.toByteArray();
-          putByteArray(rowId, bytes, 0, bytes.length, offset);
-        }
-      } else if (t instanceof CalendarIntervalType) {
-        CalendarInterval c = (CalendarInterval) value;
-        columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
-        columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
-      } else if (t instanceof org.apache.spark.sql.types.DateType) {
-        putInt(rowId, (int) value, offset);
-      } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-        putLong(rowId, (long) value, offset);
-      }
-    }
-  }
-
-  public void putBoolean(int rowId, boolean value, int ordinal) {
-    columnarBatch.column(ordinal).putBoolean(rowId, value);
-  }
-
-  public void putByte(int rowId, byte value, int ordinal) {
-    columnarBatch.column(ordinal).putByte(rowId, value);
-  }
-
-  public void putShort(int rowId, short value, int ordinal) {
-    columnarBatch.column(ordinal).putShort(rowId, value);
-  }
-
-  public void putInt(int rowId, int value, int ordinal) {
-    columnarBatch.column(ordinal).putInt(rowId, value);
-  }
-
-  public void putFloat(int rowId, float value, int ordinal) {
-    columnarBatch.column(ordinal).putFloat(rowId, value);
-  }
-
-  public void putLong(int rowId, long value, int ordinal) {
-    columnarBatch.column(ordinal).putLong(rowId, value);
-  }
-
-  public void putDouble(int rowId, double value, int ordinal) {
-    columnarBatch.column(ordinal).putDouble(rowId, value);
-  }
-
-  public void putByteArray(int rowId, byte[] value, int ordinal) {
-    columnarBatch.column(ordinal).putByteArray(rowId, value);
-  }
-
-  public void putInts(int rowId, int count, int value, int ordinal) {
-    columnarBatch.column(ordinal).putInts(rowId, count, value);
-  }
-
-  public void putShorts(int rowId, int count, short value, int ordinal) {
-    columnarBatch.column(ordinal).putShorts(rowId, count, value);
-  }
-  public void putLongs(int rowId, int count, long value, int ordinal) {
-    columnarBatch.column(ordinal).putLongs(rowId, count, value);
-  }
-
-  public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-    columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
-  }
-
-  public void putDoubles(int rowId, int count, double value, int ordinal) {
-    columnarBatch.column(ordinal).putDoubles(rowId, count, value);
-  }
-
-  public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-    columnarBatch.column(ordinal).putByteArray(rowId, value, offset, length);
-  }
-
-  public void putNull(int rowId, int ordinal) {
-    columnarBatch.column(ordinal).putNull(rowId);
-  }
-
-  public void putNulls(int rowId, int count, int ordinal) {
-    columnarBatch.column(ordinal).putNulls(rowId, count);
-  }
-
-  public boolean isNullAt(int rowId, int ordinal) {
-    return columnarBatch.column(ordinal).isNullAt(rowId);
-  }
-
-  public DataType dataType(int ordinal) {
-    return columnarBatch.column(ordinal).dataType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index 989b1d5..dd690e4 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -34,11 +34,9 @@ import org.apache.spark.sql.types.StructType
 
 /**
  * Create table 'using carbondata' and insert the query result into it.
- *
  * @param table the Catalog Table
  * @param mode SaveMode:Ignore,OverWrite,ErrorIfExists,Append
  * @param query the query whose result will be insert into the new relation
- *
  */
 case class CreateCarbonSourceTableAsSelectCommand(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..7a68e3e
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+  def addSparkListener(sparkContext: SparkContext): Unit = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+        SparkSession.sqlListener.set(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+      metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+      attrRef: NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier, attrRef.isGenerated)
+  }
+
+  def createAliasRef(child: Expression,
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Option[String] = None,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated: Boolean = if (namedExpr.isDefined) {
+      namedExpr.get.isGenerated
+    } else {
+      false
+    }
+    Alias(child, name)(exprId, qualifier, explicitMetadata, isGenerated)
+  }
+
+  def getExplainCommandObj(): ExplainCommand = {
+    ExplainCommand(OneRowRelation)
+  }
+
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+
+  def getOptimizeCodegenRule(conf: SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq(OptimizeCodegen(conf))
+  }
+}
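getPartitionKeyFilter is identical across the version adapters: it keeps only those predicates whose references fall entirely inside the partition attribute set, so they are safe to use for partition pruning. A small self-contained illustration with hypothetical `dt` (partition) and `name` (non-partition) columns:

    import org.apache.spark.sql.CarbonToSparkAdapater
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, EqualTo, Literal}
    import org.apache.spark.sql.types.{IntegerType, Metadata, StringType}

    // Hypothetical columns: `dt` is a partition column, `name` is not.
    val dt = AttributeReference("dt", IntegerType, nullable = true, Metadata.empty)()
    val name = AttributeReference("name", StringType, nullable = true, Metadata.empty)()

    val predicates = Seq(
      EqualTo(dt, Literal(20181015)),                     // references only `dt`: kept
      EqualTo(name, Literal.create("x", StringType)))     // references `name`: dropped

    // Only the first predicate survives in the returned ExpressionSet.
    val partitionFilters =
      CarbonToSparkAdapater.getPartitionKeyFilter(AttributeSet(dt), predicates)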
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 944b32e..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.2 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-  private ColumnarBatch columnarBatch;
-
-  /**
-   * Allocates a columnar batch for the given schema and row capacity.
-   *
-   * @param memMode whether the vectors are allocated on-heap or off-heap.
-   * @param rowNum number of rows to allocate for vector reading.
-   * @param structFields metadata of the current table schema.
-   */
-  public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFields) {
-    columnarBatch = ColumnarBatch.allocate(new StructType(structFields), memMode, rowNum);
-  }
-
-  public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-    columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-  }
-
-  /**
-   * Sets the number of rows in this batch.
-   */
-  public void setNumRows(int numRows) {
-    columnarBatch.setNumRows(numRows);
-  }
-
-  /**
-   * Returns the number of rows for read, including filtered rows.
-   */
-  public int numRows() {
-    return columnarBatch.capacity();
-  }
-
-  /**
-   * Called to close all the columns in this batch. It is not valid to access the data after
-   * calling this. This must be called at the end to clean up memory allocations.
-   */
-  public void close() {
-    columnarBatch.close();
-  }
-
-  /**
-   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-   */
-  public InternalRow getRow(int rowId) {
-    return columnarBatch.getRow(rowId);
-  }
-
-  /**
-   * Returns the ColumnarBatch that this proxy wraps.
-   */
-  public Object getColumnarBatch() {
-    return columnarBatch;
-  }
-
-  public Object reserveDictionaryIds(int capacity, int ordinal) {
-    return columnarBatch.column(ordinal).reserveDictionaryIds(capacity);
-  }
-
-  public void resetDictionaryIds(int ordinal) {
-    columnarBatch.column(ordinal).getDictionaryIds().reset();
-  }
-  /**
-   * Resets this column for writing. The currently stored values are no longer accessible.
-   */
-  public void reset() {
-    columnarBatch.reset();
-  }
-
-  public void putRowToColumnBatch(int rowId, Object value, int offset) {
-    org.apache.spark.sql.types.DataType t = dataType(offset);
-    if (null == value) {
-      putNull(rowId, offset);
-    } else {
-      if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-        putBoolean(rowId, (boolean) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-        putByte(rowId, (byte) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-        putShort(rowId, (short) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-        putInt(rowId, (int) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-        putLong(rowId, (long) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-        putFloat(rowId, (float) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-        putDouble(rowId, (double) value, offset);
-      } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-        UTF8String v = (UTF8String) value;
-        putByteArray(rowId, v.getBytes(), offset);
-      } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-        DecimalType dt = (DecimalType) t;
-        Decimal d = Decimal.fromDecimal(value);
-        if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-          putInt(rowId, (int) d.toUnscaledLong(), offset);
-        } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-          putLong(rowId, d.toUnscaledLong(), offset);
-        } else {
-          final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-          byte[] bytes = integer.toByteArray();
-          putByteArray(rowId, bytes, 0, bytes.length, offset);
-        }
-      } else if (t instanceof CalendarIntervalType) {
-        CalendarInterval c = (CalendarInterval) value;
-        columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
-        columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
-      } else if (t instanceof org.apache.spark.sql.types.DateType) {
-        putInt(rowId, (int) value, offset);
-      } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-        putLong(rowId, (long) value, offset);
-      }
-    }
-  }
-
-  public void putBoolean(int rowId, boolean value, int ordinal) {
-    columnarBatch.column(ordinal).putBoolean(rowId, value);
-  }
-
-  public void putByte(int rowId, byte value, int ordinal) {
-    columnarBatch.column(ordinal).putByte(rowId, value);
-  }
-
-  public void putShort(int rowId, short value, int ordinal) {
-    columnarBatch.column(ordinal).putShort(rowId, value);
-  }
-
-  public void putInt(int rowId, int value, int ordinal) {
-    columnarBatch.column(ordinal).putInt(rowId, value);
-  }
-
-  public void putFloat(int rowId, float value, int ordinal) {
-    columnarBatch.column(ordinal).putFloat(rowId, value);
-  }
-
-  public void putLong(int rowId, long value, int ordinal) {
-    columnarBatch.column(ordinal).putLong(rowId, value);
-  }
-
-  public void putDouble(int rowId, double value, int ordinal) {
-    columnarBatch.column(ordinal).putDouble(rowId, value);
-  }
-
-  public void putByteArray(int rowId, byte[] value, int ordinal) {
-    columnarBatch.column(ordinal).putByteArray(rowId, value);
-  }
-
-  public void putInts(int rowId, int count, int value, int ordinal) {
-    columnarBatch.column(ordinal).putInts(rowId, count, value);
-  }
-
-  public void putShorts(int rowId, int count, short value, int ordinal) {
-    columnarBatch.column(ordinal).putShorts(rowId, count, value);
-  }
-  public void putLongs(int rowId, int count, long value, int ordinal) {
-    columnarBatch.column(ordinal).putLongs(rowId, count, value);
-  }
-
-  public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-    columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
-  }
-
-  public void putDoubles(int rowId, int count, double value, int ordinal) {
-    columnarBatch.column(ordinal).putDoubles(rowId, count, value);
-  }
-
-  public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-    columnarBatch.column(ordinal).putByteArray(rowId, value, offset, length);
-  }
-
-  public void putNull(int rowId, int ordinal) {
-    columnarBatch.column(ordinal).putNull(rowId);
-  }
-
-  public void putNulls(int rowId, int count, int ordinal) {
-    columnarBatch.column(ordinal).putNulls(rowId, count);
-  }
-
-  public boolean isNullAt(int rowId, int ordinal) {
-    return columnarBatch.column(ordinal).isNullAt(rowId);
-  }
-
-  public DataType dataType(int ordinal) {
-    return columnarBatch.column(ordinal).dataType();
-  }
-}
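Worth noting in both deleted proxies, and presumably preserved in the consolidated class, is the decimal path of putRowToColumnBatch: the unscaled value is stored as an int when the declared precision fits in Decimal.MAX_INT_DIGITS (9 digits), as a long up to Decimal.MAX_LONG_DIGITS (18), and as the unscaled BigInteger's byte array beyond that. A short sketch of that arithmetic with made-up values:

    import java.math.BigDecimal
    import org.apache.spark.sql.types.Decimal

    // Hypothetical value 12345.67 for a DecimalType(7, 2) column: precision 7
    // is at most Decimal.MAX_INT_DIGITS (9), so the proxy writes the unscaled
    // value 1234567 into the vector as a plain int.
    val small = Decimal.fromDecimal(new BigDecimal("12345.67"))
    assert(small.toUnscaledLong.toInt == 1234567)

    // Beyond Decimal.MAX_LONG_DIGITS (18) the unscaled value no longer fits
    // in a long, so the byte array of the unscaled BigInteger is stored.
    val big = Decimal(new BigDecimal("123456789012345678901.23"))
    val bytes = big.toJavaBigDecimal.unscaledValue.toByteArray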