http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
deleted file mode 100644
index 975fec1..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.util.{Map => JMap}
-
-import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
-
-private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
-  override def prepareForRead(
-      conf: Configuration,
-      keyValueMetaData: JMap[String, String],
-      fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[InternalRow] = {
-    log.debug(s"Preparing to read Parquet file with message type: $fileSchema")
-
-    val toCatalyst = new CatalystSchemaConverter(conf)
-    val parquetRequestedSchema = readContext.getRequestedSchema
-
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
-        metadata
-          // First tries to read the requested schema, which may result from projections
-          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read the Catalyst schema from file metadata. It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logDebug("Catalyst schema not available, falling back to Parquet schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
-
-    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
-    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
-  }
-
-  override def init(context: InitContext): ReadContext = {
-    val conf = context.getConfiguration
-
-    // If the target file was written by Spark SQL, we should be able to find a serialized
-    // Catalyst schema of this file from its metadata.
-    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
-    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
-    // `StructType` containing all requested columns.
-    val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
-    // Below we construct a Parquet schema containing all requested columns. This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
-    // we have to fall back to the full file schema, which contains all columns in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
-    //    example, in the case of schema merging, the globally merged schema may contain extra
-    //    columns gathered from other Parquet files. These columns will simply be filled with
-    //    nulls when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
-    //    a Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due
-    //    to non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
-    //    containing a single integer array field `f1` may have the following legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet file, as the two
-    //    have different physical structures.
-    val parquetRequestedSchema =
-      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema for all requested
-          // columns. Note that it's possible that no columns are requested at all (e.g., when
-          // counting some partition column of a partitioned Parquet table). That's why `fold` is
-          // used here and we always fall back to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
-      }
-
-    val metadata =
-      Map.empty[String, String] ++
-        maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
-        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
-    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
-    new ReadContext(parquetRequestedSchema, metadata)
-  }
-}
-
-private[parquet] object CatalystReadSupport {
-  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
-
-  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
-}
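
The schema pruning performed in `init()` above (wrapping each requested column's file type in a single-field `MessageType` and unioning the results) can be illustrated in isolation. Below is a minimal, hypothetical sketch against the parquet-mr schema API; the `PruneDemo` object, the `prune` helper, and the field names are made up for illustration and are not part of this commit:

    import org.apache.parquet.schema.{MessageType, PrimitiveType}
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, INT32}
    import org.apache.parquet.schema.Type.Repetition.{OPTIONAL, REQUIRED}

    object PruneDemo {
      // For each requested column present in the file, wrap its file type in a
      // single-field MessageType, then union the pieces. Folding over an empty
      // "root" schema covers the case where no columns are requested at all,
      // mirroring the comment in `init()` above.
      def prune(fileSchema: MessageType, requested: Seq[String]): MessageType =
        requested
          .filter(name => fileSchema.containsField(name))
          .map(name => new MessageType("root", fileSchema.getType(name)))
          .fold(new MessageType("root"))(_ union _)

      def main(args: Array[String]): Unit = {
        val fileSchema = new MessageType(
          "root",
          new PrimitiveType(REQUIRED, INT32, "id"),
          new PrimitiveType(OPTIONAL, BINARY, "name"))
        // Only "id" is requested, so the pruned schema tells Parquet to skip "name".
        println(prune(fileSchema, Seq("id")))
      }
    }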
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
deleted file mode 100644
index 84f1dcc..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
-
-/**
- * A [[RecordMaterializer]] for Catalyst rows.
- *
- * @param parquetSchema Parquet schema of the records to be read
- * @param catalystSchema Catalyst schema of the rows to be constructed
- */
-private[parquet] class CatalystRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType)
-  extends RecordMaterializer[InternalRow] {
-
-  private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
-
-  override def getCurrentRecord: InternalRow = rootConverter.currentRow
-
-  override def getRootConverter: GroupConverter = rootConverter
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala
deleted file mode 100644
index 4fe8a39..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.math.{BigDecimal, BigInteger}
-import java.nio.ByteOrder
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.parquet.column.Dictionary
-import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
- * corresponding parent container. For example, a converter for a `StructType` field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may append converted
- * values to an [[ArrayBuffer]].
- */
-private[parquet] trait ParentContainerUpdater {
-  def set(value: Any): Unit = ()
-  def setBoolean(value: Boolean): Unit = set(value)
-  def setByte(value: Byte): Unit = set(value)
-  def setShort(value: Short): Unit = set(value)
-  def setInt(value: Int): Unit = set(value)
-  def setLong(value: Long): Unit = set(value)
-  def setFloat(value: Float): Unit = set(value)
-  def setDouble(value: Double): Unit = set(value)
-}
-
-/** A no-op updater used for the root converter (which doesn't have a parent). */
-private[parquet] object NoopUpdater extends ParentContainerUpdater
-
-/**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used as a root converter.
- *
- * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
- * any "parent" container.
- *
- * @param parquetType Parquet schema of Parquet records
- * @param catalystType Spark SQL schema that corresponds to the Parquet record type
- * @param updater An updater which propagates converted field values to the parent container
- */
-private[parquet] class CatalystRowConverter(
-    parquetType: GroupType,
-    catalystType: StructType,
-    updater: ParentContainerUpdater)
-  extends GroupConverter {
-
-  /**
-   * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates
-   * converted field values to the `ordinal`-th cell in `currentRow`.
-   */
-  private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
-    override def set(value: Any): Unit = row(ordinal) = value
-    override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
-    override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
-    override def setShort(value: Short): Unit = row.setShort(ordinal, value)
-    override def setInt(value: Int): Unit = row.setInt(ordinal, value)
-    override def setLong(value: Long): Unit = row.setLong(ordinal, value)
-    override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
-    override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
-  }
-
-  /**
-   * Represents the converted row object once an entire Parquet record is converted.
-   *
-   * @todo Use [[UnsafeRow]] for better performance.
-   */
-  val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
-
-  // Converters for each field.
-  private val fieldConverters: Array[Converter] = {
-    parquetType.getFields.zip(catalystType).zipWithIndex.map {
-      case ((parquetFieldType, catalystField), ordinal) =>
-        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
-        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
-    }.toArray
-  }
-
-  override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
-
-  override def end(): Unit = updater.set(currentRow)
-
-  override def start(): Unit = {
-    var i = 0
-    while (i < currentRow.numFields) {
-      currentRow.setNullAt(i)
-      i += 1
-    }
-  }
-
-  /**
-   * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
-   * `catalystType`. Converted values are handled by `updater`.
-   */
-  private def newConverter(
-      parquetType: Type,
-      catalystType: DataType,
-      updater: ParentContainerUpdater): Converter = {
-
-    catalystType match {
-      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
-        new CatalystPrimitiveConverter(updater)
-
-      case ByteType =>
-        new PrimitiveConverter {
-          override def addInt(value: Int): Unit =
-            updater.setByte(value.asInstanceOf[ByteType#InternalType])
-        }
-
-      case ShortType =>
-        new PrimitiveConverter {
-          override def addInt(value: Int): Unit =
-            updater.setShort(value.asInstanceOf[ShortType#InternalType])
-        }
-
-      case t: DecimalType =>
-        new CatalystDecimalConverter(t, updater)
-
-      case StringType =>
-        new CatalystStringConverter(updater)
-
-      case TimestampType =>
-        // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-        new PrimitiveConverter {
-          // Converts nanosecond timestamps stored as INT96
-          override def addBinary(value: Binary): Unit = {
-            assert(
-              value.length() == 12,
-              "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
-                s"but got a ${value.length()}-byte binary.")
-
-            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
-            val timeOfDayNanos = buf.getLong
-            val julianDay = buf.getInt
-            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
-          }
-        }
-
-      case DateType =>
-        new PrimitiveConverter {
-          override def addInt(value: Int): Unit = {
-            // DateType is not specialized in `SpecificMutableRow`, so we have to box it here.
-            updater.set(value.asInstanceOf[DateType#InternalType])
-          }
-        }
-
-      case t: ArrayType =>
-        new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
-
-      case t: MapType =>
-        new CatalystMapConverter(parquetType.asGroupType(), t, updater)
-
-      case t: StructType =>
-        new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
-          override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-        })
-
-      case t: UserDefinedType[_] =>
-        val catalystTypeForUDT = t.sqlType
-        val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
-        val field = StructField("udt", catalystTypeForUDT, nullable)
-        val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field)
-        newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
-
-      case _ =>
-        throw new RuntimeException(
-          s"Unable to create Parquet converter for data type ${catalystType.json}")
-    }
-  }
-
-  /**
-   * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
-   * are handled by this converter. Parquet primitive types are only a subset of those of Spark
-   * SQL.
-   * For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
-   */
-  private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
-    override def addInt(value: Int): Unit = updater.setInt(value)
-    override def addLong(value: Long): Unit = updater.setLong(value)
-    override def addFloat(value: Float): Unit = updater.setFloat(value)
-    override def addDouble(value: Double): Unit = updater.setDouble(value)
-    override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
-  }
-
-  /**
-   * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
-   */
-  private final class CatalystStringConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    private var expandedDictionary: Array[UTF8String] = null
-
-    override def hasDictionarySupport: Boolean = true
-
-    override def setDictionary(dictionary: Dictionary): Unit = {
-      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
-        UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes)
-      }
-    }
-
-    override def addValueFromDictionary(dictionaryId: Int): Unit = {
-      updater.set(expandedDictionary(dictionaryId))
-    }
-
-    override def addBinary(value: Binary): Unit = {
-      updater.set(UTF8String.fromBytes(value.getBytes))
-    }
-  }
-
-  /**
-   * Parquet converter for fixed-precision decimals.
-   */
-  private final class CatalystDecimalConverter(
-      decimalType: DecimalType,
-      updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    // Converts decimals stored as INT32
-    override def addInt(value: Int): Unit = {
-      addLong(value: Long)
-    }
-
-    // Converts decimals stored as INT64
-    override def addLong(value: Long): Unit = {
-      updater.set(Decimal(value, decimalType.precision, decimalType.scale))
-    }
-
-    // Converts decimals stored as either FIXED_LEN_BYTE_ARRAY or BINARY
-    override def addBinary(value: Binary): Unit = {
-      updater.set(toDecimal(value))
-    }
-
-    private def toDecimal(value: Binary): Decimal = {
-      val precision = decimalType.precision
-      val scale = decimalType.scale
-      val bytes = value.getBytes
-
-      if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
-        var unscaled = 0L
-        var i = 0
-
-        while (i < bytes.length) {
-          unscaled = (unscaled << 8) | (bytes(i) & 0xff)
-          i += 1
-        }
-
-        val bits = 8 * bytes.length
-        unscaled = (unscaled << (64 - bits)) >> (64 - bits)
-        Decimal(unscaled, precision, scale)
-      } else {
-        // Otherwise, resorts to an unscaled `BigInteger` instead.
-        Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
-      }
-    }
-  }
-
-  /**
-   * Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard
-   * Parquet lists are represented as a 3-level group annotated by `LIST`:
-   * {{{
-   *   <list-repetition> group <name> (LIST) {            <-- parquetSchema points here
-   *     repeated group list {
-   *       <element-repetition> <element-type> element;
-   *     }
-   *   }
-   * }}}
-   * The `parquetSchema` constructor argument points to the outermost group.
-   *
-   * However, before this representation was standardized, some Parquet libraries/tools used
-   * non-standard formats to represent list-like structures. Backwards-compatibility rules for
-   * handling these cases are described in the Parquet format spec.
-   *
-   * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-   */
-  private final class CatalystArrayConverter(
-      parquetSchema: GroupType,
-      catalystSchema: ArrayType,
-      updater: ParentContainerUpdater)
-    extends GroupConverter {
-
-    private var currentArray: ArrayBuffer[Any] = _
-
-    private val elementConverter: Converter = {
-      val repeatedType = parquetSchema.getType(0)
-      val elementType = catalystSchema.elementType
-
-      if (isElementType(repeatedType, elementType)) {
-        newConverter(repeatedType, elementType, new ParentContainerUpdater {
-          override def set(value: Any): Unit = currentArray += value
-        })
-      } else {
-        new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
-      }
-    }
-
-    override def getConverter(fieldIndex: Int): Converter = elementConverter
-
-    override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
-
-    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for
-    // the next value. `Row.copy()` only copies row cells; it doesn't deep-copy objects stored in
-    // row cells.
-    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
-
-    // scalastyle:off
-    /**
-     * Returns whether the given type is the element type of a list or is a syntactic group with
-     * one field that is the element type. This is determined by checking whether the type can be
-     * a syntactic group and by checking whether a potential syntactic group matches the expected
-     * schema.
-     * {{{
-     *   <list-repetition> group <name> (LIST) {
-     *     repeated group list {                          <-- repeatedType points here
-     *       <element-repetition> <element-type> element;
-     *     }
-     *   }
-     * }}}
-     * In short, here we handle Parquet list backwards-compatibility rules on the read path. This
-     * method is based on `AvroIndexedRecordConverter.isElementType`.
-     *
-     * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-     */
-    // scalastyle:on
-    private def isElementType(parquetRepeatedType: Type, catalystElementType: DataType): Boolean = {
-      (parquetRepeatedType, catalystElementType) match {
-        case (t: PrimitiveType, _) => true
-        case (t: GroupType, _) if t.getFieldCount > 1 => true
-        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
-        case _ => false
-      }
-    }
-
-    /** Array element converter */
-    private final class ElementConverter(parquetType: Type, catalystType: DataType)
-      extends GroupConverter {
-
-      private var currentElement: Any = _
-
-      private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
-        override def set(value: Any): Unit = currentElement = value
-      })
-
-      override def getConverter(fieldIndex: Int): Converter = converter
-
-      override def end(): Unit = currentArray += currentElement
-
-      override def start(): Unit = currentElement = null
-    }
-  }
-
-  /** Parquet converter for maps */
-  private final class CatalystMapConverter(
-      parquetType: GroupType,
-      catalystType: MapType,
-      updater: ParentContainerUpdater)
-    extends GroupConverter {
-
-    private var currentKeys: ArrayBuffer[Any] = _
-    private var currentValues: ArrayBuffer[Any] = _
-
-    private val keyValueConverter = {
-      val repeatedType = parquetType.getType(0).asGroupType()
-      new KeyValueConverter(
-        repeatedType.getType(0),
-        repeatedType.getType(1),
-        catalystType.keyType,
-        catalystType.valueType)
-    }
-
-    override def getConverter(fieldIndex: Int): Converter = keyValueConverter
-
-    override def end(): Unit =
-      updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
-
-    // NOTE: We can't reuse the mutable key/value buffers here and must instantiate new ones for
-    // the next value. `Row.copy()` only copies row cells; it doesn't deep-copy objects stored in
-    // row cells.
-    override def start(): Unit = {
-      currentKeys = ArrayBuffer.empty[Any]
-      currentValues = ArrayBuffer.empty[Any]
-    }
-
-    /** Parquet converter for key-value pairs within the map.
-     */
-    private final class KeyValueConverter(
-        parquetKeyType: Type,
-        parquetValueType: Type,
-        catalystKeyType: DataType,
-        catalystValueType: DataType)
-      extends GroupConverter {
-
-      private var currentKey: Any = _
-
-      private var currentValue: Any = _
-
-      private val converters = Array(
-        // Converter for keys
-        newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
-          override def set(value: Any): Unit = currentKey = value
-        }),
-
-        // Converter for values
-        newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater {
-          override def set(value: Any): Unit = currentValue = value
-        }))
-
-      override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
-
-      override def end(): Unit = {
-        currentKeys += currentKey
-        currentValues += currentValue
-      }
-
-      override def start(): Unit = {
-        currentKey = null
-        currentValue = null
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
deleted file mode 100644
index b12149d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.schema.OriginalType._
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
-import org.apache.parquet.schema.Type.Repetition._
-import org.apache.parquet.schema._
-
-import org.apache.spark.sql.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, maxPrecisionForBytes}
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLConf}
-
-/**
- * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
- * vice versa.
- *
- * Parquet format backwards-compatibility rules are respected when converting Parquet
- * [[MessageType]] schemas.
- *
- * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
- *
- * @constructor
- * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
- *        [[StringType]] fields when converting a Parquet [[MessageType]] to Spark SQL
- *        [[StructType]].
- * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
- *        [[TimestampType]] fields when converting a Parquet [[MessageType]] to Spark SQL
- *        [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
- *        has optional nanosecond precision, but different from `TIME_MILLIS` and `TIMESTAMP_MILLIS`
- *        described in the Parquet format spec.
- * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structures when
- *        converting Spark SQL [[StructType]] to Parquet [[MessageType]]. For Spark 1.4.x and
- *        prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and
- *        uses non-standard LIST and MAP structures. Note that the current Parquet format spec is
- *        backwards-compatible with these settings. If this argument is set to `false`, we fall
- *        back to the old-style non-standard behaviors.
- */
-private[parquet] class CatalystSchemaConverter(
-    private val assumeBinaryIsString: Boolean,
-    private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
-
-  // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
-  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
-  def this() = this(
-    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
-
-  def this(conf: SQLConf) = this(
-    assumeBinaryIsString = conf.isParquetBinaryAsString,
-    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    followParquetFormatSpec = conf.followParquetFormatSpec)
-
-  def this(conf: Configuration) = this(
-    assumeBinaryIsString =
-      conf.getBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
-    assumeInt96IsTimestamp =
-      conf.getBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
-    followParquetFormatSpec =
-      conf.getBoolean(
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
-
-  /**
-   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
-   */
-  def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
-
-  private def convert(parquetSchema: GroupType): StructType = {
-    val fields = parquetSchema.getFields.map { field =>
-      field.getRepetition match {
-        case OPTIONAL =>
-          StructField(field.getName, convertField(field), nullable = true)
-
-        case REQUIRED =>
-          StructField(field.getName, convertField(field), nullable = false)
-
-        case REPEATED =>
-          throw new AnalysisException(
-            s"REPEATED not supported outside LIST or MAP. Type: $field")
-      }
-    }
-
-    StructType(fields)
-  }
-
-  /**
-   * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
-   */
-  def convertField(parquetType: Type): DataType = parquetType match {
-    case t: PrimitiveType => convertPrimitiveField(t)
-    case t: GroupType => convertGroupField(t.asGroupType())
-  }
-
-  private def convertPrimitiveField(field: PrimitiveType): DataType = {
-    val typeName = field.getPrimitiveTypeName
-    val originalType = field.getOriginalType
-
-    def typeString =
-      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
-
-    def typeNotImplemented() =
-      throw new AnalysisException(s"Parquet type not yet supported: $typeString")
-
-    def illegalType() =
-      throw new AnalysisException(s"Illegal Parquet type: $typeString")
-
-    // When maxPrecision = -1, we skip the precision range check, and always respect the precision
-    // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored
-    // as binaries with variable lengths.
-    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
-      val precision = field.getDecimalMetadata.getPrecision
-      val scale = field.getDecimalMetadata.getScale
-
-      CatalystSchemaConverter.analysisRequire(
-        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
-        s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
-
-      DecimalType(precision, scale)
-    }
-
-    typeName match {
-      case BOOLEAN => BooleanType
-
-      case FLOAT => FloatType
-
-      case DOUBLE => DoubleType
-
-      case INT32 =>
-        originalType match {
-          case INT_8 => ByteType
-          case INT_16 => ShortType
-          case INT_32 | null => IntegerType
-          case DATE => DateType
-          case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32)
-          case TIME_MILLIS => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case INT64 =>
-        originalType match {
-          case INT_64 | null => LongType
-          case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64)
-          case TIMESTAMP_MILLIS => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case INT96 =>
-        CatalystSchemaConverter.analysisRequire(
-          assumeInt96IsTimestamp,
-          "INT96 is not supported unless it's interpreted as timestamp. " +
-            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
-        TimestampType
-
-      case BINARY =>
-        originalType match {
-          case UTF8 | ENUM => StringType
-          case null if assumeBinaryIsString => StringType
-          case null => BinaryType
-          case DECIMAL => makeDecimalType()
-          case _ => illegalType()
-        }
-
-      case FIXED_LEN_BYTE_ARRAY =>
-        originalType match {
-          case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
-          case INTERVAL => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case _ => illegalType()
-    }
-  }
-
-  private def convertGroupField(field: GroupType): DataType = {
-    Option(field.getOriginalType).fold(convert(field): DataType) {
-      // A Parquet list is represented as a 3-level structure:
-      //
-      //   <list-repetition> group <name> (LIST) {
-      //     repeated group list {
-      //       <element-repetition> <element-type> element;
-      //     }
-      //   }
-      //
-      // However, according to the most recent Parquet format spec (not yet released as of this
-      // writing), some 2-level structures are also recognized for backwards-compatibility. Thus,
-      // we need to check whether the 2nd level or the 3rd level refers to the list element type.
-      //
-      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-      case LIST =>
-        CatalystSchemaConverter.analysisRequire(
-          field.getFieldCount == 1, s"Invalid list type $field")
-
-        val repeatedType = field.getType(0)
-        CatalystSchemaConverter.analysisRequire(
-          repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
-
-        if (isElementType(repeatedType, field.getName)) {
-          ArrayType(convertField(repeatedType), containsNull = false)
-        } else {
-          val elementType = repeatedType.asGroupType().getType(0)
-          val optional = elementType.isRepetition(OPTIONAL)
-          ArrayType(convertField(elementType), containsNull = optional)
-        }
-
-      // scalastyle:off
-      // `MAP_KEY_VALUE` is for backwards-compatibility
-      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
-      // scalastyle:on
-      case MAP | MAP_KEY_VALUE =>
-        CatalystSchemaConverter.analysisRequire(
-          field.getFieldCount == 1 && !field.getType(0).isPrimitive,
-          s"Invalid map type: $field")
-
-        val keyValueType = field.getType(0).asGroupType()
-        CatalystSchemaConverter.analysisRequire(
-          keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
-          s"Invalid map type: $field")
-
-        val keyType = keyValueType.getType(0)
-        CatalystSchemaConverter.analysisRequire(
-          keyType.isPrimitive,
-          s"Map key type is expected to be a primitive type, but found: $keyType")
-
-        val valueType = keyValueType.getType(1)
-        val valueOptional = valueType.isRepetition(OPTIONAL)
-        MapType(
-          convertField(keyType),
-          convertField(valueType),
-          valueContainsNull = valueOptional)
-
-      case _ =>
-        throw new AnalysisException(s"Unrecognized Parquet type: $field")
-    }
-  }
-
-  // scalastyle:off
-  // Here we implement Parquet LIST backwards-compatibility rules.
-  // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-  // scalastyle:on
-  private def isElementType(repeatedType: Type, parentName: String): Boolean = {
-    {
-      // For legacy 2-level list types with primitive element type, e.g.:
-      //
-      //    // List<Integer> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated int32 element;
-      //    }
-      //
-      repeatedType.isPrimitive
-    } || {
-      // For legacy 2-level list types whose element type is a group type with 2 or more fields,
-      // e.g.:
-      //
-      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group element {
-      //        required binary str (UTF8);
-      //        required int32 num;
-      //      };
-      //    }
-      //
-      repeatedType.asGroupType().getFieldCount > 1
-    } || {
-      // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
-      //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group array {
-      //        required binary str (UTF8);
-      //      };
-      //    }
-      //
-      repeatedType.getName == "array"
-    } || {
-      // For Parquet data generated by parquet-thrift, e.g.:
-      //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group my_list_tuple {
-      //        required binary str (UTF8);
-      //      };
-      //    }
-      //
-      repeatedType.getName == s"${parentName}_tuple"
-    }
-  }
-
-  /**
-   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
-   */
-  def convert(catalystSchema: StructType): MessageType = {
-    Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
-  }
-
-  /**
-   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
-   */
-  def convertField(field: StructField): Type = {
-    convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
-  }
-
-  private def convertField(field: StructField, repetition: Type.Repetition): Type = {
-    CatalystSchemaConverter.checkFieldName(field.name)
-
-    field.dataType match {
-      // ===================
-      // Simple atomic types
-      // ===================
-
-      case BooleanType =>
-        Types.primitive(BOOLEAN, repetition).named(field.name)
-
-      case ByteType =>
-        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
-
-      case ShortType =>
-        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
-
-      case IntegerType =>
-        Types.primitive(INT32, repetition).named(field.name)
-
-      case LongType =>
-        Types.primitive(INT64, repetition).named(field.name)
-
-      case FloatType =>
-        Types.primitive(FLOAT, repetition).named(field.name)
-
-      case DoubleType =>
-        Types.primitive(DOUBLE, repetition).named(field.name)
-
-      case StringType =>
-        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
-
-      case DateType =>
-        Types.primitive(INT32, repetition).as(DATE).named(field.name)
-
-      // NOTE: Spark SQL TimestampType is NOT a well-defined type in the Parquet format spec.
-      //
-      // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent
-      // nanosecond timestamps in Impala for some historical reasons. It's not recommended for any
-      // other use and will probably be deprecated in a future Parquet format spec. That's the
-      // reason why the Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS`,
-      // which are both logical types annotating `INT64`.
-      //
-      // Originally, Spark SQL used the same nanosecond timestamp type as Impala and Hive. Starting
-      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
-      // a timestamp in a `Long`. This design decision is subject to change though; for example,
-      // we may resort to microsecond precision in the future.
-      //
-      // For Parquet, we plan to write all `TimestampType` values as `TIMESTAMP_MICROS`, but it's
-      // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
-      // hasn't implemented `TIMESTAMP_MICROS` yet.
-      //
-      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-      case TimestampType =>
-        Types.primitive(INT96, repetition).named(field.name)
-
-      case BinaryType =>
-        Types.primitive(BINARY, repetition).named(field.name)
-
-      // =====================================
-      // Decimals (for Spark version <= 1.4.x)
-      // =====================================
-
-      // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
-      // always store decimals in fixed-length byte arrays. To keep compatibility with these older
-      // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
-      // by `DECIMAL`.
-      case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
-          .named(field.name)
-
-      // =====================================
-      // Decimals (follow Parquet format spec)
-      // =====================================
-
-      // Uses INT32 for 1 <= precision <= 9
-      case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec =>
-        Types
-          .primitive(INT32, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses INT64 for 1 <= precision <= 18
-      case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec =>
-        Types
-          .primitive(INT64, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
-      case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
-          .named(field.name)
-
-      // ===================================================
-      // ArrayType and MapType (for Spark versions <= 1.4.x)
-      // ===================================================
-
-      // Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level
-      // LIST structure. This behavior mimics parquet-hive (1.6.0rc3). Note that this case is
-      // covered by the backwards-compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
-        // <list-repetition> group <name> (LIST) {
-        //   optional group bag {
-        //     repeated <element-type> element;
-        //   }
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          Types
-            .buildGroup(REPEATED)
-            // "array_element" is the name chosen by parquet-hive (1.7.0 and prior versions)
-            .addField(convertField(StructField("array_element", elementType, nullable)))
-            .named(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME))
-
-      // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
-      // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
-      // covered by the backwards-compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated <element-type> element;
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          // "array" is the name chosen by parquet-avro (1.7.0 and prior versions)
-          convertField(StructField("array", elementType, nullable), REPEATED))
-
-      // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
-      // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
-      case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group map (MAP_KEY_VALUE) {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        ConversionPatterns.mapType(
-          repetition,
-          field.name,
-          convertField(StructField("key", keyType, nullable = false)),
-          convertField(StructField("value", valueType, valueContainsNull)))
-
-      // ==================================================
-      // ArrayType and MapType (follow Parquet format spec)
-      // ==================================================
-
-      case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated group list {
-        //     <element-repetition> <element-type> element;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(LIST)
-          .addField(
-            Types.repeatedGroup()
-              .addField(convertField(StructField("element", elementType, containsNull)))
-              .named("list"))
-          .named(field.name)
-
-      case MapType(keyType, valueType, valueContainsNull) =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group key_value {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(MAP)
-          .addField(
-            Types
-              .repeatedGroup()
-              .addField(convertField(StructField("key", keyType, nullable = false)))
-              .addField(convertField(StructField("value", valueType, valueContainsNull)))
-              .named("key_value"))
-          .named(field.name)
-
-      // ===========
-      // Other types
-      // ===========
-
-      case StructType(fields) =>
-        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
-          builder.addField(convertField(field))
-        }.named(field.name)
-
-      case udt: UserDefinedType[_] =>
-        convertField(field.copy(dataType = udt.sqlType))
-
-      case _ =>
-        throw new AnalysisException(s"Unsupported data type ${field.dataType}")
-    }
-  }
-}
-
-private[parquet] object CatalystSchemaConverter {
-  def checkFieldName(name: String): Unit = {
-    // ,;{}()\n\t= and space are special characters in Parquet schema
-    analysisRequire(
-      !name.matches(".*[ ,;{}()\n\t=].*"),
-      s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
-         |Please use alias to rename it.
-       """.stripMargin.split("\n").mkString(" "))
-  }
-
-  def checkFieldNames(schema: StructType): StructType = {
-    schema.fieldNames.foreach(checkFieldName)
-    schema
-  }
-
-  def analysisRequire(f: => Boolean, message: String): Unit = {
-    if (!f) {
-      throw new AnalysisException(message)
-    }
-  }
-
-  private def computeMinBytesForPrecision(precision: Int): Int = {
-    var numBytes = 1
-    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
-      numBytes += 1
-    }
-    numBytes
-  }
-
-  private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
-  // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
-  def minBytesForPrecision(precision: Int): Int = {
-    if (precision < MIN_BYTES_FOR_PRECISION.length) {
-      MIN_BYTES_FOR_PRECISION(precision)
-    } else {
-      computeMinBytesForPrecision(precision)
-    }
-  }
-
-  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
-
-  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
-
-  // Max precision of a decimal value stored in `numBytes` bytes
-  def maxPrecisionForBytes(numBytes: Int): Int = {
-    Math.round(                               // convert double to long
-      Math.floor(Math.log10(                  // number of base-10 digits
-        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
-      .asInstanceOf[Int]
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
deleted file mode 100644
index 1551afd..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.parquet.Log
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
-
-/**
- * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
- * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
- * destination folder. This can be useful for data stored in S3, where directory operations are
- * relatively expensive.
- *
- * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
- * property via Hadoop [[Configuration]]. Note that this property overrides
- * "spark.sql.sources.outputCommitterClass".
- *
- * *NOTE*
- *
- * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
- * no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- * left empty).
- */
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-
-  override def getWorkPath: Path = outputPath
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-  override def setupJob(jobContext: JobContext): Unit = {}
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def commitJob(jobContext: JobContext) {
-    val configuration = ContextUtil.getConfiguration(jobContext)
-    val fileSystem = outputPath.getFileSystem(configuration)
-
-    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
-      try {
-        val outputStatus = fileSystem.getFileStatus(outputPath)
-        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
-        try {
-          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
-        } catch { case e: Exception =>
-          LOG.warn("could not write summary file for " + outputPath, e)
-          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
-          if (fileSystem.exists(metadataPath)) {
-            fileSystem.delete(metadataPath, true)
-          }
-        }
-      } catch {
-        case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
-      }
-    }
-
-    if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
-      try {
-        val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-        fileSystem.create(successPath).close()
-      } catch {
-        case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
deleted file mode 100644
index 6ed3580..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{MapData, ArrayData}
-
-// TODO Removes this while fixing SPARK-8848
-private[sql] object CatalystConverter {
-  // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
-  // Note that "array" for the array elements is chosen by parquet-avro.
-  // Using a different value will result in Parquet silently dropping columns.
-  val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
-  val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
-
-  val MAP_KEY_SCHEMA_NAME = "key"
-  val MAP_VALUE_SCHEMA_NAME = "value"
-  val MAP_SCHEMA_NAME = "map"
-
-  // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
-  type ArrayScalaType = ArrayData
-  type StructScalaType = InternalRow
-  type MapScalaType = MapData
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
deleted file mode 100644
index d57b789..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.Serializable
-import java.nio.ByteBuffer
-
-import com.google.common.io.BaseEncoding
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.compat.FilterCompat._
-import org.apache.parquet.filter2.predicate.FilterApi._
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-private[sql] object ParquetFilters {
-  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-
-  def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = {
-    filterExpressions.flatMap { filter =>
-      createFilter(filter)
-    }.reduceOption(FilterApi.and).map(FilterCompat.get)
-  }
-
-  case class SetInFilter[T <: Comparable[T]](
-      valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
-
-    override def keep(value: T): Boolean = {
-      value != null && valueSet.contains(value)
-    }
-
-    override def canDrop(statistics: Statistics[T]): Boolean = false
-
-    override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
-  }
-
-  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
-      (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // Binary.fromString and Binary.fromByteArray don't accept null values
-    case StringType =>
-      (n: String, v: Any) => FilterApi.eq(
-        binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
-    case BinaryType =>
-      (n: String, v: Any) => FilterApi.eq(
-        binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-  }
-
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
-      (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) => FilterApi.notEq(
-        binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
-    case BinaryType =>
-      (n: String, v: Any) => FilterApi.notEq(
-        binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-  }
-
-  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-
-  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
-
-  private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
-
-  private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
-
-  private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
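
These comparison builders are PartialFunctions over DataType on purpose: createFilter probes them with .lift, so a type with no case (BooleanType has no ordering case here, and e.g. DecimalType is unsupported) simply yields None and the predicate stays with Spark. A small self-contained sketch of that mechanism, with a hypothetical stand-in for makeLt:

    import org.apache.spark.sql.types._

    // Only some types have an ordered Parquet column in this toy analog.
    val supportsLt: PartialFunction[DataType, String] = {
      case IntegerType => "intColumn"
      case LongType    => "longColumn"
    }

    supportsLt.lift(IntegerType)  // Some("intColumn"): pushed down to Parquet
    supportsLt.lift(BooleanType)  // None: left for Spark to evaluate
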
-
-  private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
-    case LongType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
-    case FloatType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
-    case DoubleType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-    case StringType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes))))
-    case BinaryType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
-  }
-
-  /**
-   * Converts data sources filters to Parquet filter predicates.
-   */
-  def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
-
-    // NOTE:
-    //
-    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
-    // which can be cast to `false` implicitly. Please refer to the `eval` method of these
-    // operators and the `SimplifyFilters` rule for details.
-    predicate match {
-      case sources.IsNull(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, null))
-      case sources.IsNotNull(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
-
-      case sources.EqualTo(name, value) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.LessThan(name, value) =>
-        makeLt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.LessThanOrEqual(name, value) =>
-        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.GreaterThan(name, value) =>
-        makeGt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.GreaterThanOrEqual(name, value) =>
-        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.And(lhs, rhs) =>
-        (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
-
-      case sources.Or(lhs, rhs) =>
-        for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
-        } yield FilterApi.or(lhsFilter, rhsFilter)
-
-      case sources.Not(pred) =>
-        createFilter(schema, pred).map(FilterApi.not)
-
-      case _ => None
-    }
-  }
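
One asymmetry worth noting in the createFilter above: sources.And concatenates the converted sides as Options and reduces, so a conjunction still yields a pushable filter when only one side converts, while sources.Or requires both branches, since pushing a single Or branch could drop matching rows. A hedged sketch (the schema and the unsupported DecimalType column are hypothetical):

    import org.apache.spark.sql.sources

    // Assume "age" is IntegerType (handled by makeGt) while "price" is
    // DecimalType, which none of the builders cover, so its side is None.
    val halfAnd = sources.And(
      sources.GreaterThan("age", 18),     // converts
      sources.GreaterThan("price", 9.99)) // does not convert
    // createFilter(schema, halfAnd) => Some(gt(intColumn("age"), 18)):
    // And only narrows, and Spark re-evaluates the residual predicate.

    val halfOr = sources.Or(
      sources.GreaterThan("age", 18),
      sources.GreaterThan("price", 9.99))
    // createFilter(schema, halfOr) => None: pushing one branch alone
    // would incorrectly discard rows matching only the other branch.
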
-
-  /**
-   * Converts Catalyst predicate expressions to Parquet filter predicates.
-   *
-   * @todo This can be removed once we get rid of the old Parquet support.
-   */
-  def createFilter(predicate: Expression): Option[FilterPredicate] = {
-    // NOTE:
-    //
-    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
-    // which can be cast to `false` implicitly. Please refer to the `eval` method of these
-    // operators and the `SimplifyFilters` rule for details.
-    predicate match {
-      case IsNull(NamedExpression(name, dataType)) =>
-        makeEq.lift(dataType).map(_(name, null))
-      case IsNotNull(NamedExpression(name, dataType)) =>
-        makeNotEq.lift(dataType).map(_(name, null))
-
-      case EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeEq.lift(dataType).map(_(name, value))
-
-      case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-
-      case LessThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case LessThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case LessThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case LessThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeGt.lift(dataType).map(_(name, value))
-
-      case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-
-      case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeLt.lift(dataType).map(_(name, value))
-
-      case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-
-      case And(lhs, rhs) =>
-        (createFilter(lhs) ++ createFilter(rhs)).reduceOption(FilterApi.and)
-
-      case Or(lhs, rhs) =>
-        for {
-          lhsFilter <- createFilter(lhs)
-          rhsFilter <- createFilter(rhs)
-        } yield FilterApi.or(lhsFilter, rhsFilter)
-
-      case Not(pred) =>
-        createFilter(pred).map(FilterApi.not)
-
-      case InSet(NamedExpression(name, dataType), valueSet) =>
-        makeInSet.lift(dataType).map(_(name, valueSet))
-
-      case _ => None
-    }
-  }
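
The literal-on-the-left cases above deliberately swap builders: `10 < age` is the same predicate as `age > 10`, so LessThan(literal, column) routes to makeGt, and likewise for the other mirrored pairs. In parquet-mr terms (the column name "age" is hypothetical, just for illustration):

    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.filter2.predicate.FilterApi.intColumn

    // Both `10 < age` and `age > 10` push down as the same Parquet predicate:
    val flipped = FilterApi.gt(intColumn("age"), Integer.valueOf(10))
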
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
-    if (filters.nonEmpty) {
-      val serialized: Array[Byte] =
-        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
-      val encoded: String = BaseEncoding.base64().encode(serialized)
-      conf.set(PARQUET_FILTER_DATA, encoded)
-    }
-  }
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
-    val data = conf.get(PARQUET_FILTER_DATA)
-    if (data != null) {
-      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
-    } else {
-      Seq()
-    }
-  }
-}
----------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org