Xinli Shang created SPARK-25858:
-----------------------------------

             Summary: Passing Field Metadata to Parquet
                 Key: SPARK-25858
                 URL: https://issues.apache.org/jira/browse/SPARK-25858
             Project: Spark
          Issue Type: New Feature
          Components: Input/Output
    Affects Versions: 2.3.2
            Reporter: Xinli Shang
h1. Problem Statement
The Spark WriteSupport class for Parquet is hardcoded to org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport and is not configurable. Today this class does not carry the field metadata in StructType over to the Parquet MessageType. Parquet column encryption (PARQUET-1396, PARQUET-1178), however, needs that field metadata inside the MessageType so it can be used to control column encryption.

h1. Technical Solution
# Extend the SparkToParquetSchemaConverter class and override the convert() method so that field metadata is carried over.
# Extend ParquetWriteSupport to use the extended converter from step 1. Extending, rather than changing, the built-in WriteSupport mitigates risk.
# Change Spark to make the WriteSupport class configurable so users can opt into the extended WriteSupport from step 2. The default remains org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.

h1. Technical Details
h2. Extend SparkToParquetSchemaConverter
{code:scala}
class SparkToParquetMetadataSchemaConverter extends SparkToParquetSchemaConverter {

  override def convert(catalystSchema: StructType): MessageType = {
    Types
      .buildMessage()
      .addFields(catalystSchema.map(convertFieldWithMetadata): _*)
      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
  }

  // Wrap the converted Parquet field in an ExtType (PARQUET-1397) and copy
  // the Catalyst field metadata onto it.
  private def convertFieldWithMetadata(field: StructField): Type = {
    val extField = new ExtType[Any](convertField(field))
    val metaBuilder = new MetadataBuilder().withMetadata(field.metadata)
    val metaData = metaBuilder.getMap
    extField.setMetadata(metaData)
    extField
  }
}
{code}

h2. Extend ParquetWriteSupport
{code:scala}
class CryptoParquetWriteSupport extends ParquetWriteSupport {

  override def init(configuration: Configuration): WriteContext = {
    // Build the write context with the metadata-preserving converter.
    val converter = new SparkToParquetMetadataSchemaConverter(configuration)
    createContext(configuration, converter)
  }
}
{code}

h2. Make WriteSupport configurable
{code:scala}
class ParquetFileFormat {

  override def prepareWrite(...) {
    ...
    // Only install the built-in WriteSupport when the user has not
    // configured one via parquet.write.support.class.
    if (conf.get(ParquetOutputFormat.WRITE_SUPPORT_CLASS) == null) {
      ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
    }
    ...
  }
}
{code}
Illustrative sketches of attaching field metadata and selecting the extended WriteSupport from a Spark job are included under Usage Sketches at the end of this description.

h1. Verification
[ParquetHelloWorld.java|https://github.com/shangxinli/parquet-writesupport-extensions/blob/master/src/main/java/com/uber/ParquetHelloWorld.java] in the GitHub repository [parquet-writesupport-extensions|https://github.com/shangxinli/parquet-writesupport-extensions] demonstrates passing down the field metadata and performing column encryption.

h1. Dependency
* PARQUET-1178
* PARQUET-1396
* PARQUET-1397
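
h1. Usage Sketches
The sketches below are illustrative only and assume the extended classes proposed above; the metadata key {{encrypted}} and the package {{com.example}} are placeholders, not names defined by this issue or by Parquet. The first sketch shows how a caller could attach per-field metadata to a StructType, which the extended converter would then carry into the Parquet MessageType.
{code:scala}
import org.apache.spark.sql.types._

// Hypothetical metadata key; the actual keys that drive column encryption are
// defined by the Parquet work (PARQUET-1178/PARQUET-1396), not by this sketch.
val encryptedMeta = new MetadataBuilder()
  .putBoolean("encrypted", true)
  .build()

// Attach the metadata to the sensitive column; SparkToParquetMetadataSchemaConverter
// would copy it onto the corresponding Parquet field.
val schema = StructType(Seq(
  StructField("ssn", StringType, nullable = false, encryptedMeta),
  StructField("name", StringType)
))
{code}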
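The second sketch, assuming the proposed check in ParquetFileFormat.prepareWrite is in place, shows how a job could opt into the extended WriteSupport by setting Parquet's standard parquet.write.support.class property before writing.
{code:scala}
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("crypto-write-sketch").getOrCreate()
import spark.implicits._

// Point Parquet at the extended WriteSupport. With the proposed change,
// ParquetFileFormat.prepareWrite leaves this property alone instead of
// unconditionally installing the built-in ParquetWriteSupport.
spark.sparkContext.hadoopConfiguration.set(
  ParquetOutputFormat.WRITE_SUPPORT_CLASS,  // "parquet.write.support.class"
  "com.example.CryptoParquetWriteSupport")  // hypothetical location of the extension

// Any subsequent DataFrame write to Parquet goes through the configured WriteSupport.
Seq(("123-45-6789", "Alice")).toDF("ssn", "name")
  .write.mode("overwrite").parquet("/tmp/crypto-write-sketch")
{code}
Because the property is only honored when it is explicitly set, existing jobs keep using the built-in ParquetWriteSupport and the extension stays opt-in.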