MaxNevermind commented on code in PR #1335:
URL: https://github.com/apache/parquet-java/pull/1335#discussion_r1714632636
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -106,48 +109,63 @@ public class ParquetRewriter implements Closeable {
private int numBlocksRewritten = 0;
// Reader and relevant states of the in-processing input file
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
- // Schema of input files (should be the same) and to write to the output file
- private MessageType schema = null;
- private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
- // The reader for the current input file
- private TransParquetFileReader reader = null;
- // The metadata of current reader being processed
- private ParquetMetadata meta = null;
- // created_by information of current reader being processed
- private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
+ private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
+ private MessageType outSchema;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
+ private final boolean overwriteInputWithJoinColumns;
public ParquetRewriter(RewriteOptions options) throws IOException {
+ this.newCodecName = options.getNewCodecName();
+ this.indexCacheStrategy = options.getIndexCacheStrategy();
+ this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
- LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), options.getParquetInputFiles(), out);
-
- // Init reader of the first input file
- initNextReader();
-
- newCodecName = options.getNewCodecName();
- List<String> pruneColumns = options.getPruneColumns();
- // Prune columns if specified
- if (pruneColumns != null && !pruneColumns.isEmpty()) {
- List<String> paths = new ArrayList<>();
- getPaths(schema, paths, null);
- for (String col : pruneColumns) {
- if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, reader.getFile());
- }
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+ inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
+ ensureSameSchema(inputFiles);
+ ensureSameSchema(inputFilesToJoin);
Review Comment:
Sorry, I didn't follow your example. What are the schemas of file_2 and file_3?
Are file_2 and file_3 both in `inputFilesToJoin`?
The current logic is:
- `inputFiles` is required (left side); all of these files must share the same schema, as was the case before this PR
- `inputFilesToJoin` is optional (right side); all of these files must share the same schema
- the final merged schema is produced in the constructor
- when a column is present on both the left and the right, its values are taken from the left or from the right depending on the `overwriteInputWithJoinColumns` parameter
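
To make the rules above concrete, here is a minimal, self-contained sketch in plain Java. It is not the code from this PR: schemas are modelled as ordered lists of column names rather than `MessageType`, and the class `JoinSchemaSketch`, the enum `Side`, and the method `mergeSchemas` are hypothetical names used only for illustration. The column order of the merged result (left columns first, then new right columns) is also an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models the left/right schema rules, not the real ParquetRewriter.
public class JoinSchemaSketch {

  /** Which side a column's values would be read from. */
  enum Side { LEFT, RIGHT }

  // All files on one side must share the same schema
  // (modelled here as the same ordered list of column names).
  static void ensureSameSchema(List<List<String>> schemas) {
    for (List<String> schema : schemas) {
      if (!schema.equals(schemas.get(0))) {
        throw new IllegalArgumentException(
            "Schemas differ: " + schemas.get(0) + " vs " + schema);
      }
    }
  }

  // Merge the left and right schemas. For a column present on both sides,
  // the flag decides which side supplies the values.
  static Map<String, Side> mergeSchemas(
      List<String> left, List<String> right, boolean overwriteInputWithJoinColumns) {
    Map<String, Side> merged = new LinkedHashMap<>();
    for (String col : left) {
      merged.put(col, Side.LEFT);
    }
    for (String col : right) {
      // New right-side columns are always added; overlapping ones only
      // switch to RIGHT when the overwrite flag is set.
      if (!merged.containsKey(col) || overwriteInputWithJoinColumns) {
        merged.put(col, Side.RIGHT);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> left = List.of("id", "name");   // schema shared by all inputFiles
    List<String> right = List.of("name", "age"); // schema shared by all inputFilesToJoin

    ensureSameSchema(List.of(left, left));
    ensureSameSchema(List.of(right, right));

    System.out.println(mergeSchemas(left, right, false));
    System.out.println(mergeSchemas(left, right, true));
  }
}
```

In this sketch the overlapping column `name` is attributed to the left side when the flag is `false` and to the right side when it is `true`, while `id` and `age` are unaffected by the flag.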