http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java new file mode 100644 index 0000000..50ab94c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class); + + @Override + public RecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children) + throws ExecutionSetupException { + assert children != null && children.size() == 1; + return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java index 7e9d770..f0f2146 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java @@ -18,8 +18,10 @@ package org.apache.drill.exec.store.parquet; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.store.parquet.ColumnReader; -import org.apache.drill.exec.store.parquet.ParquetRecordReader; +import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; +import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; +import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder; import org.apache.drill.exec.vector.*; import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.NullableVarCharVector; @@ -28,9 +30,12 @@ import org.apache.drill.exec.vector.VarCharVector; import parquet.column.ColumnDescriptor; import parquet.format.ConvertedType; import parquet.column.Encoding; +import parquet.format.SchemaElement; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.io.api.Binary; +import java.math.BigDecimal; + public class VarLengthColumnReaders { public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader { @@ -40,8 +45,14 @@ public class VarLengthColumnReaders { VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, - ConvertedType convertedType) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + usingDictionary = true; + } + else { + usingDictionary = false; + } } @Override @@ -64,8 +75,8 @@ public class VarLengthColumnReaders { NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, - ConvertedType convertedType ) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } public abstract boolean setSafe(int index, byte[] value, int start, int length); @@ -78,6 +89,125 @@ public class VarLengthColumnReaders { } } + public static class Decimal28Column extends VarLengthColumn<Decimal28SparseVector> { + + protected Decimal28SparseVector decimal28Vector; + + Decimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal28SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + this.decimal28Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal28SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= decimal28Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); + return true; + } + + @Override + public int capacity() { + return decimal28Vector.getData().capacity(); + } + } + + public static class NullableDecimal28Column extends NullableVarLengthColumn<NullableDecimal28SparseVector> { + + protected NullableDecimal28SparseVector nullableDecimal28Vector; + + NullableDecimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal28SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + nullableDecimal28Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal28SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= nullableDecimal28Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); + nullableDecimal28Vector.getMutator().setIndexDefined(index); + return true; + } + + @Override + public int capacity() { + return nullableDecimal28Vector.getData().capacity(); + } + } + + public static class Decimal38Column extends VarLengthColumn<Decimal38SparseVector> { + + protected Decimal38SparseVector decimal28Vector; + + Decimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal38SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + decimal28Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal38SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= decimal28Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); + return true; + } + + @Override + public int capacity() { + return decimal28Vector.getData().capacity(); + } + } + + public static class NullableDecimal38Column extends NullableVarLengthColumn<NullableDecimal38SparseVector> { + + protected NullableDecimal38SparseVector nullableDecimal38Vector; + + NullableDecimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal38SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + nullableDecimal38Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal38SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= nullableDecimal38Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal38Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); + nullableDecimal38Vector.getMutator().setIndexDefined(index); + return true; + } + + @Override + public int capacity() { + return nullableDecimal38Vector.getData().capacity(); + } + } + + public static class VarCharColumn extends VarLengthColumn <VarCharVector> { // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting @@ -85,14 +215,8 @@ public class VarLengthColumnReaders { VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v, - ConvertedType convertedType) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); - if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - usingDictionary = true; - } - else { - usingDictionary = false; - } + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); varCharVector = v; } @@ -129,15 +253,9 @@ public class VarLengthColumnReaders { NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v, - ConvertedType convertedType ) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); nullableVarCharVector = v; - if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - usingDictionary = true; - } - else { - usingDictionary = false; - } } public boolean setSafe(int index, byte[] value, int start, int length) { @@ -170,14 +288,8 @@ public class VarLengthColumnReaders { VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, - ConvertedType convertedType) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); - if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - usingDictionary = true; - } - else { - usingDictionary = false; - } + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); varBinaryVector = v; } @@ -214,15 +326,9 @@ public class VarLengthColumnReaders { NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, - ConvertedType convertedType ) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); nullableVarBinaryVector = v; - if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - usingDictionary = true; - } - else { - usingDictionary = false; - } } public boolean setSafe(int index, byte[] value, int start, int length) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java new file mode 100644 index 0000000..54f647a --- /dev/null +++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import parquet.column.page.PageWriteStore; +import parquet.hadoop.CodecFactory.BytesCompressor; +import parquet.hadoop.metadata.CompressionCodecName; +import parquet.schema.MessageType; + +import java.io.IOException; + +public class ColumnChunkPageWriteStoreExposer { + + public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(CompressionCodecName codec, int pageSize, MessageType schema, int initialSize) { + BytesCompressor compressor = new CodecFactory(new Configuration()).getCompressor(codec, pageSize); + return new ColumnChunkPageWriteStore(compressor, schema, initialSize); + } + + public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException { + ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java new file mode 100644 index 0000000..d9fa722 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.writer; + +import com.google.common.collect.Lists; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.*; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class TestParquetWriter extends BaseTestQuery { + + static FileSystem fs; + + @BeforeClass + public static void initFs() throws Exception { + Configuration conf = new Configuration(); + conf.set("fs.name.default", "local"); + + fs = FileSystem.get(conf); + } + + @Test + public void testSimple() throws Exception { + String selection = "*"; + String inputTable = "cp.`employee.json`"; + runTestAndValidate(selection, selection, inputTable); + } + + @Test + public void testDecimal() throws Exception { + String selection = "cast(salary as decimal(8,2)) as decimal8, cast(salary as decimal(15,2)) as decimal15, " + + "cast(salary as decimal(24,2)) as decimal24, cast(salary as decimal(38,2)) as decimal38"; + String validateSelection = "decimal8, decimal15, decimal24, decimal38"; + String inputTable = "cp.`employee.json`"; + runTestAndValidate(selection, validateSelection, inputTable); + } + + @Test + @Ignore //this test currently fails. will file jira + public void testMulipleRowGroups() throws Exception { + try { + test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 512*1024)); + String selection = "*"; + String inputTable = "cp.`customer.json`"; + runTestAndValidate(selection, selection, inputTable); + } finally { + test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 512*1024*1024)); + } + } + + + @Test + @Ignore //enable once Date is enabled + public void testDate() throws Exception { + String selection = "cast(hire_date as DATE) as hire_date"; + String validateSelection = "hire_date"; + String inputTable = "cp.`employee.json`"; + runTestAndValidate(selection, validateSelection, inputTable); + } + + public void runTestAndValidate(String selection, String validationSelection, String inputTable) throws Exception { + + Path path = new Path("/tmp/drilltest/employee_parquet"); + if (fs.exists(path)) { + fs.delete(path, true); + } + + test("use dfs.tmp"); + String query = String.format("SELECT %s FROM %s", selection, inputTable); + String create = "CREATE TABLE employee_parquet AS " + query; + String validateQuery = String.format("SELECT %s FROM employee_parquet", validationSelection); + test(create); + List<QueryResultBatch> results = testSqlWithResults(query); + List<QueryResultBatch> expected = testSqlWithResults(validateQuery); + compareResults(expected, results); + } + + public void compareResults(List<QueryResultBatch> expected, List<QueryResultBatch> result) throws Exception { + Set<Object> expectedObjects = new HashSet(); + Set<Object> actualObjects = new HashSet(); + + BatchSchema schema = null; + RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + for (QueryResultBatch batch : expected) { + loader.load(batch.getHeader().getDef(), batch.getData()); + if (schema == null) { + schema = loader.getSchema(); + } + for (VectorWrapper w : loader) { + for (int i = 0; i < loader.getRecordCount(); i++) { + Object obj = w.getValueVector().getAccessor().getObject(i); + if (obj != null) { + if (obj instanceof Text) { + expectedObjects.add(obj.toString()); + if (obj.toString().equals("")) { + System.out.println(w.getField()); + } + } else { + expectedObjects.add(obj); + } + } + } + } + loader.clear(); + } + for (QueryResultBatch batch : result) { + loader.load(batch.getHeader().getDef(), batch.getData()); + for (VectorWrapper w : loader) { + for (int i = 0; i < loader.getRecordCount(); i++) { + Object obj = w.getValueVector().getAccessor().getObject(i); + if (obj != null) { + if (obj instanceof Text) { + actualObjects.add(obj.toString()); + if (obj.toString().equals(" ")) { + System.out.println("EMPTY STRING" + w.getField()); + } + } else { + actualObjects.add(obj); + } + } + } + } + loader.clear(); + } + +// Assert.assertEquals("Different number of objects returned", expectedObjects.size(), actualObjects.size()); + + for (Object obj: expectedObjects) { + Assert.assertTrue(String.format("Expected object %s", obj), actualObjects.contains(obj)); + } + for (Object obj: actualObjects) { + Assert.assertTrue(String.format("Unexpected object %s", obj), expectedObjects.contains(obj)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java index c8261aa..43e12d0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java @@ -20,8 +20,11 @@ package org.apache.drill.exec.physical.impl.writer; import com.google.common.base.Charsets; import com.google.common.io.Files; import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.VarCharVector; @@ -40,6 +43,7 @@ import static org.junit.Assert.assertTrue; public class TestWriter extends BaseTestQuery { static FileSystem fs; + static String ALTER_SESSION = String.format("ALTER SESSION SET `%s` = 'csv'", ExecConstants.OUTPUT_FORMAT_OPTION); @BeforeClass public static void initFs() throws Exception { @@ -91,6 +95,7 @@ public class TestWriter extends BaseTestQuery { @Test public void simpleCTAS() throws Exception { testSqlWithResults("Use dfs.tmp"); + testSqlWithResults(ALTER_SESSION); String testQuery = "CREATE TABLE simplectas AS SELECT * FROM cp.`employee.json`"; @@ -100,6 +105,7 @@ public class TestWriter extends BaseTestQuery { @Test public void complex1CTAS() throws Exception { testSqlWithResults("Use dfs.tmp"); + testSqlWithResults(ALTER_SESSION); String testQuery = "CREATE TABLE complex1ctas AS SELECT first_name, last_name, position_id FROM cp.`employee.json`"; ctasHelper("/tmp/drilltest/complex1ctas", testQuery, 1155); @@ -108,6 +114,7 @@ public class TestWriter extends BaseTestQuery { @Test public void complex2CTAS() throws Exception { testSqlWithResults("Use dfs.tmp"); + testSqlWithResults(ALTER_SESSION); String testQuery = "CREATE TABLE complex2ctas AS SELECT CAST(`birth_date` as Timestamp) FROM cp.`employee.json` GROUP BY birth_date"; ctasHelper("/tmp/drilltest/complex2ctas", testQuery, 52); @@ -115,11 +122,20 @@ public class TestWriter extends BaseTestQuery { @Test public void simpleCTASWithSchemaInTableName() throws Exception { + testSqlWithResults(ALTER_SESSION); String testQuery = "CREATE TABLE dfs.tmp.`/test/simplectas2` AS SELECT * FROM cp.`employee.json`"; ctasHelper("/tmp/drilltest/test/simplectas2", testQuery, 1155); } + @Test + public void simpleParquetDecimal() throws Exception { +// String testQuery = "CREATE TABLE dfs.tmp.`simpleparquetdecimal` AS SELECT full_name FROM cp.`employee.json`"; + String testQuery = "CREATE TABLE dfs.tmp.`simpleparquetdecimal` AS SELECT cast(salary as decimal(30,2)) * -1 as salary FROM cp.`employee.json`"; +// String testQuery = "select * from dfs.tmp.`simpleparquetdecimal`"; + ctasHelper("/tmp/drilltest/simpleparquetdecimal", testQuery, 1155); + } + private void ctasHelper(String tableDir, String testQuery, int expectedOutputCount) throws Exception { Path tableLocation = new Path(tableDir); if (fs.exists(tableLocation)){ @@ -147,8 +163,9 @@ public class TestWriter extends BaseTestQuery { } batchLoader.clear(); - assertTrue(fs.exists(tableLocation)); +// assertTrue(fs.exists(tableLocation)); assertEquals(expectedOutputCount, recordsWritten); + Thread.sleep(1000); } }
