This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ef93515f88caf232f00fd6a4b37a6751592bea8a
Author: Oleg Zinoviev <ozinov...@solit-clouds.ru>
AuthorDate: Sun Jun 16 21:21:46 2019 +0300

    DRILL-7156: Support empty Parquet files creation
    
    closes #1836
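    
    A minimal sketch of the behavior this change enables, based on the updated
    testWriteEmptyFile test below (table name is illustrative):
    
        -- Before this change, a CTAS that selects zero rows produced no output
        -- file at all; with DRILL-7156 it creates a Parquet file that carries
        -- only the schema of the select list.
        CREATE TABLE dfs.tmp.`empty_employee` AS
        SELECT * FROM cp.`employee.json` WHERE 1 = 0;
        
        -- The empty table can then be queried; it returns zero rows but a
        -- usable schema.
        SELECT * FROM dfs.tmp.`empty_employee`;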
---
 .../exec/store/parquet/ParquetRecordWriter.java    |  95 +++++++++-------
 .../impl/writer/TestParquetWriterEmptyFiles.java   | 123 +++++++++++++++++----
 2 files changed, 159 insertions(+), 59 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 5a64f40..a9f7f14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -122,6 +122,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private PrimitiveTypeName logicalTypeForDecimals;
   private boolean usePrimitiveTypesForDecimals;
 
+  /** Used to ensure that an empty Parquet file will be written if no rows were provided. */
+  private boolean empty = true;
+
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException {
     this.oContext = context.newOperatorContext(writer);
     this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(),
@@ -205,7 +208,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   public void updateSchema(VectorAccessible batch) throws IOException {
     if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema()) || containsComplexVectors(this.batchSchema)) {
       if (this.batchSchema != null) {
-        flush();
+        flush(false);
       }
       this.batchSchema = batch.getSchema();
       newSchema();
@@ -310,7 +313,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     try {
       boolean newPartition = newPartition(index);
       if (newPartition) {
-        flush();
+        flush(false);
         newSchema();
       }
     } catch (Exception e) {
@@ -318,19 +321,18 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
   }
 
-  private void flush() throws IOException {
+  private void flush(boolean cleanUp) throws IOException {
     try {
       if (recordCount > 0) {
-        parquetFileWriter.startBlock(recordCount);
-        consumer.flush();
-        store.flush();
-        pageStore.flushToFileWriter(parquetFileWriter);
-        recordCount = 0;
-        parquetFileWriter.endBlock();
-
-        // we are writing one single block per file
-        parquetFileWriter.end(extraMetaData);
-        parquetFileWriter = null;
+        flushParquetFileWriter();
+      } else if (cleanUp && empty && schema != null && schema.getFieldCount() > 0) {
+        // Write an empty Parquet file if:
+        // 1) This is a cleanup - no additional records can be written
+        // 2) No file was written until this moment
+        // 3) Schema is set
+        // 4) Schema is not empty
+        createParquetFileWriter();
+        flushParquetFileWriter();
       }
     } finally {
       store.close();
@@ -347,7 +349,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
       long memSize = store.getBufferedSize();
       if (memSize > blockSize) {
         logger.debug("Reached block size " + blockSize);
-        flush();
+        flush(false);
         newSchema();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
       } else {
@@ -435,29 +437,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     // we wait until there is at least one record before creating the parquet file
     if (parquetFileWriter == null) {
-      Path path = new Path(location, prefix + "_" + index + ".parquet");
-      // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
-      Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
-
-      // since parquet reader supports partitions, it means that several output files may be created
-      // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
-      // if table location was created before, we store only files created by this writer and delete them in case of abort
-      addCleanUpLocation(fs, firstCreatedPath);
-
-      // since ParquetFileWriter will overwrite empty output file (append is not supported)
-      // we need to re-apply file permission
-      if (useSingleFSBlock) {
-        // Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
-        // Currently, this is supported only by filesystems included in
-        // BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
-        // For other filesystems, it uses default blockSize configured for the file system.
-        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
-      } else {
-        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
-      }
-      storageStrategy.applyToFile(fs, path);
-      parquetFileWriter.start();
+      createParquetFileWriter();
     }
+
+    empty = false;
     recordCount++;
     checkBlockSizeReached();
   }
@@ -486,11 +469,49 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void cleanup() throws IOException {
-    flush();
+    flush(true);
 
     codecFactory.release();
   }
 
+  private void createParquetFileWriter() throws IOException {
+    Path path = new Path(location, prefix + "_" + index + ".parquet");
+    // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
+    Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
+
+    // since parquet reader supports partitions, it means that several output files may be created
+    // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
+    // if table location was created before, we store only files created by this writer and delete them in case of abort
+    addCleanUpLocation(fs, firstCreatedPath);
+
+    // since ParquetFileWriter will overwrite empty output file (append is not supported)
+    // we need to re-apply file permission
+    if (useSingleFSBlock) {
+      // Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
+      // Currently, this is supported only by filesystems included in
+      // BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
+      // For other filesystems, it uses default blockSize configured for the file system.
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
+    } else {
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+    }
+    storageStrategy.applyToFile(fs, path);
+    parquetFileWriter.start();
+  }
+
+  private void flushParquetFileWriter() throws IOException {
+    parquetFileWriter.startBlock(recordCount);
+    consumer.flush();
+    store.flush();
+    pageStore.flushToFileWriter(parquetFileWriter);
+    recordCount = 0;
+    parquetFileWriter.endBlock();
+
+    // we are writing one single block per file
+    parquetFileWriter.end(extraMetaData);
+    parquetFileWriter = null;
+  }
+
   /**
    * Adds passed location to the list of locations to be cleaned up in case of abort.
    * Add locations if:
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
index bc72234..d2ae653 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -18,16 +18,24 @@
 package org.apache.drill.exec.physical.impl.writer;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.ExecConstants;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.File;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category({ParquetTest.class, UnlikelyTest.class})
 public class TestParquetWriterEmptyFiles extends BaseTestQuery {
@@ -35,42 +43,110 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     updateTestCluster(3, null);
+    dirTestWatcher.copyResourceToRoot(Paths.get("schemachange"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "empty"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "alltypes_required.parquet"));
   }
 
-  @Test // see DRILL-2408
+  @Test
   public void testWriteEmptyFile() throws Exception {
     final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfile";
     final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
 
     test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 
1=0", outputFileName);
-    Assert.assertFalse(outputFile.exists());
+    assertTrue(outputFile.exists());
   }
 
   @Test
-  public void testMultipleWriters() throws Exception {
-    final String outputFile = "testparquetwriteremptyfiles_testmultiplewriters";
+  public void testWriteEmptyFileWithSchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfilewithschema";
 
-    runSQL("alter session set `planner.slice_target` = 1");
+    test("CREATE TABLE dfs.tmp.%s AS select * from 
dfs.`parquet/alltypes_required.parquet` where `col_int` = 0", outputFileName);
 
-    try {
-      final String query = "SELECT position_id FROM cp.`employee.json` WHERE position_id IN (15, 16) GROUP BY position_id";
+    // Only the last scan schema is written
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+      .add("col_int", TypeProtos.MinorType.INT)
+      .add("col_chr", TypeProtos.MinorType.VARCHAR)
+      .add("col_vrchr", TypeProtos.MinorType.VARCHAR)
+      .add("col_dt", TypeProtos.MinorType.DATE)
+      .add("col_tim", TypeProtos.MinorType.TIME)
+      .add("col_tmstmp", TypeProtos.MinorType.TIMESTAMP)
+      .add("col_flt", TypeProtos.MinorType.FLOAT4)
+      .add("col_intrvl_yr", TypeProtos.MinorType.INTERVAL)
+      .add("col_intrvl_day", TypeProtos.MinorType.INTERVAL)
+      .add("col_bln", TypeProtos.MinorType.BIT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+      .withSchemaBuilder(schemaBuilder)
+      .build();
 
-      test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query);
+    testBuilder()
+      .unOrdered()
+      .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+      .schemaBaseLine(expectedSchema)
+      .go();
+  }
 
-      // this query will fail if an "empty" file was created
-      testBuilder()
-        .unOrdered()
-        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile)
-        .sqlBaselineQuery(query)
-        .go();
-    } finally {
-      runSQL("alter session set `planner.slice_target` = " + 
ExecConstants.SLICE_TARGET_DEFAULT);
-    }
+  @Test
+  public void testWriteEmptyFileWithEmptySchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfileemptyschema";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
+
+    test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`empty.json`", 
outputFileName);
+    assertFalse(outputFile.exists());
   }
 
-  @Test // see DRILL-2408
+  @Test
+  public void testWriteEmptySchemaChange() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyschemachange";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
+
+    test("CREATE TABLE dfs.tmp.%s AS select id, a, b from 
dfs.`schemachange/multi/*.json` WHERE id = 0", outputFileName);
+
+    // Only the last scan schema is written
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+      .addNullable("id", TypeProtos.MinorType.BIGINT)
+      .addNullable("a", TypeProtos.MinorType.BIGINT)
+      .addNullable("b", TypeProtos.MinorType.BIT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+      .withSchemaBuilder(schemaBuilder)
+      .build();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+      .schemaBaseLine(expectedSchema)
+      .go();
+
+    // Make sure that only 1 parquet file was created
+    assertEquals(1, outputFile.list((dir, name) -> name.endsWith("parquet")).length);
+  }
+
+  @Test
+  public void testComplexEmptyFileSchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testcomplexemptyfileschema";
+
+    test("create table dfs.tmp.%s as select * from 
dfs.`parquet/empty/complex/empty_complex.parquet`", outputFileName);
+
+    // end_date column is null, so it is missing in the result schema.
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+      .addNullable("id", TypeProtos.MinorType.BIGINT)
+      .addNullable("name", TypeProtos.MinorType.VARCHAR)
+      .addArray("orders", TypeProtos.MinorType.BIGINT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+      .withSchemaBuilder(schemaBuilder)
+      .build();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+      .schemaBaseLine(expectedSchema)
+      .go();
+  }
+
+  @Test
   public void testWriteEmptyFileAfterFlush() throws Exception {
-    final String outputFile = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    final String outputFileName = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
 
     try {
       // this specific value will force a flush just after the final row is written
@@ -78,12 +154,15 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
       test("ALTER SESSION SET `store.parquet.block-size` = 19926");
 
       final String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
-      test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query);
+      test("CREATE TABLE dfs.tmp.%s AS %s", outputFileName, query);
+
+      // Make sure that only 1 parquet file was created
+      assertEquals(1, outputFile.list((dir, name) -> name.endsWith("parquet")).length);
 
       // this query will fail if an "empty" file was created
       testBuilder()
         .unOrdered()
-        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile)
+        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFileName)
         .sqlBaselineQuery(query)
         .go();
     } finally {
