[SPARK-15979][SQL] Rename various Parquet support classes.

## What changes were proposed in this pull request?

This patch renames various Parquet support classes from CatalystAbc to ParquetAbc. This new naming makes more sense for two reasons:
1. These are not optimizer related (i.e. Catalyst) classes.
2. We are in the Spark code base, and as a result it'd be clearer to call out that these are Parquet support classes, rather than some Spark classes.

## How was this patch tested?

Renamed test cases as well.

Author: Reynold Xin <r...@databricks.com>

Closes #13696 from rxin/parquet-rename.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/865e7cc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/865e7cc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/865e7cc3

Branch: refs/heads/master
Commit: 865e7cc38d2b7cf2d4f7e7b04ccb7b17791a693b
Parents: 3e6d567
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Jun 15 20:05:08 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 15 20:05:08 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala |   6 +-
 .../SpecificParquetRecordReaderBase.java        |   8 +-
 .../parquet/VectorizedColumnReader.java         |  12 +-
 .../datasources/fileSourceInterfaces.scala      |  27 +-
 .../parquet/CatalystReadSupport.scala           | 306 ---------
 .../parquet/CatalystRecordMaterializer.scala    |  41 --
 .../parquet/CatalystRowConverter.scala          | 672 -------------------
 .../parquet/CatalystSchemaConverter.scala       | 595 ----------------
 .../parquet/CatalystWriteSupport.scala          |   8 +-
 .../datasources/parquet/ParquetFileFormat.scala |  30 +-
 .../parquet/ParquetReadSupport.scala            | 306 +++++++++
 .../parquet/ParquetRecordMaterializer.scala     |  41 ++
 .../parquet/ParquetRowConverter.scala           | 672 +++++++++++++++++++
 .../parquet/ParquetSchemaConverter.scala        | 595 ++++++++++++++++
 .../datasources/parquet/ParquetIOSuite.scala    |   4 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   6 +-
 .../parquet/ParquetSchemaSuite.scala            |   8 +-
 .../datasources/parquet/ParquetTest.scala       |   4 +-
 18 files changed, 1669 insertions(+), 1672 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index b5b2a68..62e09d2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -141,7 +141,7 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Map[String, String] = {
-    def computeNumFeatures(): Int = {
+    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
       val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
       val path = if (dataFiles.length == 1) {
         dataFiles.head.getPath.toUri.toString
@@ -156,10 +156,6 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       MLUtils.computeNumFeatures(parsed)
     }
 
-    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
-      computeNumFeatures()
-    }
-
     new CaseInsensitiveMap(options + ("numFeatures" -> numFeatures.toString))
   }
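All of these classes are `private[parquet]`, so the rename only affects Spark-internal call sites such as `SpecificParquetRecordReaderBase` and `VectorizedColumnReader` below. A minimal sketch of what such a call site looks like after the rename, assuming code living in the `org.apache.spark.sql.execution.datasources.parquet` package (the helper object and method names here are illustrative, not part of the patch):

```scala
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.types.StructType

// Illustrative only: mirrors the call sites touched by this patch.
object ParquetRenameExample {
  def toSparkSchema(requestedSchema: MessageType, conf: Configuration): StructType = {
    // Before this patch: new CatalystSchemaConverter(conf).convert(requestedSchema)
    // After this patch the behavior is unchanged; only the class name differs.
    new ParquetSchemaConverter(conf).convert(requestedSchema)
  }
}
```

The same mechanical substitution applies to the other renamed classes, e.g. `CatalystReadSupport` -> `ParquetReadSupport`, `CatalystRecordMaterializer` -> `ParquetRecordMaterializer`, and `CatalystRowConverter` -> `ParquetRowConverter`.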
http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 14626e5..d823275 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -136,7 +136,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo ReadSupport.ReadContext readContext = readSupport.init(new InitContext( taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema)); this.requestedSchema = readContext.getRequestedSchema(); - this.sparkSchema = new CatalystSchemaConverter(configuration).convert(requestedSchema); + this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema); this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns()); for (BlockMetaData block : blocks) { this.totalRowCount += block.getRowCount(); @@ -195,12 +195,12 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo } builder.addFields(fileSchema.getType(s)); } - this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME()); + this.requestedSchema = builder.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME()); } else { - this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE(); + this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE(); } } - this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema); + this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema); this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns()); for (BlockMetaData block : blocks) { this.totalRowCount += block.getRowCount(); http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index ea37a08..662a03d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -259,7 +259,7 @@ public class VectorizedColumnReader { for (int i = rowId; i < rowId + num; ++i) { // TODO: Convert dictionary of Binaries to dictionary of Longs Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); } } else { throw new NotImplementedException(); @@ -280,12 +280,12 @@ public class VectorizedColumnReader { if (DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - 
column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v)); + column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v)); } } else if (DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v)); + column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v)); } } else if (DecimalType.isByteArrayDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { @@ -375,7 +375,7 @@ public class VectorizedColumnReader { if (defColumn.readInteger() == maxDefLevel) { column.putLong(rowId + i, // Read 12 bytes for INT96 - CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12))); + ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12))); } else { column.putNull(rowId + i); } @@ -394,7 +394,7 @@ public class VectorizedColumnReader { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { column.putInt(rowId + i, - (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); + (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); } else { column.putNull(rowId + i); } @@ -403,7 +403,7 @@ public class VectorizedColumnReader { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { column.putLong(rowId + i, - CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); + ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); } else { column.putNull(rowId + i); } http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 9c18989..641c5cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -207,6 +207,20 @@ trait FileFormat { dataSchema: StructType): OutputWriterFactory /** + * Returns a [[OutputWriterFactory]] for generating output writers that can write data. + * This method is current used only by FileStreamSinkWriter to generate output writers that + * does not use output committers to write data. The OutputWriter generated by the returned + * [[OutputWriterFactory]] must implement the method `newWriter(path)`.. + */ + def buildWriter( + sqlContext: SQLContext, + dataSchema: StructType, + options: Map[String, String]): OutputWriterFactory = { + // TODO: Remove this default implementation when the other formats have been ported + throw new UnsupportedOperationException(s"buildWriter is not supported for $this") + } + + /** * Returns whether this format support returning columnar batch or not. * * TODO: we should just have different traits for the different formats. @@ -293,19 +307,6 @@ trait FileFormat { } } - /** - * Returns a [[OutputWriterFactory]] for generating output writers that can write data. - * This method is current used only by FileStreamSinkWriter to generate output writers that - * does not use output committers to write data. The OutputWriter generated by the returned - * [[OutputWriterFactory]] must implement the method `newWriter(path)`.. 
- */ - def buildWriter( - sqlContext: SQLContext, - dataSchema: StructType, - options: Map[String, String]): OutputWriterFactory = { - // TODO: Remove this default implementation when the other formats have been ported - throw new UnsupportedOperationException(s"buildWriter is not supported for $this") - } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala deleted file mode 100644 index 9c885b2..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet - -import java.util.{Map => JMap} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} -import org.apache.parquet.hadoop.api.ReadSupport.ReadContext -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema._ -import org.apache.parquet.schema.Type.Repetition - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types._ - -/** - * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst - * [[InternalRow]]s. - * - * The API interface of [[ReadSupport]] is a little bit over complicated because of historical - * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be - * instantiated and initialized twice on both driver side and executor side. The [[init()]] method - * is for driver side initialization, while [[prepareForRead()]] is for executor side. However, - * starting from parquet-mr 1.6.0, it's no longer the case, and [[ReadSupport]] is only instantiated - * and initialized on executor side. So, theoretically, now it's totally fine to combine these two - * methods into a single initialization method. The only reason (I could think of) to still have - * them here is for parquet-mr API backwards-compatibility. - * - * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] - * to [[prepareForRead()]], but use a private `var` for simplicity. 
- */ -private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { - private var catalystRequestedSchema: StructType = _ - - /** - * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record - * readers. Responsible for figuring out Parquet requested schema used for column pruning. - */ - override def init(context: InitContext): ReadContext = { - catalystRequestedSchema = { - val conf = context.getConfiguration - val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) - assert(schemaString != null, "Parquet requested schema not set.") - StructType.fromString(schemaString) - } - - val parquetRequestedSchema = - CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) - - new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava) - } - - /** - * Called on executor side after [[init()]], before instantiating actual Parquet record readers. - * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet - * records to Catalyst [[InternalRow]]s. - */ - override def prepareForRead( - conf: Configuration, - keyValueMetaData: JMap[String, String], - fileSchema: MessageType, - readContext: ReadContext): RecordMaterializer[InternalRow] = { - log.debug(s"Preparing for read Parquet file with message type: $fileSchema") - val parquetRequestedSchema = readContext.getRequestedSchema - - logInfo { - s"""Going to read the following fields from the Parquet file: - | - |Parquet form: - |$parquetRequestedSchema - |Catalyst form: - |$catalystRequestedSchema - """.stripMargin - } - - new CatalystRecordMaterializer( - parquetRequestedSchema, - CatalystReadSupport.expandUDT(catalystRequestedSchema)) - } -} - -private[parquet] object CatalystReadSupport { - val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" - - val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" - - /** - * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist - * in `catalystSchema`, and adding those only exist in `catalystSchema`. - */ - def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { - val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema) - if (clippedParquetFields.isEmpty) { - CatalystSchemaConverter.EMPTY_MESSAGE - } else { - Types - .buildMessage() - .addFields(clippedParquetFields: _*) - .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) - } - } - - private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { - catalystType match { - case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => - // Only clips array types with nested type as element type. - clipParquetListType(parquetType.asGroupType(), t.elementType) - - case t: MapType - if !isPrimitiveCatalystType(t.keyType) || - !isPrimitiveCatalystType(t.valueType) => - // Only clips map types with nested key type or value type - clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType) - - case t: StructType => - clipParquetGroup(parquetType.asGroupType(), t) - - case _ => - // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able - // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging. - parquetType - } - } - - /** - * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to - * [[AtomicType]]. 
For example, [[CalendarIntervalType]] is primitive, but it's not an - * [[AtomicType]]. - */ - private def isPrimitiveCatalystType(dataType: DataType): Boolean = { - dataType match { - case _: ArrayType | _: MapType | _: StructType => false - case _ => true - } - } - - /** - * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type - * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a - * [[StructType]]. - */ - private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = { - // Precondition of this method, should only be called for lists with nested element types. - assert(!isPrimitiveCatalystType(elementType)) - - // Unannotated repeated group should be interpreted as required list of required element, so - // list element type is just the group itself. Clip it. - if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { - clipParquetType(parquetList, elementType) - } else { - assert( - parquetList.getOriginalType == OriginalType.LIST, - "Invalid Parquet schema. " + - "Original type of annotated Parquet lists must be LIST: " + - parquetList.toString) - - assert( - parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED), - "Invalid Parquet schema. " + - "LIST-annotated group should only have exactly one repeated field: " + - parquetList) - - // Precondition of this method, should only be called for lists with nested element types. - assert(!parquetList.getType(0).isPrimitive) - - val repeatedGroup = parquetList.getType(0).asGroupType() - - // If the repeated field is a group with multiple fields, or the repeated field is a group - // with one field and is named either "array" or uses the LIST-annotated group's name with - // "_tuple" appended then the repeated type is the element type and elements are required. - // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the - // only field. - if ( - repeatedGroup.getFieldCount > 1 || - repeatedGroup.getName == "array" || - repeatedGroup.getName == parquetList.getName + "_tuple" - ) { - Types - .buildGroup(parquetList.getRepetition) - .as(OriginalType.LIST) - .addField(clipParquetType(repeatedGroup, elementType)) - .named(parquetList.getName) - } else { - // Otherwise, the repeated field's type is the element type with the repeated field's - // repetition. - Types - .buildGroup(parquetList.getRepetition) - .as(OriginalType.LIST) - .addField( - Types - .repeatedGroup() - .addField(clipParquetType(repeatedGroup.getType(0), elementType)) - .named(repeatedGroup.getName)) - .named(parquetList.getName) - } - } - } - - /** - * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either key type or - * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or - * a [[StructType]]. - */ - private def clipParquetMapType( - parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = { - // Precondition of this method, only handles maps with nested key types or value types. 
- assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType)) - - val repeatedGroup = parquetMap.getType(0).asGroupType() - val parquetKeyType = repeatedGroup.getType(0) - val parquetValueType = repeatedGroup.getType(1) - - val clippedRepeatedGroup = - Types - .repeatedGroup() - .as(repeatedGroup.getOriginalType) - .addField(clipParquetType(parquetKeyType, keyType)) - .addField(clipParquetType(parquetValueType, valueType)) - .named(repeatedGroup.getName) - - Types - .buildGroup(parquetMap.getRepetition) - .as(parquetMap.getOriginalType) - .addField(clippedRepeatedGroup) - .named(parquetMap.getName) - } - - /** - * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. - * - * @return A clipped [[GroupType]], which has at least one field. - * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty - * [[MessageType]]. Because it's legal to construct an empty requested schema for column - * pruning. - */ - private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = { - val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType) - Types - .buildGroup(parquetRecord.getRepetition) - .as(parquetRecord.getOriginalType) - .addFields(clippedParquetFields: _*) - .named(parquetRecord.getName) - } - - /** - * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. - * - * @return A list of clipped [[GroupType]] fields, which can be empty. - */ - private def clipParquetGroupFields( - parquetRecord: GroupType, structType: StructType): Seq[Type] = { - val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap - val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false) - structType.map { f => - parquetFieldMap - .get(f.name) - .map(clipParquetType(_, f.dataType)) - .getOrElse(toParquet.convertField(f)) - } - } - - def expandUDT(schema: StructType): StructType = { - def expand(dataType: DataType): DataType = { - dataType match { - case t: ArrayType => - t.copy(elementType = expand(t.elementType)) - - case t: MapType => - t.copy( - keyType = expand(t.keyType), - valueType = expand(t.valueType)) - - case t: StructType => - val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType))) - t.copy(fields = expandedFields) - - case t: UserDefinedType[_] => - t.sqlType - - case t => - t - } - } - - expand(schema).asInstanceOf[StructType] - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala deleted file mode 100644 index eeead9f..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet - -import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} -import org.apache.parquet.schema.MessageType - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.StructType - -/** - * A [[RecordMaterializer]] for Catalyst rows. - * - * @param parquetSchema Parquet schema of the records to be read - * @param catalystSchema Catalyst schema of the rows to be constructed - */ -private[parquet] class CatalystRecordMaterializer( - parquetSchema: MessageType, catalystSchema: StructType) - extends RecordMaterializer[InternalRow] { - - private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater) - - override def getCurrentRecord: InternalRow = rootConverter.currentRecord - - override def getRootConverter: GroupConverter = rootConverter -} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala deleted file mode 100644 index 85b0bc1..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ /dev/null @@ -1,672 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources.parquet - -import java.math.{BigDecimal, BigInteger} -import java.nio.ByteOrder - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import org.apache.parquet.column.Dictionary -import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} -import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type} -import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8} -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64} - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} -import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -/** - * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some - * corresponding parent container. For example, a converter for a `StructType` field may set - * converted values to a [[MutableRow]]; or a converter for array elements may append converted - * values to an [[ArrayBuffer]]. - */ -private[parquet] trait ParentContainerUpdater { - /** Called before a record field is being converted */ - def start(): Unit = () - - /** Called after a record field is being converted */ - def end(): Unit = () - - def set(value: Any): Unit = () - def setBoolean(value: Boolean): Unit = set(value) - def setByte(value: Byte): Unit = set(value) - def setShort(value: Short): Unit = set(value) - def setInt(value: Int): Unit = set(value) - def setLong(value: Long): Unit = set(value) - def setFloat(value: Float): Unit = set(value) - def setDouble(value: Double): Unit = set(value) -} - -/** A no-op updater used for root converter (who doesn't have a parent). */ -private[parquet] object NoopUpdater extends ParentContainerUpdater - -private[parquet] trait HasParentContainerUpdater { - def updater: ParentContainerUpdater -} - -/** - * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]]. - */ -private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater) - extends GroupConverter with HasParentContainerUpdater - -/** - * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types - * are handled by this converter. Parquet primitive types are only a subset of those of Spark - * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet. - */ -private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater) - extends PrimitiveConverter with HasParentContainerUpdater { - - override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) - override def addInt(value: Int): Unit = updater.setInt(value) - override def addLong(value: Long): Unit = updater.setLong(value) - override def addFloat(value: Float): Unit = updater.setFloat(value) - override def addDouble(value: Double): Unit = updater.setDouble(value) - override def addBinary(value: Binary): Unit = updater.set(value.getBytes) -} - -/** - * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s. - * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root - * converter. 
Take the following Parquet type as an example: - * {{{ - * message root { - * required int32 f1; - * optional group f2 { - * required double f21; - * optional binary f22 (utf8); - * } - * } - * }}} - * 5 converters will be created: - * - * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains: - * - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and - * - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains: - * - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and - * - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22` - * - * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have - * any "parent" container. - * - * @param parquetType Parquet schema of Parquet records - * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined - * types should have been expanded. - * @param updater An updater which propagates converted field values to the parent container - */ -private[parquet] class CatalystRowConverter( - parquetType: GroupType, - catalystType: StructType, - updater: ParentContainerUpdater) - extends CatalystGroupConverter(updater) with Logging { - - assert( - parquetType.getFieldCount == catalystType.length, - s"""Field counts of the Parquet schema and the Catalyst schema don't match: - | - |Parquet schema: - |$parquetType - |Catalyst schema: - |${catalystType.prettyJson} - """.stripMargin) - - assert( - !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]), - s"""User-defined types in Catalyst schema should have already been expanded: - |${catalystType.prettyJson} - """.stripMargin) - - logDebug( - s"""Building row converter for the following schema: - | - |Parquet form: - |$parquetType - |Catalyst form: - |${catalystType.prettyJson} - """.stripMargin) - - /** - * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates - * converted filed values to the `ordinal`-th cell in `currentRow`. - */ - private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { - override def set(value: Any): Unit = row(ordinal) = value - override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) - override def setByte(value: Byte): Unit = row.setByte(ordinal, value) - override def setShort(value: Short): Unit = row.setShort(ordinal, value) - override def setInt(value: Int): Unit = row.setInt(ordinal, value) - override def setLong(value: Long): Unit = row.setLong(ordinal, value) - override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) - override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) - } - - private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType)) - - private val unsafeProjection = UnsafeProjection.create(catalystType) - - /** - * The [[UnsafeRow]] converted from an entire Parquet record. - */ - def currentRecord: UnsafeRow = unsafeProjection(currentRow) - - // Converters for each field. 
- private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { - parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { - case ((parquetFieldType, catalystField), ordinal) => - // Converted field value should be set to the `ordinal`-th cell of `currentRow` - newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) - }.toArray - } - - override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) - - override def end(): Unit = { - var i = 0 - while (i < currentRow.numFields) { - fieldConverters(i).updater.end() - i += 1 - } - updater.set(currentRow) - } - - override def start(): Unit = { - var i = 0 - while (i < currentRow.numFields) { - fieldConverters(i).updater.start() - currentRow.setNullAt(i) - i += 1 - } - } - - /** - * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type - * `catalystType`. Converted values are handled by `updater`. - */ - private def newConverter( - parquetType: Type, - catalystType: DataType, - updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { - - catalystType match { - case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => - new CatalystPrimitiveConverter(updater) - - case ByteType => - new CatalystPrimitiveConverter(updater) { - override def addInt(value: Int): Unit = - updater.setByte(value.asInstanceOf[ByteType#InternalType]) - } - - case ShortType => - new CatalystPrimitiveConverter(updater) { - override def addInt(value: Int): Unit = - updater.setShort(value.asInstanceOf[ShortType#InternalType]) - } - - // For INT32 backed decimals - case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => - new CatalystIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater) - - // For INT64 backed decimals - case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => - new CatalystLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater) - - // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals - case t: DecimalType - if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY || - parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY => - new CatalystBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater) - - case t: DecimalType => - throw new RuntimeException( - s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " + - s"$parquetType. Parquet DECIMAL type can only be backed by INT32, INT64, " + - "FIXED_LEN_BYTE_ARRAY, or BINARY.") - - case StringType => - new CatalystStringConverter(updater) - - case TimestampType => - // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. - new CatalystPrimitiveConverter(updater) { - // Converts nanosecond timestamps stored as INT96 - override def addBinary(value: Binary): Unit = { - assert( - value.length() == 12, - "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " + - s"but got a ${value.length()}-byte binary.") - - val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) - val timeOfDayNanos = buf.getLong - val julianDay = buf.getInt - updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)) - } - } - - case DateType => - new CatalystPrimitiveConverter(updater) { - override def addInt(value: Int): Unit = { - // DateType is not specialized in `SpecificMutableRow`, have to box it here. 
- updater.set(value.asInstanceOf[DateType#InternalType]) - } - } - - // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor - // annotated by `LIST` or `MAP` should be interpreted as a required list of required - // elements where the element type is the type of the field. - case t: ArrayType if parquetType.getOriginalType != LIST => - if (parquetType.isPrimitive) { - new RepeatedPrimitiveConverter(parquetType, t.elementType, updater) - } else { - new RepeatedGroupConverter(parquetType, t.elementType, updater) - } - - case t: ArrayType => - new CatalystArrayConverter(parquetType.asGroupType(), t, updater) - - case t: MapType => - new CatalystMapConverter(parquetType.asGroupType(), t, updater) - - case t: StructType => - new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater { - override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) - }) - - case t => - throw new RuntimeException( - s"Unable to create Parquet converter for data type ${t.json} " + - s"whose Parquet type is $parquetType") - } - } - - /** - * Parquet converter for strings. A dictionary is used to minimize string decoding cost. - */ - private final class CatalystStringConverter(updater: ParentContainerUpdater) - extends CatalystPrimitiveConverter(updater) { - - private var expandedDictionary: Array[UTF8String] = null - - override def hasDictionarySupport: Boolean = true - - override def setDictionary(dictionary: Dictionary): Unit = { - this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i => - UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes) - } - } - - override def addValueFromDictionary(dictionaryId: Int): Unit = { - updater.set(expandedDictionary(dictionaryId)) - } - - override def addBinary(value: Binary): Unit = { - // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we - // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying - // it. - val buffer = value.toByteBuffer - val offset = buffer.arrayOffset() + buffer.position() - val numBytes = buffer.remaining() - updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes)) - } - } - - /** - * Parquet converter for fixed-precision decimals. - */ - private abstract class CatalystDecimalConverter( - precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystPrimitiveConverter(updater) { - - protected var expandedDictionary: Array[Decimal] = _ - - override def hasDictionarySupport: Boolean = true - - override def addValueFromDictionary(dictionaryId: Int): Unit = { - updater.set(expandedDictionary(dictionaryId)) - } - - // Converts decimals stored as INT32 - override def addInt(value: Int): Unit = { - addLong(value: Long) - } - - // Converts decimals stored as INT64 - override def addLong(value: Long): Unit = { - updater.set(decimalFromLong(value)) - } - - // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY - override def addBinary(value: Binary): Unit = { - updater.set(decimalFromBinary(value)) - } - - protected def decimalFromLong(value: Long): Decimal = { - Decimal(value, precision, scale) - } - - protected def decimalFromBinary(value: Binary): Decimal = { - if (precision <= Decimal.MAX_LONG_DIGITS) { - // Constructs a `Decimal` with an unscaled `Long` value if possible. 
- val unscaled = CatalystRowConverter.binaryToUnscaledLong(value) - Decimal(unscaled, precision, scale) - } else { - // Otherwise, resorts to an unscaled `BigInteger` instead. - Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale) - } - } - } - - private class CatalystIntDictionaryAwareDecimalConverter( - precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystDecimalConverter(precision, scale, updater) { - - override def setDictionary(dictionary: Dictionary): Unit = { - this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => - decimalFromLong(dictionary.decodeToInt(id).toLong) - } - } - } - - private class CatalystLongDictionaryAwareDecimalConverter( - precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystDecimalConverter(precision, scale, updater) { - - override def setDictionary(dictionary: Dictionary): Unit = { - this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => - decimalFromLong(dictionary.decodeToLong(id)) - } - } - } - - private class CatalystBinaryDictionaryAwareDecimalConverter( - precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystDecimalConverter(precision, scale, updater) { - - override def setDictionary(dictionary: Dictionary): Unit = { - this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => - decimalFromBinary(dictionary.decodeToBinary(id)) - } - } - } - - /** - * Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard - * Parquet lists are represented as a 3-level group annotated by `LIST`: - * {{{ - * <list-repetition> group <name> (LIST) { <-- parquetSchema points here - * repeated group list { - * <element-repetition> <element-type> element; - * } - * } - * }}} - * The `parquetSchema` constructor argument points to the outermost group. - * - * However, before this representation is standardized, some Parquet libraries/tools also use some - * non-standard formats to represent list-like structures. Backwards-compatibility rules for - * handling these cases are described in Parquet format spec. - * - * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists - */ - private final class CatalystArrayConverter( - parquetSchema: GroupType, - catalystSchema: ArrayType, - updater: ParentContainerUpdater) - extends CatalystGroupConverter(updater) { - - private var currentArray: ArrayBuffer[Any] = _ - - private val elementConverter: Converter = { - val repeatedType = parquetSchema.getType(0) - val elementType = catalystSchema.elementType - val parentName = parquetSchema.getName - - if (isElementType(repeatedType, elementType, parentName)) { - newConverter(repeatedType, elementType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentArray += value - }) - } else { - new ElementConverter(repeatedType.asGroupType().getType(0), elementType) - } - } - - override def getConverter(fieldIndex: Int): Converter = elementConverter - - override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray)) - - // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the - // next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored - // in row cells. - override def start(): Unit = currentArray = ArrayBuffer.empty[Any] - - // scalastyle:off - /** - * Returns whether the given type is the element type of a list or is a syntactic group with - * one field that is the element type. 
This is determined by checking whether the type can be - * a syntactic group and by checking whether a potential syntactic group matches the expected - * schema. - * {{{ - * <list-repetition> group <name> (LIST) { - * repeated group list { <-- repeatedType points here - * <element-repetition> <element-type> element; - * } - * } - * }}} - * In short, here we handle Parquet list backwards-compatibility rules on the read path. This - * method is based on `AvroIndexedRecordConverter.isElementType`. - * - * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules - */ - // scalastyle:on - private def isElementType( - parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = { - (parquetRepeatedType, catalystElementType) match { - case (t: PrimitiveType, _) => true - case (t: GroupType, _) if t.getFieldCount > 1 => true - case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true - case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true - case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true - case _ => false - } - } - - /** Array element converter */ - private final class ElementConverter(parquetType: Type, catalystType: DataType) - extends GroupConverter { - - private var currentElement: Any = _ - - private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentElement = value - }) - - override def getConverter(fieldIndex: Int): Converter = converter - - override def end(): Unit = currentArray += currentElement - - override def start(): Unit = currentElement = null - } - } - - /** Parquet converter for maps */ - private final class CatalystMapConverter( - parquetType: GroupType, - catalystType: MapType, - updater: ParentContainerUpdater) - extends CatalystGroupConverter(updater) { - - private var currentKeys: ArrayBuffer[Any] = _ - private var currentValues: ArrayBuffer[Any] = _ - - private val keyValueConverter = { - val repeatedType = parquetType.getType(0).asGroupType() - new KeyValueConverter( - repeatedType.getType(0), - repeatedType.getType(1), - catalystType.keyType, - catalystType.valueType) - } - - override def getConverter(fieldIndex: Int): Converter = keyValueConverter - - override def end(): Unit = - updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray)) - - // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next - // value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row - // cells. - override def start(): Unit = { - currentKeys = ArrayBuffer.empty[Any] - currentValues = ArrayBuffer.empty[Any] - } - - /** Parquet converter for key-value pairs within the map. 
*/ - private final class KeyValueConverter( - parquetKeyType: Type, - parquetValueType: Type, - catalystKeyType: DataType, - catalystValueType: DataType) - extends GroupConverter { - - private var currentKey: Any = _ - - private var currentValue: Any = _ - - private val converters = Array( - // Converter for keys - newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentKey = value - }), - - // Converter for values - newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentValue = value - })) - - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - - override def end(): Unit = { - currentKeys += currentKey - currentValues += currentValue - } - - override def start(): Unit = { - currentKey = null - currentValue = null - } - } - } - - private trait RepeatedConverter { - private var currentArray: ArrayBuffer[Any] = _ - - protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater { - override def start(): Unit = currentArray = ArrayBuffer.empty[Any] - override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray)) - override def set(value: Any): Unit = currentArray += value - } - } - - /** - * A primitive converter for converting unannotated repeated primitive values to required arrays - * of required primitives values. - */ - private final class RepeatedPrimitiveConverter( - parquetType: Type, - catalystType: DataType, - parentUpdater: ParentContainerUpdater) - extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater { - - val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) - - private val elementConverter: PrimitiveConverter = - newConverter(parquetType, catalystType, updater).asPrimitiveConverter() - - override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value) - override def addInt(value: Int): Unit = elementConverter.addInt(value) - override def addLong(value: Long): Unit = elementConverter.addLong(value) - override def addFloat(value: Float): Unit = elementConverter.addFloat(value) - override def addDouble(value: Double): Unit = elementConverter.addDouble(value) - override def addBinary(value: Binary): Unit = elementConverter.addBinary(value) - - override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict) - override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport - override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id) - } - - /** - * A group converter for converting unannotated repeated group values to required arrays of - * required struct values. 
- */ - private final class RepeatedGroupConverter( - parquetType: Type, - catalystType: DataType, - parentUpdater: ParentContainerUpdater) - extends GroupConverter with HasParentContainerUpdater with RepeatedConverter { - - val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) - - private val elementConverter: GroupConverter = - newConverter(parquetType, catalystType, updater).asGroupConverter() - - override def getConverter(field: Int): Converter = elementConverter.getConverter(field) - override def end(): Unit = elementConverter.end() - override def start(): Unit = elementConverter.start() - } -} - -private[parquet] object CatalystRowConverter { - def binaryToUnscaledLong(binary: Binary): Long = { - // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here - // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without - // copying it. - val buffer = binary.toByteBuffer - val bytes = buffer.array() - val start = buffer.arrayOffset() + buffer.position() - val end = buffer.arrayOffset() + buffer.limit() - - var unscaled = 0L - var i = start - - while (i < end) { - unscaled = (unscaled << 8) | (bytes(i) & 0xff) - i += 1 - } - - val bits = 8 * (end - start) - unscaled = (unscaled << (64 - bits)) >> (64 - bits) - unscaled - } - - def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = { - assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" + - s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.") - val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) - val timeOfDayNanos = buffer.getLong - val julianDay = buffer.getInt - DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala deleted file mode 100644 index 3688c3e..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ /dev/null @@ -1,595 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources.parquet - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.schema._ -import org.apache.parquet.schema.OriginalType._ -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ -import org.apache.parquet.schema.Type.Repetition._ - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.maxPrecisionForBytes -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types._ - -/** - * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and - * vice versa. - * - * Parquet format backwards-compatibility rules are respected when converting Parquet - * [[MessageType]] schemas. - * - * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - * @constructor - * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL - * [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL - * [[StructType]]. This argument only affects Parquet read path. - * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL - * [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL - * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which - * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS` - * described in Parquet format spec. This argument only affects Parquet read path. - * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 - * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. - * When set to false, use standard format defined in parquet-format spec. This argument only - * affects Parquet write path. - */ -private[parquet] class CatalystSchemaConverter( - assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, - assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, - writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) { - - def this(conf: SQLConf) = this( - assumeBinaryIsString = conf.isParquetBinaryAsString, - assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, - writeLegacyParquetFormat = conf.writeLegacyParquetFormat) - - def this(conf: Configuration) = this( - assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, - assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, - writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean) - - /** - * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. 
- */ - def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType()) - - private def convert(parquetSchema: GroupType): StructType = { - val fields = parquetSchema.getFields.asScala.map { field => - field.getRepetition match { - case OPTIONAL => - StructField(field.getName, convertField(field), nullable = true) - - case REQUIRED => - StructField(field.getName, convertField(field), nullable = false) - - case REPEATED => - // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor - // annotated by `LIST` or `MAP` should be interpreted as a required list of required - // elements where the element type is the type of the field. - val arrayType = ArrayType(convertField(field), containsNull = false) - StructField(field.getName, arrayType, nullable = false) - } - } - - StructType(fields) - } - - /** - * Converts a Parquet [[Type]] to a Spark SQL [[DataType]]. - */ - def convertField(parquetType: Type): DataType = parquetType match { - case t: PrimitiveType => convertPrimitiveField(t) - case t: GroupType => convertGroupField(t.asGroupType()) - } - - private def convertPrimitiveField(field: PrimitiveType): DataType = { - val typeName = field.getPrimitiveTypeName - val originalType = field.getOriginalType - - def typeString = - if (originalType == null) s"$typeName" else s"$typeName ($originalType)" - - def typeNotSupported() = - throw new AnalysisException(s"Parquet type not supported: $typeString") - - def typeNotImplemented() = - throw new AnalysisException(s"Parquet type not yet supported: $typeString") - - def illegalType() = - throw new AnalysisException(s"Illegal Parquet type: $typeString") - - // When maxPrecision = -1, we skip precision range check, and always respect the precision - // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored - // as binaries with variable lengths. - def makeDecimalType(maxPrecision: Int = -1): DecimalType = { - val precision = field.getDecimalMetadata.getPrecision - val scale = field.getDecimalMetadata.getScale - - CatalystSchemaConverter.checkConversionRequirement( - maxPrecision == -1 || 1 <= precision && precision <= maxPrecision, - s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)") - - DecimalType(precision, scale) - } - - typeName match { - case BOOLEAN => BooleanType - - case FLOAT => FloatType - - case DOUBLE => DoubleType - - case INT32 => - originalType match { - case INT_8 => ByteType - case INT_16 => ShortType - case INT_32 | null => IntegerType - case DATE => DateType - case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS) - case UINT_8 => typeNotSupported() - case UINT_16 => typeNotSupported() - case UINT_32 => typeNotSupported() - case TIME_MILLIS => typeNotImplemented() - case _ => illegalType() - } - - case INT64 => - originalType match { - case INT_64 | null => LongType - case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS) - case UINT_64 => typeNotSupported() - case TIMESTAMP_MILLIS => typeNotImplemented() - case _ => illegalType() - } - - case INT96 => - CatalystSchemaConverter.checkConversionRequirement( - assumeInt96IsTimestamp, - "INT96 is not supported unless it's interpreted as timestamp. 
" + - s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") - TimestampType - - case BINARY => - originalType match { - case UTF8 | ENUM | JSON => StringType - case null if assumeBinaryIsString => StringType - case null => BinaryType - case BSON => BinaryType - case DECIMAL => makeDecimalType() - case _ => illegalType() - } - - case FIXED_LEN_BYTE_ARRAY => - originalType match { - case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength)) - case INTERVAL => typeNotImplemented() - case _ => illegalType() - } - - case _ => illegalType() - } - } - - private def convertGroupField(field: GroupType): DataType = { - Option(field.getOriginalType).fold(convert(field): DataType) { - // A Parquet list is represented as a 3-level structure: - // - // <list-repetition> group <name> (LIST) { - // repeated group list { - // <element-repetition> <element-type> element; - // } - // } - // - // However, according to the most recent Parquet format spec (not released yet up until - // writing), some 2-level structures are also recognized for backwards-compatibility. Thus, - // we need to check whether the 2nd level or the 3rd level refers to list element type. - // - // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists - case LIST => - CatalystSchemaConverter.checkConversionRequirement( - field.getFieldCount == 1, s"Invalid list type $field") - - val repeatedType = field.getType(0) - CatalystSchemaConverter.checkConversionRequirement( - repeatedType.isRepetition(REPEATED), s"Invalid list type $field") - - if (isElementType(repeatedType, field.getName)) { - ArrayType(convertField(repeatedType), containsNull = false) - } else { - val elementType = repeatedType.asGroupType().getType(0) - val optional = elementType.isRepetition(OPTIONAL) - ArrayType(convertField(elementType), containsNull = optional) - } - - // scalastyle:off - // `MAP_KEY_VALUE` is for backwards-compatibility - // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 - // scalastyle:on - case MAP | MAP_KEY_VALUE => - CatalystSchemaConverter.checkConversionRequirement( - field.getFieldCount == 1 && !field.getType(0).isPrimitive, - s"Invalid map type: $field") - - val keyValueType = field.getType(0).asGroupType() - CatalystSchemaConverter.checkConversionRequirement( - keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2, - s"Invalid map type: $field") - - val keyType = keyValueType.getType(0) - CatalystSchemaConverter.checkConversionRequirement( - keyType.isPrimitive, - s"Map key type is expected to be a primitive type, but found: $keyType") - - val valueType = keyValueType.getType(1) - val valueOptional = valueType.isRepetition(OPTIONAL) - MapType( - convertField(keyType), - convertField(valueType), - valueContainsNull = valueOptional) - - case _ => - throw new AnalysisException(s"Unrecognized Parquet type: $field") - } - } - - // scalastyle:off - // Here we implement Parquet LIST backwards-compatibility rules. 
- // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules - // scalastyle:on - private def isElementType(repeatedType: Type, parentName: String): Boolean = { - { - // For legacy 2-level list types with primitive element type, e.g.: - // - // // List<Integer> (nullable list, non-null elements) - // optional group my_list (LIST) { - // repeated int32 element; - // } - // - repeatedType.isPrimitive - } || { - // For legacy 2-level list types whose element type is a group type with 2 or more fields, - // e.g.: - // - // // List<Tuple<String, Integer>> (nullable list, non-null elements) - // optional group my_list (LIST) { - // repeated group element { - // required binary str (UTF8); - // required int32 num; - // }; - // } - // - repeatedType.asGroupType().getFieldCount > 1 - } || { - // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.: - // - // // List<OneTuple<String>> (nullable list, non-null elements) - // optional group my_list (LIST) { - // repeated group array { - // required binary str (UTF8); - // }; - // } - // - repeatedType.getName == "array" - } || { - // For Parquet data generated by parquet-thrift, e.g.: - // - // // List<OneTuple<String>> (nullable list, non-null elements) - // optional group my_list (LIST) { - // repeated group my_list_tuple { - // required binary str (UTF8); - // }; - // } - // - repeatedType.getName == s"${parentName}_tuple" - } - } - - /** - * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]]. - */ - def convert(catalystSchema: StructType): MessageType = { - Types - .buildMessage() - .addFields(catalystSchema.map(convertField): _*) - .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) - } - - /** - * Converts a Spark SQL [[StructField]] to a Parquet [[Type]]. - */ - def convertField(field: StructField): Type = { - convertField(field, if (field.nullable) OPTIONAL else REQUIRED) - } - - private def convertField(field: StructField, repetition: Type.Repetition): Type = { - CatalystSchemaConverter.checkFieldName(field.name) - - field.dataType match { - // =================== - // Simple atomic types - // =================== - - case BooleanType => - Types.primitive(BOOLEAN, repetition).named(field.name) - - case ByteType => - Types.primitive(INT32, repetition).as(INT_8).named(field.name) - - case ShortType => - Types.primitive(INT32, repetition).as(INT_16).named(field.name) - - case IntegerType => - Types.primitive(INT32, repetition).named(field.name) - - case LongType => - Types.primitive(INT64, repetition).named(field.name) - - case FloatType => - Types.primitive(FLOAT, repetition).named(field.name) - - case DoubleType => - Types.primitive(DOUBLE, repetition).named(field.name) - - case StringType => - Types.primitive(BINARY, repetition).as(UTF8).named(field.name) - - case DateType => - Types.primitive(INT32, repetition).as(DATE).named(field.name) - - // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec. - // - // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond - // timestamp in Impala for some historical reasons. It's not recommended to be used for any - // other types and will probably be deprecated in some future version of parquet-format spec. - // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and - // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`. 
- // - // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting - // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store - // a timestamp into a `Long`. This design decision is subject to change though, for example, - // we may resort to microsecond precision in the future. - // - // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's - // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using) - // hasn't implemented `TIMESTAMP_MICROS` yet. - // - // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that. - case TimestampType => - Types.primitive(INT96, repetition).named(field.name) - - case BinaryType => - Types.primitive(BINARY, repetition).named(field.name) - - // ====================== - // Decimals (legacy mode) - // ====================== - - // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and - // always store decimals in fixed-length byte arrays. To keep compatibility with these older - // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated - // by `DECIMAL`. - case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat => - Types - .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .length(CatalystSchemaConverter.minBytesForPrecision(precision)) - .named(field.name) - - // ======================== - // Decimals (standard mode) - // ======================== - - // Uses INT32 for 1 <= precision <= 9 - case DecimalType.Fixed(precision, scale) - if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat => - Types - .primitive(INT32, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .named(field.name) - - // Uses INT64 for 1 <= precision <= 18 - case DecimalType.Fixed(precision, scale) - if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat => - Types - .primitive(INT64, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .named(field.name) - - // Uses FIXED_LEN_BYTE_ARRAY for all other precisions - case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat => - Types - .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .length(CatalystSchemaConverter.minBytesForPrecision(precision)) - .named(field.name) - - // =================================== - // ArrayType and MapType (legacy mode) - // =================================== - - // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level - // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro - // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element - // field name "array" is borrowed from parquet-avro. - case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat => - // <list-repetition> group <name> (LIST) { - // optional group bag { - // repeated <element-type> array; - // } - // } - ConversionPatterns.listType( - repetition, - field.name, - Types - .buildGroup(REPEATED) - // "array_element" is the name chosen by parquet-hive (1.7.0 and prior version) - .addField(convertField(StructField("array", elementType, nullable))) - .named("bag")) - - // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level - // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). 
Note that this case is - // covered by the backwards-compatibility rules implemented in `isElementType()`. - case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat => - // <list-repetition> group <name> (LIST) { - // repeated <element-type> element; - // } - ConversionPatterns.listType( - repetition, - field.name, - // "array" is the name chosen by parquet-avro (1.7.0 and prior versions) - convertField(StructField("array", elementType, nullable), REPEATED)) - - // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by - // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`. - case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat => - // <map-repetition> group <name> (MAP) { - // repeated group map (MAP_KEY_VALUE) { - // required <key-type> key; - // <value-repetition> <value-type> value; - // } - // } - ConversionPatterns.mapType( - repetition, - field.name, - convertField(StructField("key", keyType, nullable = false)), - convertField(StructField("value", valueType, valueContainsNull))) - - // ===================================== - // ArrayType and MapType (standard mode) - // ===================================== - - case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat => - // <list-repetition> group <name> (LIST) { - // repeated group list { - // <element-repetition> <element-type> element; - // } - // } - Types - .buildGroup(repetition).as(LIST) - .addField( - Types.repeatedGroup() - .addField(convertField(StructField("element", elementType, containsNull))) - .named("list")) - .named(field.name) - - case MapType(keyType, valueType, valueContainsNull) => - // <map-repetition> group <name> (MAP) { - // repeated group key_value { - // required <key-type> key; - // <value-repetition> <value-type> value; - // } - // } - Types - .buildGroup(repetition).as(MAP) - .addField( - Types - .repeatedGroup() - .addField(convertField(StructField("key", keyType, nullable = false))) - .addField(convertField(StructField("value", valueType, valueContainsNull))) - .named("key_value")) - .named(field.name) - - // =========== - // Other types - // =========== - - case StructType(fields) => - fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) => - builder.addField(convertField(field)) - }.named(field.name) - - case udt: UserDefinedType[_] => - convertField(field.copy(dataType = udt.sqlType)) - - case _ => - throw new AnalysisException(s"Unsupported data type ${field.dataType}") - } - } -} - -private[parquet] object CatalystSchemaConverter { - val SPARK_PARQUET_SCHEMA_NAME = "spark_schema" - - // !! HACK ALERT !! - // - // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType, - // which prevents us from selecting no columns at all for queries like `SELECT COUNT(*) FROM t`. - // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT. - - // - // To work around this problem, here we first construct a `MessageType` with a single dummy - // field, and then remove the field to obtain an empty `MessageType`. 
- // - // TODO Reverts this change after upgrading parquet-mr to 1.8.2+ - val EMPTY_MESSAGE = Types - .buildMessage() - .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy") - .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) - EMPTY_MESSAGE.getFields.clear() - - def checkFieldName(name: String): Unit = { - // ,;{}()\n\t= and space are special characters in Parquet schema - checkConversionRequirement( - !name.matches(".*[ ,;{}()\n\t=].*"), - s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". - |Please use alias to rename it. - """.stripMargin.split("\n").mkString(" ").trim) - } - - def checkFieldNames(schema: StructType): StructType = { - schema.fieldNames.foreach(checkFieldName) - schema - } - - def checkConversionRequirement(f: => Boolean, message: String): Unit = { - if (!f) { - throw new AnalysisException(message) - } - } - - private def computeMinBytesForPrecision(precision : Int) : Int = { - var numBytes = 1 - while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) { - numBytes += 1 - } - numBytes - } - - // Returns the minimum number of bytes needed to store a decimal with a given `precision`. - val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision) - - // Max precision of a decimal value stored in `numBytes` bytes - def maxPrecisionForBytes(numBytes: Int): Int = { - Math.round( // convert double to long - Math.floor(Math.log10( // number of base-10 digits - Math.pow(2, 8 * numBytes - 1) - 1))) // max value stored in numBytes - .asInstanceOf[Int] - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 67bfd39..cf974af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.minBytesForPrecision +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -82,8 +82,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi } this.rootFieldWriters = schema.map(_.dataType).map(makeWriter) - val messageType = new CatalystSchemaConverter(configuration).convert(schema) - val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava + val messageType = new ParquetSchemaConverter(configuration).convert(schema) + val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava logInfo( s"""Initialized Parquet WriteSupport with Catalyst schema: @@ -427,7 +427,7 @@ private[parquet] object CatalystWriteSupport { val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" def setSchema(schema: StructType, configuration: Configuration): 
Unit = { - schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) + schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName) configuration.set(SPARK_ROW_SCHEMA, schema.json) configuration.setIfUnset( ParquetOutputFormat.WRITER_VERSION, http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 6b25e36..2cce3db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -288,13 +288,13 @@ private[sql] class ParquetFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( - CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - CatalystSchemaConverter.checkFieldNames(requiredSchema).json) + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + ParquetSchemaConverter.checkFieldNames(requiredSchema).json) hadoopConf.set( CatalystWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(requiredSchema).json) + ParquetSchemaConverter.checkFieldNames(requiredSchema).json) // We want to clear this temporary metadata from saving into Parquet file. // This metadata is only useful for detecting optional columns when pushdowning filters. @@ -369,10 +369,10 @@ private[sql] class ParquetFileFormat val reader = pushed match { case Some(filter) => new ParquetRecordReader[InternalRow]( - new CatalystReadSupport, + new ParquetReadSupport, FilterCompat.get(filter, null)) case _ => - new ParquetRecordReader[InternalRow](new CatalystReadSupport) + new ParquetRecordReader[InternalRow](new ParquetReadSupport) } reader.initialize(split, hadoopAttemptContext) reader @@ -590,7 +590,7 @@ private[sql] object ParquetFileFormat extends Logging { assumeBinaryIsString: Boolean, assumeInt96IsTimestamp: Boolean)(job: Job): Unit = { val conf = job.getConfiguration - conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) + conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) // Try to push down filters when filter push-down is enabled. 
if (parquetFilterPushDown) { @@ -603,14 +603,14 @@ private[sql] object ParquetFileFormat extends Logging { .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) } - conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { + conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - CatalystSchemaConverter.checkFieldNames(requestedSchema).json + ParquetSchemaConverter.checkFieldNames(requestedSchema).json }) conf.set( CatalystWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(dataSchema).json) + ParquetSchemaConverter.checkFieldNames(dataSchema).json) // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) @@ -639,7 +639,7 @@ private[sql] object ParquetFileFormat extends Logging { footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { def parseParquetSchema(schema: MessageType): StructType = { - val converter = new CatalystSchemaConverter( + val converter = new ParquetSchemaConverter( sparkSession.sessionState.conf.isParquetBinaryAsString, sparkSession.sessionState.conf.isParquetINT96AsTimestamp, sparkSession.sessionState.conf.writeLegacyParquetFormat) @@ -653,7 +653,7 @@ private[sql] object ParquetFileFormat extends Logging { val serializedSchema = metadata .getKeyValueMetaData .asScala.toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) + .get(ParquetReadSupport.SPARK_METADATA_KEY) if (serializedSchema.isEmpty) { // Falls back to the Parquet schema if no Spark SQL schema is found. Some(parseParquetSchema(metadata.getSchema)) @@ -820,7 +820,7 @@ private[sql] object ParquetFileFormat extends Logging { // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` val converter = - new CatalystSchemaConverter( + new ParquetSchemaConverter( assumeBinaryIsString = assumeBinaryIsString, assumeInt96IsTimestamp = assumeInt96IsTimestamp, writeLegacyParquetFormat = writeLegacyParquetFormat) @@ -864,12 +864,12 @@ private[sql] object ParquetFileFormat extends Logging { * a [[StructType]] converted from the [[MessageType]] stored in this footer. */ def readSchemaFromFooter( - footer: Footer, converter: CatalystSchemaConverter): StructType = { + footer: Footer, converter: ParquetSchemaConverter): StructType = { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData .asScala.toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) + .get(ParquetReadSupport.SPARK_METADATA_KEY) .flatMap(deserializeSchemaString) .getOrElse(converter.convert(fileMetaData.getSchema)) }
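----------------------------------------------------------------------

For readers who want to see the renamed converter in action, below is a minimal, illustrative sketch that is not part of this commit: it exercises the two `convert` overloads documented above. The object name and sample schema are made up, and because the class is `private[parquet]` the sketch assumes it is compiled inside the org.apache.spark.sql.execution.datasources.parquet package.

// Illustrative sketch only (not part of this commit).
// Must live in this package because ParquetSchemaConverter is private[parquet].
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.types._

object ParquetSchemaConverterSketch {
  def main(args: Array[String]): Unit = {
    // Standard (non-legacy) write path: decimal(9, 2) maps to INT32 annotated with DECIMAL,
    // and arrays use the 3-level LIST layout with a repeated "list" group and "element" field.
    val converter = new ParquetSchemaConverter(
      assumeBinaryIsString = true,
      assumeInt96IsTimestamp = true,
      writeLegacyParquetFormat = false)

    val catalystSchema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("price", DecimalType(9, 2), nullable = true),
      StructField("tags", ArrayType(StringType, containsNull = true), nullable = true)))

    // Catalyst StructType -> Parquet MessageType
    val parquetSchema = converter.convert(catalystSchema)
    println(parquetSchema)

    // Parquet MessageType -> Catalyst StructType
    // (REQUIRED fields come back as nullable = false, OPTIONAL fields as nullable = true)
    println(converter.convert(parquetSchema).treeString)
  }
}

Setting writeLegacyParquetFormat = true instead would reproduce the Spark 1.4-compatible layout (fixed-length byte arrays for all decimals, 2-level or "bag"/"array" list encoding) described in the comments above.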