brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834
##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##########
@@ -0,0 +1,634 @@
+/*

Review comment:
   Same issue here as above, Functional Interface forces us to do so

##########
File path: parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
##########
@@ -63,28 +104,70 @@ public MergeCommand() {
 
   @Override
   public String getCommandDescription() {
     return "Merges multiple Parquet files into one. " +
-      "The command doesn't merge row groups, just places one after the other. " +
+      "Without -b option the command doesn't merge row groups, just places one after the other. " +
       "When used to merge many small files, the resulting file will still contain small row groups, " +
-      "which usually leads to bad query performance.";
+      "which usually leads to bad query performance. " +
+      "To have adjacent blocks(row groups) merged together use -b option. " +
+      "Blocks will be grouped into larger one until the upper bound is reached. " +
+      "Default block upper bound 128 MB and default compression SNAPPY can be customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+
+    boolean mergeBlocks = options.hasOption('b');
+
     // Prepare arguments
     List<String> args = options.getArgList();
-    List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
+    List<Path> files = getInputFiles(args.subList(0, args.size() - 1));
     Path outputFile = new Path(args.get(args.size() - 1));
 
-    // Merge schema and extraMeta
-    FileMetaData mergedMeta = mergedMetadata(inputFiles);
-    PrintWriter out = new PrintWriter(Main.out, true);
-
-    // Merge data
+    ParquetMetadata parquetMetadata = mergedMetadata(files);
     ParquetFileWriter writer = new ParquetFileWriter(conf,
-      mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+      parquetMetadata.getFileMetaData().getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+    PrintWriter stdOut = new PrintWriter(Main.out, true);
+
+    if (mergeBlocks) {
+      long maxRowGroupSize = options.hasOption('l')? Long.parseLong(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
+      CompressionCodecName compression = options.hasOption('c') ?
+        CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY;
+
+      stdOut.println("Merging files and row-groups using " + compression.name() + " for compression and " + maxRowGroupSize +
+        " bytes as the upper bound for new row groups ..... ");
+      mergeRowGroups(files, parquetMetadata, writer, maxRowGroupSize, compression);
+    } else {
+      appendRowGroups(files, parquetMetadata.getFileMetaData(), writer, stdOut);
+    }
+  }
+
+  private void mergeRowGroups(List<Path> files, ParquetMetadata parquetMetadata, ParquetFileWriter writer,
+                              long maxRowGroupSize, CompressionCodecName compression) throws IOException {
+
+    boolean v2EncodingHint = parquetMetadata.getBlocks().stream()
+      .flatMap(b -> b.getColumns().stream())
+      .anyMatch(chunk -> {
+        EncodingStats stats = chunk.getEncodingStats();
+        return stats != null && stats.usesV2Pages();
+      });
+
+    List<InputFile> inputFiles = files.stream().map(f -> {
+      try {
+        return HadoopInputFile.fromPath(f, conf);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);

Review comment:
   Same issue here as above, Functional Interface forces us to do so
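For context on the recurring comment: Stream.map takes a java.util.function.Function, whose apply method declares no checked exceptions, so a lambda that calls an IOException-throwing method such as HadoopInputFile.fromPath cannot simply propagate it; catching and rethrowing as java.io.UncheckedIOException is the standard workaround. Below is a minimal, self-contained sketch of the constraint — the openSize helper and class name are illustrative stand-ins, not code from the PR:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    public class CheckedExceptionInLambda {

      // Stand-in for HadoopInputFile.fromPath: a call that throws a checked IOException.
      static long openSize(Path p) throws IOException {
        return Files.size(p);
      }

      // Inside Stream.map the checked exception must be caught; Function.apply
      // declares no 'throws' clause, so we wrap and rethrow unchecked.
      static List<Long> sizesViaStream(List<Path> paths) {
        return paths.stream()
            .map(p -> {
              try {
                return openSize(p);
              } catch (IOException e) {
                throw new UncheckedIOException(e); // the pattern the reviewer is pointing at
              }
            })
            .collect(Collectors.toList());
      }

      // A plain loop avoids the wrapping entirely, because the enclosing method
      // can declare 'throws IOException' itself.
      static List<Long> sizesViaLoop(List<Path> paths) throws IOException {
        List<Long> sizes = new ArrayList<>();
        for (Path p : paths) {
          sizes.add(openSize(p));
        }
        return sizes;
      }
    }

Callers of the stream variant can still recover the original checked exception by catching UncheckedIOException and inspecting getCause().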
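Usage-wise, per the new command description the block-merging mode would be invoked roughly as follows. This is a hypothetical invocation: the jar version and input file names are placeholders, but -b, -l (upper bound in MB), and -c (a CompressionCodecName such as GZIP or SNAPPY) are the options added in this diff:

    # Merge row groups into blocks of at most 256 MB, recompressed with GZIP.
    hadoop jar parquet-tools-<version>.jar merge -b -l 256 -c GZIP \
        part-00000.parquet part-00001.parquet merged.parquet

Per the execute method above, -l is multiplied by 1024 * 1024, so -l 256 sets the row-group upper bound to 268435456 bytes; omitting -l and -c falls back to DEFAULT_BLOCK_SIZE (128 MB) and SNAPPY.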