huaxingao commented on a change in pull request #33639:
URL: https://github.com/apache/spark/pull/33639#discussion_r687325296



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +145,260 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need
+   * createRowBaseReader to read data from Parquet and aggregate at the Spark layer. Instead,
+   * we get the partial aggregate (Max/Min/Count) results from the statistics in the Parquet
+   * footer, and then construct an InternalRow from these aggregate results.
+   *
+   * @return the aggregate results as an InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val mutableRow = new SpecificInternalRow(aggSchema.fields.map(x => x.dataType))
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            mutableRow.update(i, dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Integer].toLong, d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException(s"Unexpected type ${aggSchema.fields(i).dataType}" +
+            " for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        aggSchema.fields(i).dataType match {
+          case LongType =>
+            mutableRow.setLong(i, values(i).asInstanceOf[Long])
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Long], d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException(s"Unexpected type ${aggSchema.fields(i).dataType}" +
+            " for INT64")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case StringType =>
+            mutableRow.update(i, UTF8String.fromBytes(bytes))
+          case BinaryType =>
+            mutableRow.update(i, bytes)
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException(s"Unexpected type ${aggSchema.fields(i).dataType}" +
+            " for Binary")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException(s"Unexpected type ${aggSchema.fields(i).dataType}" +
+            " for FIXED_LEN_BYTE_ARRAY")
+        }
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    mutableRow
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet and
+   * PARQUET_VECTORIZED_READER_ENABLED is set to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at the Spark layer. Instead, we get the
+   * aggregate (Max/Min/Count) results from the statistics in the Parquet footer, and then
+   * construct a ColumnarBatch from these aggregate results.
+   *
+   * @return the aggregate results as a ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val capacity = 4 * 1024
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(capacity, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(capacity, aggSchema)
+    }
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
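
The doc comments in this change describe skipping the row-based/columnar readers entirely and
answering MIN/MAX/COUNT from the row-group statistics stored in the Parquet footer. As a rough
illustration of where those statistics live (a minimal sketch, not the PR's code; the helper
`printFooterStats` and its argument are invented for the example), using parquet-mr's footer API:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Print per-column min/max/value-count taken from the row-group statistics in the footer,
// i.e. the same information the pushed-down MIN/MAX/COUNT aggregates rely on.
def printFooterStats(path: String): Unit = {
  val footer = ParquetFileReader.readFooter(
    new Configuration(), new Path(path), ParquetMetadataConverter.NO_FILTER)
  footer.getBlocks.asScala.foreach { block =>
    block.getColumns.asScala.foreach { chunk =>
      val stats = chunk.getStatistics
      println(s"${chunk.getPath}: min=${stats.genericGetMin}, max=${stats.genericGetMax}, " +
        s"values=${chunk.getValueCount}, nulls=${stats.getNumNulls}")
    }
  }
}
```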
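For the vectorized path, the same footer-derived values end up in a single-row ColumnarBatch
rather than an InternalRow. A minimal sketch of that packing step, assuming the aggregate values
have already been extracted from the footer (the schema and literal values below are made up for
illustration):

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Hypothetical aggregate schema: MIN(a) and COUNT(a) pushed down to Parquet.
val aggSchema = new StructType().add("min_a", IntegerType).add("count_a", LongType)

// One row is enough: the whole file collapses to a single row of aggregate results.
val vectors = OnHeapColumnVector.allocateColumns(1, aggSchema)
vectors(0).putInt(0, 42)      // MIN(a) taken from the footer statistics
vectors(1).putLong(0, 1000L)  // COUNT(a) taken from the footer statistics

val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]])
batch.setNumRows(1)
```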

Review comment:
       seems parquet only has INT32, INT64...?
   
https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java#L71
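
For reference, the PrimitiveTypeName enum linked above defines only the physical types INT64,
INT32, BOOLEAN, BINARY, FLOAT, DOUBLE, INT96 and FIXED_LEN_BYTE_ARRAY (no 8- or 16-bit ints),
which matches the INT32 branch above fanning out to Byte/Short/Integer/Date/Decimal on the Spark
side. A throwaway snippet (not part of the PR) to list the available values:

```scala
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

// Prints the physical types Parquet supports:
// INT64, INT32, BOOLEAN, BINARY, FLOAT, DOUBLE, INT96, FIXED_LEN_BYTE_ARRAY.
PrimitiveTypeName.values().foreach(println)
```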



