[ https://issues.apache.org/jira/browse/SPARK-25858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xinli Shang updated SPARK-25858:
--------------------------------
Description:
h1. Problem Statement

The Spark WriteSupport class for Parquet is hardcoded to org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport and is not configurable. This class does not carry over the field metadata in StructType to the Parquet MessageType. However, Parquet column encryption (PARQUET-1396, PARQUET-1178) requires that field metadata be present inside the Parquet MessageType, so the metadata can be used to control column encryption.

h1. Technical Solution

# Extend the SparkToParquetSchemaConverter class and override the convert() method to carry over the field metadata.
# Extend ParquetWriteSupport to use the extended converter from #1. Extending, rather than changing, the built-in WriteSupport mitigates risk.
# Change Spark to make the WriteSupport class configurable, so a user can opt in to the extended WriteSupport from #2. The default WriteSupport remains org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.

h1. Technical Details

{{Note: the repository linked under Verification shows this code in its correct, complete form.}}

h2. Extend SparkToParquetSchemaConverter

{code:scala}
class SparkToParquetMetadataSchemaConverter(configuration: Configuration)
  extends SparkToParquetSchemaConverter(configuration) {

  override def convert(catalystSchema: StructType): MessageType = {
    Types
      .buildMessage()
      .addFields(catalystSchema.map(convertFieldWithMetadata): _*)
      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
  }

  private def convertFieldWithMetadata(field: StructField): Type = {
    val extField = new ExtType[Any](convertField(field))
    val metaBuilder = new MetadataBuilder().withMetadata(field.metadata)
    extField.setMetadata(metaBuilder.getMap)
    extField
  }
}
{code}
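To make the intent of convertFieldWithMetadata concrete, here is a minimal, self-contained Scala sketch. It deliberately does not use the real Spark/Parquet classes; FieldSketch, TypeSketch and ExtTypeSketch are hypothetical stand-ins that only model the idea of wrapping a converted field type so the source field's metadata map travels with it through schema conversion.

```scala
// Hypothetical stand-in for Spark's StructField: a name, a type, and a metadata map.
case class FieldSketch(name: String, dataType: String, metadata: Map[String, Any])

// Hypothetical stand-in for the plain converted Parquet type (no metadata).
case class TypeSketch(name: String, primitive: String)

// Hypothetical stand-in for ExtType: the converted type plus the carried-over metadata.
case class ExtTypeSketch(inner: TypeSketch, metadata: Map[String, Any])

object MetadataConverterSketch {
  // Models the base convertField: converts the type only, dropping metadata.
  def convertField(f: FieldSketch): TypeSketch = TypeSketch(f.name, f.dataType)

  // Models convertFieldWithMetadata: wrap the converted type and attach the
  // field's metadata so it survives the StructType-to-MessageType conversion.
  def convertFieldWithMetadata(f: FieldSketch): ExtTypeSketch =
    ExtTypeSketch(convertField(f), f.metadata)

  // Models the overridden convert(): apply the metadata-aware variant to every field.
  def convert(schema: Seq[FieldSketch]): Seq[ExtTypeSketch] =
    schema.map(convertFieldWithMetadata)
}
```

With the real classes, the metadata carried on each converted column is what a Parquet column-encryption implementation could inspect to decide, per column, whether and how to encrypt.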
h2. Extend ParquetWriteSupport

{code:scala}
class CryptoParquetWriteSupport extends ParquetWriteSupport {

  override def init(configuration: Configuration): WriteContext = {
    val converter = new SparkToParquetMetadataSchemaConverter(configuration)
    createContext(configuration, converter)
  }
}
{code}

h2. Make WriteSupport configurable

{code:scala}
class ParquetFileFormat {

  override def prepareWrite(...) = {
    ...
    if (conf.get(ParquetOutputFormat.WRITE_SUPPORT_CLASS) == null) {
      ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
    }
    ...
  }
}
{code}

h1. Verification

The [ParquetHelloWorld.java|https://github.com/shangxinli/parquet-writesupport-extensions/blob/master/src/main/java/com/uber/ParquetHelloWorld.java] in the GitHub repository [parquet-writesupport-extensions|https://github.com/shangxinli/parquet-writesupport-extensions] contains a sample that verifies passing down the field metadata and performing column encryption.

h1. Dependency

* PARQUET-1178
* PARQUET-1396
* PARQUET-1397
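The null check in prepareWrite above reduces to: install the built-in ParquetWriteSupport only when the user has not already configured a write-support class. A minimal Scala sketch of that selection logic, using a plain Map as a hypothetical stand-in for the Hadoop Configuration (WriteSupportClassKey mirrors parquet-mr's ParquetOutputFormat.WRITE_SUPPORT_CLASS key):

```scala
object WriteSupportConfigSketch {
  // Stand-in for ParquetOutputFormat.WRITE_SUPPORT_CLASS in parquet-mr.
  val WriteSupportClassKey = "parquet.write.support.class"

  // The default stays the built-in Spark write support.
  val DefaultWriteSupport =
    "org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport"

  // conf stands in for the Hadoop Configuration on the write job:
  // honor a user-configured class, otherwise fall back to the default.
  def effectiveWriteSupport(conf: Map[String, String]): String =
    conf.getOrElse(WriteSupportClassKey, DefaultWriteSupport)
}
```

Under this proposal, a user opting in would set that key to an extended class such as CryptoParquetWriteSupport, while an unset configuration keeps today's behavior unchanged.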
> Passing Field Metadata to Parquet
> ---------------------------------
>
> Key: SPARK-25858
> URL: https://issues.apache.org/jira/browse/SPARK-25858
> Project: Spark
> Issue Type: New Feature
> Components: Input/Output
> Affects Versions: 2.3.2
> Reporter: Xinli Shang
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)