Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18953#discussion_r134398990

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcUtils.scala ---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io._
+import org.apache.orc.{OrcFile, TypeDescription}
+import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
+import org.apache.orc.storage.common.`type`.HiveDecimal
+import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+object OrcUtils {
+  /**
+   * Read the ORC file schema. This method is used in `inferSchema`.
+   */
+  private[orc] def readSchema(file: Path, conf: Configuration): Option[TypeDescription] = {
+    try {
+      val options = OrcFile.readerOptions(conf).filesystem(FileSystem.get(conf))
+      val reader = OrcFile.createReader(file, options)
+      val schema = reader.getSchema
+      if (schema.getFieldNames.isEmpty) {
+        None
+      } else {
+        Some(schema)
+      }
+    } catch {
+      case _: IOException => None
+    }
+  }
+
+  /**
+   * Return the ORC schema with corrected field names, together with the total number of rows.
+   */
+  private[orc] def getSchemaAndNumberOfRows(
+      dataSchema: StructType,
+      filePath: String,
+      conf: Configuration) = {
+    val hdfsPath = new Path(filePath)
+    val fs = hdfsPath.getFileSystem(conf)
+    val reader = OrcFile.createReader(hdfsPath, OrcFile.readerOptions(conf).filesystem(fs))
+    val rawSchema = reader.getSchema
+    val orcSchema = if (!rawSchema.getFieldNames.isEmpty &&
+        rawSchema.getFieldNames.asScala.forall(_.startsWith("_col"))) {
+      var schemaString = rawSchema.toString
+      dataSchema.zipWithIndex.foreach { case (field: StructField, index: Int) =>
+        schemaString = schemaString.replace(s"_col$index:", s"${field.name}:")
+      }
+      TypeDescription.fromString(schemaString)
+    } else {
+      rawSchema
+    }
+    (orcSchema, reader.getNumberOfRows)
+  }
+
+  /**
+   * Return an ORC schema string for OrcStruct.
+   */
+  private[orc] def getSchemaString(schema: StructType): String = {
+    schema.fields.map(f => s"${f.name}:${f.dataType.catalogString}").mkString("struct<", ",", ">")
+  }
+
+  private[orc] def getTypeDescription(dataType: DataType) = dataType match {
+    case st: StructType => TypeDescription.fromString(getSchemaString(st))
+    case _ => TypeDescription.fromString(dataType.catalogString)
+  }
+
+  /**
+   * Return an ORC value object for the given Spark schema.
+   */
+  private[orc] def createOrcValue(dataType: DataType) =
+    OrcStruct.createValue(getTypeDescription(dataType))
+
+  /**
+   * Convert an Apache ORC OrcStruct to an Apache Spark InternalRow.
+   * If internalRow is not None, fill values into it. Otherwise, create a SpecificInternalRow and use it.
+   */
+  private[orc] def convertOrcStructToInternalRow(
+      orcStruct: OrcStruct,
+      schema: StructType,
+      internalRow: Option[InternalRow] = None): InternalRow = {
+    val mutableRow = internalRow.getOrElse(new SpecificInternalRow(schema.map(_.dataType)))
+
+    for (schemaIndex <- 0 until schema.length) {
+      val writable = orcStruct.getFieldValue(schema(schemaIndex).name)
+      if (writable == null) {
+        mutableRow.setNullAt(schemaIndex)
+      } else {
+        mutableRow(schemaIndex) = getCatalystValue(writable, schema(schemaIndex).dataType)
+      }
+    }
+
+    mutableRow
+  }
+
+  /**
+   * Convert an Apache Spark InternalRow to an Apache ORC OrcStruct.
+   */
+  private[orc] def convertInternalRowToOrcStruct(
+      row: InternalRow,
+      schema: StructType,
+      struct: Option[OrcStruct] = None): OrcStruct = {
+    val orcStruct = struct.getOrElse(createOrcValue(schema).asInstanceOf[OrcStruct])
+
+    for (schemaIndex <- 0 until schema.length) {
+      val fieldType = schema(schemaIndex).dataType
+      val fieldValue = if (row.isNullAt(schemaIndex)) {
+        null
+      } else {
+        getWritable(row.get(schemaIndex, fieldType), fieldType)
+      }
+      orcStruct.setFieldValue(schemaIndex, fieldValue)
+    }
+    orcStruct
+  }
+
+  /**
+   * Return a WritableComparable for the given Spark Catalyst value.
+   */
+  private[orc] def getWritable(value: Object, dataType: DataType): WritableComparable[_] = {
+    if (value == null) {
+      null
+    } else {
+      dataType match {
+        case NullType => null
+
+        case BooleanType => new BooleanWritable(value.asInstanceOf[Boolean])
+
+        case ByteType => new ByteWritable(value.asInstanceOf[Byte])
+        case ShortType => new ShortWritable(value.asInstanceOf[Short])
+        case IntegerType => new IntWritable(value.asInstanceOf[Int])
+        case LongType => new LongWritable(value.asInstanceOf[Long])
+
+        case FloatType => new FloatWritable(value.asInstanceOf[Float])
+        case DoubleType => new DoubleWritable(value.asInstanceOf[Double])
+
+        case StringType => new Text(value.asInstanceOf[UTF8String].getBytes)
+
+        case BinaryType => new BytesWritable(value.asInstanceOf[Array[Byte]])
+
+        case DateType => new DateWritable(DateTimeUtils.toJavaDate(value.asInstanceOf[Int]))
+
+        case TimestampType =>
+          val us = value.asInstanceOf[Long]
+          var seconds = us / DateTimeUtils.MICROS_PER_SECOND
+          var micros = us % DateTimeUtils.MICROS_PER_SECOND
+          if (micros < 0) {
+            micros += DateTimeUtils.MICROS_PER_SECOND
+            seconds -= 1
+          }
+          val t = new OrcTimestamp(seconds * 1000)
+          t.setNanos(micros.toInt * 1000)
+          t
+
+        case _: DecimalType =>
+          new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[Decimal].toJavaBigDecimal))
+
+        case st: StructType =>
+          convertInternalRowToOrcStruct(value.asInstanceOf[InternalRow], st)
+
+        case ArrayType(et, _) =>
+          val data = value.asInstanceOf[ArrayData]
+          val list = createOrcValue(dataType)
+          for (i <- 0 until data.numElements()) {
+            list.asInstanceOf[OrcList[WritableComparable[_]]]
+              .add(getWritable(data.get(i, et), et))
+          }
+          list
+
+        case MapType(keyType, valueType, _) =>
+          val data = value.asInstanceOf[MapData]
+          val map = createOrcValue(dataType)
+            .asInstanceOf[OrcMap[WritableComparable[_], WritableComparable[_]]]
+          data.foreach(keyType, valueType, { case (k, v) =>
+            map.put(
+              getWritable(k.asInstanceOf[Object], keyType),
+              getWritable(v.asInstanceOf[Object], valueType))
+          })
+          map
+
+        case udt: UserDefinedType[_] =>
+          val udtRow = new SpecificInternalRow(Seq(udt.sqlType))
+          udtRow(0) = value
+          convertInternalRowToOrcStruct(udtRow,
+            StructType(Seq(StructField("tmp", udt.sqlType)))).getFieldValue(0)
+
+        case _ =>
+          throw new UnsupportedOperationException(s"$dataType is not supported yet.")
+      }
+    }
+
+  }
+
+  /**
+   * Return a Spark Catalyst value from a WritableComparable object.
+   */
+  private[orc] def getCatalystValue(value: WritableComparable[_], dataType: DataType): Any = {
--- End diff --

We'd better return a function to avoid per-row pattern matching. cc @HyukjinKwon, who has fixed similar problems many times.
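A minimal sketch of that suggestion, for illustration only (the `makeConverter` name and the small set of handled types are assumptions, not the PR's actual API): resolve the writable-to-Catalyst conversion once per field while the schema is known, so the per-row loop only applies prebuilt closures instead of re-matching on `dataType` for every value.

```scala
import org.apache.hadoop.io._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object ConverterSketch {
  // A converter turns one ORC writable into a Catalyst value.
  type Converter = WritableComparable[_] => Any

  // Pattern matching happens here, once per field, not once per value.
  def makeConverter(dataType: DataType): Converter = dataType match {
    case BooleanType => w => w.asInstanceOf[BooleanWritable].get
    case IntegerType => w => w.asInstanceOf[IntWritable].get
    case LongType    => w => w.asInstanceOf[LongWritable].get
    case DoubleType  => w => w.asInstanceOf[DoubleWritable].get
    case StringType  => w => UTF8String.fromString(w.asInstanceOf[Text].toString)
    case other =>
      throw new UnsupportedOperationException(s"$other is not covered in this sketch.")
  }

  // The per-row path then reduces to an array lookup plus a function call:
  //   val converters = schema.fields.map(f => makeConverter(f.dataType))
  //   ... per row: mutableRow(i) = converters(i)(writable)
}
```

In `convertOrcStructToInternalRow`, the converters would be built once from the schema (outside the row loop) and indexed per field inside it; the factoring above is only meant to show where the `dataType` match moves.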