[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088105#comment-17088105
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##########
@@ -0,0 +1,634 @@
+/*

Review comment:
       Same issue here as above; the functional interface forces us to do so.

##########
File path: 
parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
##########
@@ -63,28 +104,70 @@ public MergeCommand() {
   @Override
   public String getCommandDescription() {
     return "Merges multiple Parquet files into one. " +
-      "The command doesn't merge row groups, just places one after the other. 
" +
+      "Without -b option the command doesn't merge row groups, just places one 
after the other. " +
       "When used to merge many small files, the resulting file will still 
contain small row groups, " +
-      "which usually leads to bad query performance.";
+      "which usually leads to bad query performance. " +
+      "To have adjacent blocks(row groups) merged together use -b option. " +
+      "Blocks will be grouped into larger one until the upper bound is 
reached. " +
+      "Default block upper bound 128 MB and default compression SNAPPY can be 
customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+
+    boolean mergeBlocks = options.hasOption('b');
+
     // Prepare arguments
     List<String> args = options.getArgList();
-    List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
+    List<Path> files = getInputFiles(args.subList(0, args.size() - 1));
     Path outputFile = new Path(args.get(args.size() - 1));
-
     // Merge schema and extraMeta
-    FileMetaData mergedMeta = mergedMetadata(inputFiles);
-    PrintWriter out = new PrintWriter(Main.out, true);
-
-    // Merge data
+    ParquetMetadata parquetMetadata = mergedMetadata(files);
     ParquetFileWriter writer = new ParquetFileWriter(conf,
-            mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+      parquetMetadata.getFileMetaData().getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+    PrintWriter stdOut = new PrintWriter(Main.out, true);
+
+    if (mergeBlocks) {
+      long maxRowGroupSize = options.hasOption('l')? Long.parseLong(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
+      CompressionCodecName compression = options.hasOption('c') ?
+        CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY;
+
+      stdOut.println("Merging files and row-groups using " + compression.name() + " for compression and " + maxRowGroupSize
+        + " bytes as the upper bound for new row groups ..... ");
+      mergeRowGroups(files, parquetMetadata, writer, maxRowGroupSize, compression);
+    } else {
+      appendRowGroups(files, parquetMetadata.getFileMetaData(), writer, stdOut);
+    }
+  }
+
+  private void mergeRowGroups(List<Path> files, ParquetMetadata parquetMetadata, ParquetFileWriter writer,
+                              long maxRowGroupSize, CompressionCodecName compression) throws IOException {
+
+    boolean v2EncodingHint = parquetMetadata.getBlocks().stream()
+      .flatMap(b -> b.getColumns().stream())
+      .anyMatch(chunk -> {
+        EncodingStats stats = chunk.getEncodingStats();
+        return stats != null && stats.usesV2Pages();
+      });
+
+    List<InputFile> inputFiles = files.stream().map(f -> {
+      try {
+        return HadoopInputFile.fromPath(f, conf);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);

Review comment:
       Same issue here as above; the functional interface forces us to do so.
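
For context, this is the standard workaround when a checked exception has to escape a lambda passed to a java.util.function interface: Function.apply does not declare IOException, so the lambda rethrows it as java.io.UncheckedIOException. Below is a minimal, self-contained sketch of the pattern; open() is a hypothetical stand-in for a call such as HadoopInputFile.fromPath(f, conf).

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class UncheckedIOExceptionSketch {

  // Hypothetical stand-in for a method that declares a checked IOException.
  static String open(String path) throws IOException {
    if (path.isEmpty()) {
      throw new IOException("empty path");
    }
    return "opened:" + path;
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("a.parquet", "b.parquet");

    // Function.apply(T) declares no checked exceptions, so the IOException
    // thrown by open() cannot propagate out of the lambda directly; wrapping
    // it in UncheckedIOException is the conventional way out.
    Function<String, String> opener = p -> {
      try {
        return open(p);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    };

    List<String> handles = paths.stream().map(opener).collect(Collectors.toList());
    System.out.println(handles);
  }
}
{code}

A caller that still wants the checked exception can catch UncheckedIOException and rethrow its getCause(), which is typed as IOException.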




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add merge blocks command to parquet-tools
> -----------------------------------------
>
>                 Key: PARQUET-1381
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1381
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Ekaterina Galieva
>            Assignee: Ekaterina Galieva
>            Priority: Major
>              Labels: pull-request-available
>
> The current implementation of the merge command in parquet-tools doesn't merge row
> groups, it just places one after the other. Add an API and a command option to be
> able to merge small blocks into larger ones up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so as not to break possible initial predicate pushdown
> optimizations.
> Blocks are not divided to fit the upper bound perfectly; this is an intentional
> performance optimization, since it allows new blocks to be formed by copying the
> full content of smaller blocks by column, not by row (see the grouping sketch
> after the examples below).
> h6. Examples:
>  # Input files with block sizes:
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file block sizes:
> {{merge }}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256 }}
> {code:java}
> [163 | 168 | 120]
> {code}
>  # Input files with block sizes:
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file block sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6] 
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126] 
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}
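
A minimal sketch of the grouping behaviour the examples above imply, assuming a simple greedy pass that starts a new output block whenever adding the next input block would push the running total past the upper bound (block sizes in MB; this only illustrates the expected sizes, not the actual RowGroupMerger implementation, and GreedyBlockGrouping is a hypothetical name):

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GreedyBlockGrouping {

  // Greedily group adjacent block sizes into merged blocks whose total size
  // stays at or below maxSize; blocks are never split or reordered.
  static List<Long> group(List<Long> blockSizes, long maxSize) {
    List<Long> merged = new ArrayList<>();
    long current = 0;
    for (long size : blockSizes) {
      if (current > 0 && current + size > maxSize) {
        merged.add(current);
        current = 0;
      }
      current += size;
    }
    if (current > 0) {
      merged.add(current);
    }
    return merged;
  }

  public static void main(String[] args) {
    // Block sizes from the first example: [128 | 35], [128 | 40], [120]
    List<Long> blocks = Arrays.asList(128L, 35L, 128L, 40L, 120L);
    System.out.println(group(blocks, 128L)); // [128, 35, 128, 40, 120]  (merge -b)
    System.out.println(group(blocks, 256L)); // [163, 168, 120]          (merge -b -l 256)
  }
}
{code}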



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
