[ 
https://issues.apache.org/jira/browse/KYLIN-5200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liu Zhao reassigned KYLIN-5200:
-------------------------------

    Assignee: Liu Zhao

> Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
> ---------------------------------------------------------------------------
>
>                 Key: KYLIN-5200
>                 URL: https://issues.apache.org/jira/browse/KYLIN-5200
>             Project: Kylin
>          Issue Type: Bug
>          Components: Metadata
>    Affects Versions: v4.0.1
>            Reporter: Liu Zhao
>            Assignee: Liu Zhao
>            Priority: Major
>
> I created a cube on kylin version 4.0.1. One of the measures is defined as 
> raw. When I query after building, I find that there are inconsistencies 
> between parquet schema and spark schema. When building cube, the raw measure 
> written to parquet is processed with spark max, and the datatype of Max is 
> child Datatype, in my cube, child Datatype is decimal (19,4). However, when I 
> query through SQL, raw is uniformly specified as binarytype in tablescanpaln. 
> Therefore, I wonder if the structtype of raw in tablescanpaln also uses child 
> dataType ?
> when build ,Raw type is child.dataType
> @see org.apache.kylin.engine.spark.job.CuboidAggregator
> {code:java}
> measure.expression.toUpperCase(Locale.ROOT) match {
>         case "MAX" =>
>           max(columns.head).as(id.toString)
>         case "MIN" =>
>           min(columns.head).as(id.toString)
>         case "SUM" =>
>           sum(columns.head).as(id.toString)
>         case "COUNT" =>
>           if (reuseLayout) {
>             sum(columns.head).as(id.toString)
>           } else {
>             count(columns.head).as(id.toString)
>           }
>         case "COUNT_DISTINCT" =>
>           // for test
>           if (isSparkSql) {
>             countDistinct(columns.head).as(id.toString)
>           } else {
>             val cdAggregate = getCountDistinctAggregate(columns, 
> measure.returnType, reuseLayout)
>             new Column(cdAggregate.toAggregateExpression()).as(id.toString)
>           }
>         case "TOP_N" =>
>           // Uses new TopN aggregate function
>           // located in 
> kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
>           val schema = StructType(measure.pra.map { col =>
>             val dateType = col.dataType
>             if (col == measure) {
>               StructField(s"MEASURE_${col.columnName}", dateType)
>             } else {
>               StructField(s"DIMENSION_${col.columnName}", dateType)
>             }
>           })
>           if (reuseLayout) {
>             new Column(ReuseTopN(measure.returnType.precision, schema, 
> columns.head.expr)
>               .toAggregateExpression()).as(id.toString)
>           } else {
>             new Column(EncodeTopN(measure.returnType.precision, schema, 
> columns.head.expr, columns.drop(1).map(_.expr))
>               .toAggregateExpression()).as(id.toString)
>           }
>         case "PERCENTILE_APPROX" =>
>           val udfName = 
> UdfManager.register(measure.returnType.toKylinDataType, measure.expression, 
> null, !reuseLayout)
>           if (!reuseLayout) {
>             callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
>           } else {
>             callUDF(udfName, columns.head).as(id.toString)
>           }
>         case _ =>
>           max(columns.head).as(id.toString) // Raw matcher here,but max 
> dataType is child.dataType
>       }
>     }.toSeq
> {code}
> But when query,Raw StructType is BinaryType.
> @see org.apache.kylin.query.runtime.plans.TableScanPlan 
> ,org.apache.spark.sql.utils.SparkTypeUtil
> {code:java}
> def toSparkType(dataTp: DataType, isSum: Boolean = false): 
> org.apache.spark.sql.types.DataType = {
>     dataTp.getName match {
>       // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
>       case "decimal" =>
>         if (isSum) {
>           val i = dataTp.getPrecision + 10
>           DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
>         }
>         else DecimalType(dataTp.getPrecision, dataTp.getScale)
>       case "date" => DateType
>       case "time" => DateType
>       case "timestamp" => TimestampType
>       case "datetime" => DateType
>       case "tinyint" => if (isSum) LongType else ByteType
>       case "smallint" => if (isSum) LongType else ShortType
>       case "integer" => if (isSum) LongType else IntegerType
>       case "int4" => if (isSum) LongType else IntegerType
>       case "bigint" => LongType
>       case "long8" => LongType
>       case "float" => if (isSum) DoubleType else FloatType
>       case "double" => DoubleType
>       case tp if tp.startsWith("varchar") => StringType
>       case tp if tp.startsWith("char") => StringType
>       case "dim_dc" => LongType
>       case "boolean" => BooleanType
>       case tp if tp.startsWith("hllc") => BinaryType
>       case tp if tp.startsWith("bitmap") => BinaryType
>       case tp if tp.startsWith("extendedcolumn") => BinaryType
>       case tp if tp.startsWith("percentile") => BinaryType
>       case tp if tp.startsWith("raw") => BinaryType
>       case _ => throw new IllegalArgumentException(dataTp.toString)
>     }
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to