[ https://issues.apache.org/jira/browse/KYLIN-5200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Liu Zhao reassigned KYLIN-5200:
-------------------------------

    Assignee: Liu Zhao

> Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
> ---------------------------------------------------------------------------
>
>                 Key: KYLIN-5200
>                 URL: https://issues.apache.org/jira/browse/KYLIN-5200
>             Project: Kylin
>          Issue Type: Bug
>          Components: Metadata
>    Affects Versions: v4.0.1
>            Reporter: Liu Zhao
>            Assignee: Liu Zhao
>            Priority: Major
>
> I created a cube on Kylin 4.0.1 with one measure defined as RAW. When querying after the build, I find an inconsistency between the Parquet schema and the Spark schema. At build time the RAW measure written to Parquet is handled by Spark's max aggregate, whose result type is the child column's data type (decimal(19,4) in my cube). At query time, however, TableScanPlan uniformly maps RAW to BinaryType. Should the StructType for RAW in TableScanPlan also use the child data type?
> At build time, the RAW type is child.dataType.
> @see org.apache.kylin.engine.spark.job.CuboidAggregator
> {code:java}
> measure.expression.toUpperCase(Locale.ROOT) match {
>   case "MAX" =>
>     max(columns.head).as(id.toString)
>   case "MIN" =>
>     min(columns.head).as(id.toString)
>   case "SUM" =>
>     sum(columns.head).as(id.toString)
>   case "COUNT" =>
>     if (reuseLayout) {
>       sum(columns.head).as(id.toString)
>     } else {
>       count(columns.head).as(id.toString)
>     }
>   case "COUNT_DISTINCT" =>
>     // for test
>     if (isSparkSql) {
>       countDistinct(columns.head).as(id.toString)
>     } else {
>       val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
>       new Column(cdAggregate.toAggregateExpression()).as(id.toString)
>     }
>   case "TOP_N" =>
>     // Uses new TopN aggregate function located in
>     // kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
>     val schema = StructType(measure.pra.map { col =>
>       val dateType = col.dataType
>       if (col == measure) {
>         StructField(s"MEASURE_${col.columnName}", dateType)
>       } else {
>         StructField(s"DIMENSION_${col.columnName}", dateType)
>       }
>     })
>     if (reuseLayout) {
>       new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
>         .toAggregateExpression()).as(id.toString)
>     } else {
>       new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr,
>         columns.drop(1).map(_.expr))
>         .toAggregateExpression()).as(id.toString)
>     }
>   case "PERCENTILE_APPROX" =>
>     val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression,
>       null, !reuseLayout)
>     if (!reuseLayout) {
>       callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
>     } else {
>       callUDF(udfName, columns.head).as(id.toString)
>     }
>   case _ =>
>     max(columns.head).as(id.toString) // RAW matches here, but max's result type is child.dataType
> }
> }.toSeq
> {code}
> But at query time, the RAW StructType is BinaryType.
> @see org.apache.kylin.query.runtime.plans.TableScanPlan,
> org.apache.spark.sql.utils.SparkTypeUtil
> {code:java}
> def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
>   dataTp.getName match {
>     // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
>     case "decimal" =>
>       if (isSum) {
>         val i = dataTp.getPrecision + 10
>         DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
>       }
>       else DecimalType(dataTp.getPrecision, dataTp.getScale)
>     case "date" => DateType
>     case "time" => DateType
>     case "timestamp" => TimestampType
>     case "datetime" => DateType
>     case "tinyint" => if (isSum) LongType else ByteType
>     case "smallint" => if (isSum) LongType else ShortType
>     case "integer" => if (isSum) LongType else IntegerType
>     case "int4" => if (isSum) LongType else IntegerType
>     case "bigint" => LongType
>     case "long8" => LongType
>     case "float" => if (isSum) DoubleType else FloatType
>     case "double" => DoubleType
>     case tp if tp.startsWith("varchar") => StringType
>     case tp if tp.startsWith("char") => StringType
>     case "dim_dc" => LongType
>     case "boolean" => BooleanType
>     case tp if tp.startsWith("hllc") => BinaryType
>     case tp if tp.startsWith("bitmap") => BinaryType
>     case tp if tp.startsWith("extendedcolumn") => BinaryType
>     case tp if tp.startsWith("percentile") => BinaryType
>     case tp if tp.startsWith("raw") => BinaryType
>     case _ => throw new IllegalArgumentException(dataTp.toString)
>   }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)