[
https://issues.apache.org/jira/browse/PARQUET-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651840#comment-17651840
]
ASF GitHub Bot commented on PARQUET-2075:
-----------------------------------------
shangxinli commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1056881216
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+ private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+ private final byte[] pageBuffer = new byte[pageBufferSize];
+ private TransParquetFileReader reader;
+ private ParquetFileWriter writer;
+ private ParquetMetadata meta;
+ private MessageType schema;
+ private String createdBy;
+ private CompressionCodecName codecName = null;
+ private List<String> pruneColumns = null;
+ private Map<ColumnPath, MaskMode> maskColumns = null;
+ private Set<ColumnPath> encryptColumns = null;
+ private boolean encryptMode = false;
+
+ public ParquetRewriter(RewriteOptions options) throws IOException {
+ Path inPath = options.getInputFile();
+ Path outPath = options.getOutputFile();
+ Configuration conf = options.getConf();
+
+ // TODO: set more member variables
+ codecName = options.getCodecName();
+ pruneColumns = options.getPruneColumns();
+
+ // Get file metadata and full schema from the input file
+ meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+ schema = meta.getFileMetaData().getSchema();
+ createdBy = meta.getFileMetaData().getCreatedBy();
+
+ // Prune columns if specified
+ if (pruneColumns != null && !pruneColumns.isEmpty()) {
+ List<String> paths = new ArrayList<>();
+ getPaths(schema, paths, null);
+ for (String col : pruneColumns) {
+ if (!paths.contains(col)) {
+ LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, inPath.getName());
+ }
+ }
+
+ Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+ schema = pruneColumnsInSchema(schema, prunePaths);
+ }
+
+ if (options.getMaskColumns() != null) {
+ this.maskColumns = new HashMap<>();
+ for (Map.Entry<String, MaskMode> col :
options.getMaskColumns().entrySet()) {
+ maskColumns.put(ColumnPath.fromDotString(col.getKey()),
col.getValue());
+ }
+ }
+
+ if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+ this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+ this.encryptMode = true;
+ // TODO: make sure options.getFileEncryptionProperties() is set
+ }
+
+ reader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
+
+ ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+ writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+ DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+ DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+ options.getFileEncryptionProperties());
+ writer.start();
+ }
+
+ // Ctor for legacy CompressionConverter and ColumnMasker
+ public ParquetRewriter(TransParquetFileReader reader,
+ ParquetFileWriter writer,
+ ParquetMetadata meta,
+ MessageType schema,
+ String createdBy,
+ CompressionCodecName codecName,
+ List<String> maskColumns,
+ MaskMode maskMode) {
+ this.reader = reader;
+ this.writer = writer;
+ this.meta = meta;
+ this.schema = schema;
+ this.createdBy = createdBy == null ? meta.getFileMetaData().getCreatedBy() : createdBy;
+ this.codecName = codecName;
+ if (maskColumns != null && maskMode != null) {
+ this.maskColumns = new HashMap<>();
+ for (String col : maskColumns) {
+ this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.end(meta.getFileMetaData().getKeyValueMetaData());
+ }
+
+ public void processBlocks() throws IOException {
+ PageReadStore store = reader.readNextRowGroup();
+ ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+ Map<ColumnPath, ColumnDescriptor> descriptorsMap =
schema.getColumns().stream().collect(
+ Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+ int blockId = 0;
+ while (store != null) {
+ writer.startBlock(store.getRowCount());
+
+ BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+ List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+ for (int i = 0, columnId = 0; i < columnsInOrder.size(); i += 1) {
+ ColumnChunkMetaData chunk = columnsInOrder.get(i);
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+
+ // This column has been pruned.
+ if (descriptor == null) {
+ continue;
+ }
+
+ // If a column is already encrypted, we simply throw an exception.
+ // Later we can add a feature to trans-encrypt it with different keys.
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
+ }
+
+ reader.setStreamPosition(chunk.getStartingPos());
+ CompressionCodecName newCodecName = codecName == null ? chunk.getCodec() : codecName;
+ EncryptorRunTime encryptorRunTime = null;
+ if (this.encryptMode) {
+ encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+ }
+ boolean encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
Review Comment:
Can we apply this.encryptMode to this check also?
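
A minimal sketch of what the suggested change could look like, based only on the fields visible in this diff (naming unchanged):

  // Sketch of the reviewer's suggestion: gate the per-column check on encryptMode,
  // so a column is only marked for encryption when the rewriter is in encrypt mode.
  boolean encryptColumn = encryptMode
      && encryptColumns != null
      && encryptColumns.contains(chunk.getPath());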
> Unified Rewriter Tool
> -----------------------
>
> Key: PARQUET-2075
> URL: https://issues.apache.org/jira/browse/PARQUET-2075
> Project: Parquet
> Issue Type: New Feature
> Reporter: Xinli Shang
> Assignee: Gang Wu
> Priority: Major
>
> During the discussion of PARQUET-2071, we came up with the idea of a
> universal tool that translates an existing file to a different state while
> skipping some low-level steps like encoding/decoding, to gain speed. For
> example, it can decompress pages and then recompress them directly; for
> PARQUET-2071, it can decrypt and then re-encrypt directly. This will be
> useful for onboarding existing data to Parquet features like column
> encryption, ZSTD compression, etc.
> We already have tools like trans-compression and column pruning. We will
> consolidate all of these tools into this universal tool.
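
For context, a hedged sketch of how such a unified rewriter might be driven, given the constructor and processBlocks() shown in the diff above. The RewriteOptions.Builder methods used here are assumptions for illustration only; the diff shows just the getters (getInputFile, getCodecName, getPruneColumns, ...).

  import java.util.Collections;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.hadoop.metadata.CompressionCodecName;
  import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
  import org.apache.parquet.hadoop.rewrite.RewriteOptions;

  public class RewriteExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Hypothetical builder calls, mirroring the options read in the constructor above.
      RewriteOptions options = new RewriteOptions.Builder(
              conf, new Path("/tmp/input.parquet"), new Path("/tmp/output.parquet"))
          .transform(CompressionCodecName.ZSTD)             // trans-compress pages
          .prune(Collections.singletonList("obsolete_col")) // drop a column
          .build();

      // ParquetRewriter is Closeable; close() writes the output file footer.
      try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
        rewriter.processBlocks();
      }
    }
  }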
--
This message was sent by Atlassian Jira
(v8.20.10#820010)