[ https://issues.apache.org/jira/browse/PARQUET-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653139#comment-17653139 ]
ASF GitHub Bot commented on PARQUET-2075:
-----------------------------------------

wgtmac commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1059401341


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+        }
+      }
+
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+      schema = pruneColumnsInSchema(schema, prunePaths);
+    }
+
+    if (options.getMaskColumns() != null) {
+      this.maskColumns = new HashMap<>();
+      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
+        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
+      }
+    }
+
+    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+      this.encryptMode = true;
+      // TODO: make sure options.getFileEncryptionProperties() is set
+    }
+
+    reader = new TransParquetFileReader(
+        HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
+
+    ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+        DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+        DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+        options.getFileEncryptionProperties());
+    writer.start();
+  }
+
+  // Constructor for the legacy CompressionConverter and ColumnMasker
+  public ParquetRewriter(TransParquetFileReader reader,
+                         ParquetFileWriter writer,
+                         ParquetMetadata meta,
+                         MessageType schema,
+                         String createdBy,
+                         CompressionCodecName codecName,
+                         List<String> maskColumns,
+                         MaskMode maskMode) {
+    this.reader = reader;
+    this.writer = writer;
+    this.meta = meta;
+    this.schema = schema;
+    this.createdBy = createdBy == null ? meta.getFileMetaData().getCreatedBy() : createdBy;
+    this.codecName = codecName;
+    if (maskColumns != null && maskMode != null) {
+      this.maskColumns = new HashMap<>();
+      for (String col : maskColumns) {
+        this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.end(meta.getFileMetaData().getKeyValueMetaData());
+  }
+
+  public void processBlocks() throws IOException {
+    PageReadStore store = reader.readNextRowGroup();
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+    int blockId = 0;
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+
+        // This column has been pruned.
+        if (descriptor == null) {
+          continue;
+        }
+
+        // If a column is already encrypted, we simply throw an exception.
+        // Later we can add a feature to trans-encrypt it with different keys.
+        if (chunk.isEncrypted()) {
+          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
+        }
+
+        reader.setStreamPosition(chunk.getStartingPos());
+        CompressionCodecName newCodecName = codecName == null ? chunk.getCodec() : codecName;
+        EncryptorRunTime encryptorRunTime = null;
+        if (this.encryptMode) {
+          encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+        }
+        boolean encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
+
+        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+          // Mask the column and compress it again.
+          MaskMode maskMode = maskColumns.get(chunk.getPath());
+          if (maskMode.equals(MaskMode.NULLIFY)) {
+            Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
+            if (repetition.equals(Type.Repetition.REQUIRED)) {
+              throw new IOException(
+                  "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
+            }
+            nullifyColumn(descriptor, chunk, crStore, writer, schema, newCodecName, encryptorRunTime, encryptColumn);
+          } else {
+            throw new UnsupportedOperationException("Only nullify is supported for now");
+          }
+        } else if (encryptMode || codecName != null) {
+          // Translate compression and/or encryption
+          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+          processChunk(chunk, newCodecName, encryptorRunTime, encryptColumn);
+          writer.endColumn();
+        } else {
+          // Nothing changed, simply copy the binary data.
+          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+          writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+        }
+
+        columnId++;
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnChunkMetaData chunk,
+                            CompressionCodecName newCodecName,
+                            EncryptorRunTime encryptorRunTime,
+                            boolean encryptColumn) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    CompressionCodecFactory.BytesInputDecompressor decompressor = null;
+    CompressionCodecFactory.BytesInputCompressor compressor = null;
+    if (!newCodecName.equals(chunk.getCodec())) {
+      // Re-compress only if a different codec has been specified
+      decompressor = codecFactory.getDecompressor(chunk.getCodec());
+      compressor = codecFactory.getCompressor(newCodecName);
+    }
+
+    // EncryptorRunTime is only provided when encryption is required
+    BlockCipher.Encryptor metaEncryptor = null;
+    BlockCipher.Encryptor dataEncryptor = null;
+    byte[] dictPageAAD = null;
+    byte[] dataPageAAD = null;
+    byte[] dictPageHeaderAAD = null;
+    byte[] dataPageHeaderAAD = null;
+    if (encryptorRunTime != null) {
+      metaEncryptor = encryptorRunTime.getMetaDataEncryptor();
+      dataEncryptor = encryptorRunTime.getDataEncryptor();
+      dictPageAAD = encryptorRunTime.getDictPageAAD();
+      dataPageAAD = encryptorRunTime.getDataPageAAD();
+      dictPageHeaderAAD = encryptorRunTime.getDictPageHeaderAAD();
+      dataPageHeaderAAD = encryptorRunTime.getDataPageHeaderAAD();
+    }
+
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageOrdinal = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("Column chunk has more than one dictionary page");
+          }
+          // No quickUpdatePageAAD needed for the dictionary page
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = processPageLoad(reader,
+              true,
+              compressor,
+              decompressor,
+              pageHeader.getCompressed_page_size(),
+              pageHeader.getUncompressed_page_size(),
+              encryptColumn,
+              dataEncryptor,
+              dictPageAAD);
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+                  pageHeader.getUncompressed_page_size(),
+                  dictPageHeader.getNum_values(),
+                  converter.getEncoding(dictPageHeader.getEncoding())),
+              metaEncryptor,
+              dictPageHeaderAAD);
+          break;
+        case DATA_PAGE:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = processPageLoad(reader,
+              true,
+              compressor,
+              decompressor,
+              pageHeader.getCompressed_page_size(),
+              pageHeader.getUncompressed_page_size(),
+              encryptColumn,
+              dataEncryptor,
+              dataPageAAD);
+          statistics = convertStatistics(
+              createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(
+                pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                pageHeader.getUncompressed_page_size(),
+                BytesInput.from(pageLoad),
+                statistics,
+                toIntWithCheck(rowCount),
+                converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                converter.getEncoding(headerV1.getEncoding()),
+                metaEncryptor,
+                dataPageHeaderAAD);
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                pageHeader.getUncompressed_page_size(),
+                BytesInput.from(pageLoad),
+                statistics,
+                converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                converter.getEncoding(headerV1.getEncoding()),
+                metaEncryptor,
+                dataPageHeaderAAD);
+          }
+          pageOrdinal++;
+          break;
+        case DATA_PAGE_V2:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = processPageLoad(
+              reader,
+              headerV2.is_compressed,
+              compressor,
+              decompressor,
+              payLoadLength,
+              rawDataLength,
+              encryptColumn,
+              dataEncryptor,
+              dataPageAAD);
+          statistics = convertStatistics(
+              createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+              headerV2.getNum_nulls(),
+              headerV2.getNum_values(),
+              rlLevels,
+              dlLevels,
+              converter.getEncoding(headerV2.getEncoding()),
+              BytesInput.from(pageLoad),
+              rawDataLength,
+              statistics);
+          pageOrdinal++;
+          break;
+        default:
+          LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
+          break;
+      }
+    }
+  }
+
+  private Statistics convertStatistics(String createdBy,
+                                       PrimitiveType type,
+                                       org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex,
+                                       int pageIndex,
+                                       ParquetMetadataConverter converter) throws IOException {
+    if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " +
+            type.getName());
+      }
+      if (pageIndex > columnIndex.getNullPages().size()) {
+        throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " +
+            columnIndex.getNullPages().size());
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
+          org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else {
+      return null;
+    }
+  }
+
+  private byte[] processPageLoad(TransParquetFileReader reader,
+                                 boolean isCompressed,
+                                 CompressionCodecFactory.BytesInputCompressor compressor,
+                                 CompressionCodecFactory.BytesInputDecompressor decompressor,
+                                 int payloadLength,
+                                 int rawDataLength,
+                                 boolean encrypt,
+                                 BlockCipher.Encryptor dataEncryptor,
+                                 byte[] AAD) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+
+    // Recompress the page payload
+    if (compressor != null) {
+      if (isCompressed) {
+        data = decompressor.decompress(data, rawDataLength);
+      }
+      data = compressor.compress(data);
+    }
+
+    if (!encrypt) {
+      return data.toByteArray();
+    }
+
+    // Encrypt the page payload
+    return dataEncryptor.encrypt(data.toByteArray(), AAD);
+  }
+
+  public BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data;
+    if (length > pageBufferSize) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int) size != size) {
+      throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int) size;
+  }
+
+  // We have to rewrite getPaths because MessageType only returns level-0 paths
+  private void getPaths(GroupType schema, List<String> paths, String parent) {
+    List<Type> fields = schema.getFields();
+    String prefix = (parent == null) ? "" : parent + ".";
"" : parent + "."; + for (Type field : fields) { + paths.add(prefix + field.getName()); + if (field instanceof GroupType) { + getPaths(field.asGroupType(), paths, prefix + field.getName()); + } + } + } + + private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) { + List<Type> fields = schema.getFields(); + List<String> currentPath = new ArrayList<>(); + List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths); + MessageType newSchema = new MessageType(schema.getName(), prunedFields); + return newSchema; + } + + private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) { + List<Type> prunedFields = new ArrayList<>(); + for (Type childField : fields) { + Type prunedChildField = pruneColumnsInField(childField, currentPath, prunePaths); + if (prunedChildField != null) { + prunedFields.add(prunedChildField); + } + } + return prunedFields; + } + + private Type pruneColumnsInField(Type field, List<String> currentPath, Set<ColumnPath> prunePaths) { + String fieldName = field.getName(); + currentPath.add(fieldName); + ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0])); + Type prunedField = null; + if (!prunePaths.contains(path)) { + if (field.isPrimitive()) { + prunedField = field; + } else { + List<Type> childFields = ((GroupType) field).getFields(); + List<Type> prunedFields = pruneColumnsInFields(childFields, currentPath, prunePaths); + if (prunedFields.size() > 0) { + prunedField = ((GroupType) field).withNewFields(prunedFields); + } + } + } + + currentPath.remove(fieldName); + return prunedField; + } + + private Set<ColumnPath> convertToColumnPaths(List<String> cols) { + Set<ColumnPath> prunePaths = new HashSet<>(); + for (String col : cols) { + prunePaths.add(ColumnPath.fromDotString(col)); + } + return prunePaths; + } + + private void nullifyColumn(ColumnDescriptor descriptor, + ColumnChunkMetaData chunk, + ColumnReadStoreImpl crStore, + ParquetFileWriter writer, + MessageType schema, + CompressionCodecName newCodecName, + EncryptorRunTime encryptorRunTime, + boolean encryptColumn) throws IOException { + // TODO: support encryption + if (encryptorRunTime != null) { + throw new RuntimeException("Nullifying and encrypting column is not implemented yet"); + } + long totalChunkValues = chunk.getValueCount(); + int dMax = descriptor.getMaxDefinitionLevel(); + ColumnReader cReader = crStore.getColumnReader(descriptor); + + ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ? 
+    ParquetProperties props = ParquetProperties.builder()
+        .withWriterVersion(writerVersion)
+        .build();
+    CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
+    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(newCodecName);
+
+    // Create a new schema that only has the current column
+    MessageType newSchema = newSchema(schema, descriptor);
+    ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
+        compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
+    ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
+    ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
+
+    for (int i = 0; i < totalChunkValues; i++) {
+      int rlvl = cReader.getCurrentRepetitionLevel();
+      int dlvl = cReader.getCurrentDefinitionLevel();
+      if (dlvl == dMax) {
+        // since we checked the column is either optional or repeated, dlvl should be > 0
+        if (dlvl == 0) {
+          throw new IOException("definition level is detected to be 0 for column " +
+              chunk.getPath().toDotString() + " to be nullified");
+        }
+        // we just write one null for the whole list at the top level,
+        // instead of nullifying the elements in the list one by one
+        if (rlvl == 0) {
+          cWriter.writeNull(rlvl, dlvl - 1);
+        }
+      } else {
+        cWriter.writeNull(rlvl, dlvl);
+      }
+      cStore.endRecord();
+    }
+
+    cStore.flush();
+    cPageStore.flushToFileWriter(writer);
+
+    cStore.close();
+    cWriter.close();
+  }
+
+  private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
+    String[] path = descriptor.getPath();
+    Type type = schema.getType(path);
+    if (path.length == 1) {
+      return new MessageType(schema.getName(), type);
+    }
+
+    for (Type field : schema.getFields()) {
+      if (!field.isPrimitive()) {
+        Type newType = extractField(field.asGroupType(), type);
+        if (newType != null) {
+          return new MessageType(schema.getName(), newType);
+        }
+      }
+    }
+
+    // We should never hit this because 'type' is returned by schema.getType().
+    throw new RuntimeException("No field is found");
+  }
+
+  private Type extractField(GroupType candidate, Type targetField) {
+    if (targetField.equals(candidate)) {
+      return targetField;
+    }
+
+    // In case 'targetField' is a descendant of 'candidate'
+    for (Type field : candidate.asGroupType().getFields()) {
+      if (field.isPrimitive()) {
+        if (field.equals(targetField)) {
+          return new GroupType(candidate.getRepetition(), candidate.getName(), targetField);
+        }
+      } else {
+        Type tempField = extractField(field.asGroupType(), targetField);
+        if (tempField != null) {
+          return tempField;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private static final class DummyGroupConverter extends GroupConverter {
+    @Override public void start() {}
+    @Override public void end() {}
+    @Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); }
+  }
+
+  private static final class DummyConverter extends PrimitiveConverter {
+    @Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); }
+  }
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor,
+                            ColumnChunkMetaData chunk,
+                            int blockId,
+                            int columnId) throws IOException {
+      if (fileEncryptor == null) {
+        this.colEncrSetup = null;
+        this.dataEncryptor = null;
+        this.metaDataEncryptor = null;
+
+        this.fileAAD = null;
+        this.dataPageHeaderAAD = null;
+        this.dataPageAAD = null;
+        this.dictPageHeaderAAD = null;
+        this.dictPageAAD = null;
+      } else {
+        this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, columnId);
+        this.dataEncryptor = colEncrSetup.getDataEncryptor();
+        this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+        this.fileAAD = fileEncryptor.getFileAAD();
+        this.dataPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DataPageHeader, blockId, columnId);
+        this.dataPageAAD = createAAD(colEncrSetup, ModuleType.DataPage, blockId, columnId);
+        this.dictPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DictionaryPageHeader, blockId, columnId);
+        this.dictPageAAD = createAAD(colEncrSetup, ModuleType.DictionaryPage, blockId, columnId);
+      }
+    }
+
+    private byte[] createAAD(InternalColumnEncryptionSetup colEncrSetup,
+                             ModuleType moduleType,
+                             int blockId,
+                             int columnId) {
+      if (colEncrSetup != null && colEncrSetup.isEncrypted()) {

Review Comment:
   If `EncryptorRunTime` is not created for non-encrypted columns as well, generating the file footer fails with an error. This can be reproduced with ColumnEncryptorTest.
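   For illustration, a minimal sketch of the pattern this comment argues for (names are taken from the quoted diff; the claim that `getColumnSetup` must be invoked for every column before the footer is serialized is an inference from the comment, not something the diff itself confirms):

   ```java
   // Inside the per-column loop of processBlocks(): build the runtime for every
   // column whenever encrypt mode is on, not only for columns in encryptColumns.
   // For a plaintext column, EncryptorRunTime still calls
   // fileEncryptor.getColumnSetup(chunk.getPath(), true, columnId), which
   // (per this comment) registers the column so that writer.end() can
   // generate the file footer without errors.
   EncryptorRunTime encryptorRunTime = null;
   if (this.encryptMode) {  // deliberately not: encryptColumns.contains(chunk.getPath())
     encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
   }
   ```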
> Unified Rewriter Tool
> ---------------------
>
>                 Key: PARQUET-2075
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2075
>             Project: Parquet
>          Issue Type: New Feature
>            Reporter: Xinli Shang
>            Assignee: Gang Wu
>            Priority: Major
>
> During the discussion of PARQUET-2071, we came up with the idea of a universal tool that translates an existing file to a different state while skipping lower-level steps like encoding/decoding, to gain speed. For example, we only decompress pages and then compress them directly; for PARQUET-2071, we only decrypt and then encrypt directly. This will be useful for onboarding existing data to Parquet features like column encryption, zstd, etc.
> We already have tools like trans-compression and column pruning; we will consolidate all of these tools into this universal tool.
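To make the consolidation concrete, here is a hypothetical usage sketch. The `RewriteOptions.Builder` and its `transform`/`prune` methods are assumptions inferred from the getters `ParquetRewriter` calls (`getCodecName()`, `getPruneColumns()`, etc.); the actual builder API may differ:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

import java.util.Arrays;

public class RewriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Hypothetical builder calls mirroring the RewriteOptions getters used by
    // ParquetRewriter (codec name, prune columns); paths are placeholders.
    RewriteOptions options = new RewriteOptions.Builder(
            conf, new Path("/data/input.parquet"), new Path("/data/output.parquet"))
        .transform(CompressionCodecName.ZSTD)       // trans-compression in one pass
        .prune(Arrays.asList("user.phone_number"))  // column pruning in the same pass
        .build();

    // ParquetRewriter is Closeable; per the diff, close() writes the file footer.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}
```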