brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##########
@@ -0,0 +1,634 @@
+/*

Review comment:
       Same issue here as above: the functional interface forces us to do so.

##########
File path: parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
##########
@@ -63,28 +104,70 @@ public MergeCommand() {
   @Override
   public String getCommandDescription() {
     return "Merges multiple Parquet files into one. " +
-      "The command doesn't merge row groups, just places one after the other. 
" +
+      "Without -b option the command doesn't merge row groups, just places one 
after the other. " +
       "When used to merge many small files, the resulting file will still 
contain small row groups, " +
-      "which usually leads to bad query performance.";
+      "which usually leads to bad query performance. " +
+      "To have adjacent blocks(row groups) merged together use -b option. " +
+      "Blocks will be grouped into larger one until the upper bound is 
reached. " +
+      "Default block upper bound 128 MB and default compression SNAPPY can be 
customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+
+    boolean mergeBlocks = options.hasOption('b');
+
     // Prepare arguments
     List<String> args = options.getArgList();
-    List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
+    List<Path> files = getInputFiles(args.subList(0, args.size() - 1));
     Path outputFile = new Path(args.get(args.size() - 1));
-
     // Merge schema and extraMeta
-    FileMetaData mergedMeta = mergedMetadata(inputFiles);
-    PrintWriter out = new PrintWriter(Main.out, true);
-
-    // Merge data
+    ParquetMetadata parquetMetadata = mergedMetadata(files);
     ParquetFileWriter writer = new ParquetFileWriter(conf,
-            mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+      parquetMetadata.getFileMetaData().getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+    PrintWriter stdOut = new PrintWriter(Main.out, true);
+
+    if (mergeBlocks) {
+      long maxRowGroupSize = options.hasOption('l') ? Long.parseLong(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
+      CompressionCodecName compression = options.hasOption('c') ?
+        CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY;
+
+      stdOut.println("Merging files and row-groups using " + compression.name() + " for compression and " + maxRowGroupSize
+        + " bytes as the upper bound for new row groups ..... ");
+      mergeRowGroups(files, parquetMetadata, writer, maxRowGroupSize, compression);
+    } else {
+      appendRowGroups(files, parquetMetadata.getFileMetaData(), writer, stdOut);
+    }
+  }
+
+  private void mergeRowGroups(List<Path> files, ParquetMetadata parquetMetadata, ParquetFileWriter writer,
+                              long maxRowGroupSize, CompressionCodecName compression) throws IOException {
+
+    boolean v2EncodingHint = parquetMetadata.getBlocks().stream()
+      .flatMap(b -> b.getColumns().stream())
+      .anyMatch(chunk -> {
+        EncodingStats stats = chunk.getEncodingStats();
+        return stats != null && stats.usesV2Pages();
+      });
+
+    List<InputFile> inputFiles = files.stream().map(f -> {
+      try {
+        return HadoopInputFile.fromPath(f, conf);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);

Review comment:
       Same issue here as above: the functional interface forces us to do so.
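
For context on the point both comments make: java.util.function.Function#apply declares no checked exceptions, so a lambda passed to Stream.map cannot throw IOException directly; it has to be caught and rethrown wrapped, exactly as in the diff above. A minimal sketch of the pattern (not from this PR; the helper names readFirstLine and loadAll are hypothetical), including how the checked exception can be recovered at the method boundary:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.stream.Collectors;

    public class WrapExample {

      // Hypothetical helper that does I/O, so it throws the checked IOException.
      static String readFirstLine(Path p) throws IOException {
        return Files.readAllLines(p).get(0);
      }

      // Function#apply declares no checked exceptions, so the lambda below must
      // catch IOException and rethrow it wrapped in UncheckedIOException.
      static List<String> loadAll(List<Path> paths) throws IOException {
        try {
          return paths.stream()
            .map(p -> {
              try {
                return readFirstLine(p);
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
            })
            .collect(Collectors.toList());
        } catch (UncheckedIOException e) {
          // UncheckedIOException#getCause is declared to return IOException,
          // so the original checked exception can be restored for callers.
          throw e.getCause();
        }
      }
    }

Since UncheckedIOException keeps the original IOException as its cause, the wrapping is lossless: callers outside the stream pipeline can still handle it as a checked exception.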




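For reference, a hypothetical invocation of the extended merge command (assuming the usual parquet-tools launcher; the -b, -l, and -c flags and the trailing output path follow the diff above, with -l given in MB and -c a CompressionCodecName such as GZIP):

    hadoop jar parquet-tools.jar merge -b -l 256 -c GZIP part-0.parquet part-1.parquet merged.parquet
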
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

