This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit ef93515f88caf232f00fd6a4b37a6751592bea8a
Author:     Oleg Zinoviev <ozinov...@solit-clouds.ru>
AuthorDate: Sun Jun 16 21:21:46 2019 +0300

    DRILL-7156: Support empty Parquet files creation

    closes #1836
---
 .../exec/store/parquet/ParquetRecordWriter.java    |  95 +++++++++-------
 .../impl/writer/TestParquetWriterEmptyFiles.java   | 123 +++++++++++++++++----
 2 files changed, 159 insertions(+), 59 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 5a64f40..a9f7f14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -122,6 +122,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private PrimitiveTypeName logicalTypeForDecimals;
   private boolean usePrimitiveTypesForDecimals;
 
+  /** Used to ensure that an empty Parquet file is written if no rows were provided. */
+  private boolean empty = true;
+
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException {
     this.oContext = context.newOperatorContext(writer);
     this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(),
@@ -205,7 +208,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   public void updateSchema(VectorAccessible batch) throws IOException {
     if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema()) || containsComplexVectors(this.batchSchema)) {
       if (this.batchSchema != null) {
-        flush();
+        flush(false);
       }
       this.batchSchema = batch.getSchema();
       newSchema();
@@ -310,7 +313,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     try {
       boolean newPartition = newPartition(index);
       if (newPartition) {
-        flush();
+        flush(false);
         newSchema();
       }
     } catch (Exception e) {
@@ -318,19 +321,18 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
   }
 
-  private void flush() throws IOException {
+  private void flush(boolean cleanUp) throws IOException {
     try {
       if (recordCount > 0) {
-        parquetFileWriter.startBlock(recordCount);
-        consumer.flush();
-        store.flush();
-        pageStore.flushToFileWriter(parquetFileWriter);
-        recordCount = 0;
-        parquetFileWriter.endBlock();
-
-        // we are writing one single block per file
-        parquetFileWriter.end(extraMetaData);
-        parquetFileWriter = null;
+        flushParquetFileWriter();
+      } else if (cleanUp && empty && schema != null && schema.getFieldCount() > 0) {
+        // Write an empty parquet file if:
+        // 1) This is a cleanup - no additional records can be written
+        // 2) No file was written until this moment
+        // 3) Schema is set
+        // 4) Schema is not empty
+        createParquetFileWriter();
+        flushParquetFileWriter();
       }
     } finally {
       store.close();
@@ -347,7 +349,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
       long memSize = store.getBufferedSize();
       if (memSize > blockSize) {
         logger.debug("Reached block size " + blockSize);
-        flush();
+        flush(false);
         newSchema();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
       } else {
@@ -435,29 +437,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     // we wait until there is at least one record before creating the parquet file
     if (parquetFileWriter == null) {
-      Path path = new Path(location, prefix + "_" + index + ".parquet");
-      // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
-      Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
-
-      // since parquet reader supports partitions, it means that several output files may be created
-      // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
-      // if table location was created before, we store only files created by this writer and delete them in case of abort
-      addCleanUpLocation(fs, firstCreatedPath);
-
-      // since ParquetFileWriter will overwrite empty output file (append is not supported)
-      // we need to re-apply file permission
-      if (useSingleFSBlock) {
-        // Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
-        // Currently, this is supported only by filesystems included in
-        // BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
-        // For other filesystems, it uses default blockSize configured for the file system.
-        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
-      } else {
-        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
-      }
-      storageStrategy.applyToFile(fs, path);
-      parquetFileWriter.start();
+      createParquetFileWriter();
     }
+
+    empty = false;
     recordCount++;
     checkBlockSizeReached();
   }
@@ -486,11 +469,49 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void cleanup() throws IOException {
-    flush();
+    flush(true);
 
     codecFactory.release();
   }
 
+  private void createParquetFileWriter() throws IOException {
+    Path path = new Path(location, prefix + "_" + index + ".parquet");
+    // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
+    Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
+
+    // since parquet reader supports partitions, it means that several output files may be created
+    // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
+    // if table location was created before, we store only files created by this writer and delete them in case of abort
+    addCleanUpLocation(fs, firstCreatedPath);
+
+    // since ParquetFileWriter will overwrite empty output file (append is not supported)
+    // we need to re-apply file permission
+    if (useSingleFSBlock) {
+      // Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
+      // Currently, this is supported only by filesystems included in
+      // BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
+      // For other filesystems, it uses default blockSize configured for the file system.
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
+    } else {
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+    }
+    storageStrategy.applyToFile(fs, path);
+    parquetFileWriter.start();
+  }
+
+  private void flushParquetFileWriter() throws IOException {
+    parquetFileWriter.startBlock(recordCount);
+    consumer.flush();
+    store.flush();
+    pageStore.flushToFileWriter(parquetFileWriter);
+    recordCount = 0;
+    parquetFileWriter.endBlock();
+
+    // we are writing one single block per file
+    parquetFileWriter.end(extraMetaData);
+    parquetFileWriter = null;
+  }
+
   /**
    * Adds passed location to the list of locations to be cleaned up in case of abort.
    * Add locations if:
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
index bc72234..d2ae653 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -18,16 +18,24 @@
 package org.apache.drill.exec.physical.impl.writer;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.ExecConstants;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.File;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category({ParquetTest.class, UnlikelyTest.class})
 public class TestParquetWriterEmptyFiles extends BaseTestQuery {
@@ -35,42 +43,110 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     updateTestCluster(3, null);
+    dirTestWatcher.copyResourceToRoot(Paths.get("schemachange"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "empty"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "alltypes_required.parquet"));
   }
 
-  @Test // see DRILL-2408
+  @Test
   public void testWriteEmptyFile() throws Exception {
     final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfile";
     final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
 
     test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFileName);
-    Assert.assertFalse(outputFile.exists());
+    assertTrue(outputFile.exists());
   }
 
   @Test
-  public void testMultipleWriters() throws Exception {
-    final String outputFile = "testparquetwriteremptyfiles_testmultiplewriters";
+  public void testWriteEmptyFileWithSchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfilewithschema";
 
-    runSQL("alter session set `planner.slice_target` = 1");
+    test("CREATE TABLE dfs.tmp.%s AS select * from dfs.`parquet/alltypes_required.parquet` where `col_int` = 0", outputFileName);
 
-    try {
-      final String query = "SELECT position_id FROM cp.`employee.json` WHERE position_id IN (15, 16) GROUP BY position_id";
+    // Only the last scan schema is written
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+        .add("col_int", TypeProtos.MinorType.INT)
+        .add("col_chr", TypeProtos.MinorType.VARCHAR)
+        .add("col_vrchr", TypeProtos.MinorType.VARCHAR)
+        .add("col_dt", TypeProtos.MinorType.DATE)
+        .add("col_tim", TypeProtos.MinorType.TIME)
+        .add("col_tmstmp", TypeProtos.MinorType.TIMESTAMP)
+        .add("col_flt", TypeProtos.MinorType.FLOAT4)
+        .add("col_intrvl_yr", TypeProtos.MinorType.INTERVAL)
+        .add("col_intrvl_day", TypeProtos.MinorType.INTERVAL)
+        .add("col_bln", TypeProtos.MinorType.BIT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+        .withSchemaBuilder(schemaBuilder)
+        .build();
 
-      test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query);
+    testBuilder()
+        .unOrdered()
+        .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+        .schemaBaseLine(expectedSchema)
+        .go();
+  }
 
-      // this query will fail if an "empty" file was created
-      testBuilder()
-        .unOrdered()
-        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile)
-        .sqlBaselineQuery(query)
-        .go();
-    } finally {
-      runSQL("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
-    }
+  @Test
+  public void testWriteEmptyFileWithEmptySchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfileemptyschema";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
+
+    test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`empty.json`", outputFileName);
+    assertFalse(outputFile.exists());
   }
 
-  @Test // see DRILL-2408
+  @Test
+  public void testWriteEmptySchemaChange() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyschemachange";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
+
+    test("CREATE TABLE dfs.tmp.%s AS select id, a, b from dfs.`schemachange/multi/*.json` WHERE id = 0", outputFileName);
+
+    // Only the last scan schema is written
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+        .addNullable("id", TypeProtos.MinorType.BIGINT)
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.BIT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+        .withSchemaBuilder(schemaBuilder)
+        .build();
+
+    testBuilder()
+        .unOrdered()
+        .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+        .schemaBaseLine(expectedSchema)
+        .go();
+
+    // Make sure that only 1 parquet file was created
+    assertEquals(1, outputFile.list((dir, name) -> name.endsWith("parquet")).length);
+  }
+
+  @Test
+  public void testComplexEmptyFileSchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testcomplexemptyfileschema";
+
+    test("create table dfs.tmp.%s as select * from dfs.`parquet/empty/complex/empty_complex.parquet`", outputFileName);
+
+    // end_date column is null, so it is missing in the result schema.
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+        .addNullable("id", TypeProtos.MinorType.BIGINT)
+        .addNullable("name", TypeProtos.MinorType.VARCHAR)
+        .addArray("orders", TypeProtos.MinorType.BIGINT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+        .withSchemaBuilder(schemaBuilder)
+        .build();
+
+    testBuilder()
+        .unOrdered()
+        .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+        .schemaBaseLine(expectedSchema)
+        .go();
+  }
+
+  @Test
   public void testWriteEmptyFileAfterFlush() throws Exception {
-    final String outputFile = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    final String outputFileName = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
 
     try {
       // this specific value will force a flush just after the final row is written
@@ -78,12 +154,15 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
       test("ALTER SESSION SET `store.parquet.block-size` = 19926");
 
       final String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
-      test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query);
+      test("CREATE TABLE dfs.tmp.%s AS %s", outputFileName, query);
+
+      // Make sure that only 1 parquet file was created
+      assertEquals(1, outputFile.list((dir, name) -> name.endsWith("parquet")).length);
 
       // this query will fail if an "empty" file was created
       testBuilder()
         .unOrdered()
-        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile)
+        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFileName)
         .sqlBaselineQuery(query)
         .go();
     } finally {
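For readers following the change, here is a minimal SQL sketch of the behavior this commit enables, modeled on the testWriteEmptyFile case above (the table name empty_example is illustrative, not part of the patch):

    -- A CTAS whose filter matches no rows previously produced no output file.
    -- With DRILL-7156 it writes a single empty Parquet file that preserves the schema.
    CREATE TABLE dfs.tmp.empty_example AS
    SELECT * FROM cp.`employee.json` WHERE 1 = 0;

    -- The empty table is now queryable: zero rows, but a full schema.
    SELECT * FROM dfs.tmp.empty_example;

Per testWriteEmptyFileWithEmptySchema, the one exception is a result with an empty schema (e.g. selecting from cp.`empty.json`), in which case no file is written.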