http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
new file mode 100644
index 0000000..50ab94c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, ParquetWriter config, 
List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children != null && children.size() == 1;
+    return config.getFormatPlugin().getWriterBatch(context, 
children.iterator().next(), config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
index 7e9d770..f0f2146 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.store.parquet.ColumnReader;
-import org.apache.drill.exec.store.parquet.ParquetRecordReader;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
 import org.apache.drill.exec.vector.*;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
@@ -28,9 +30,12 @@ import org.apache.drill.exec.vector.VarCharVector;
 import parquet.column.ColumnDescriptor;
 import parquet.format.ConvertedType;
 import parquet.column.Encoding;
+import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.io.api.Binary;
 
+import java.math.BigDecimal;
+
 public class VarLengthColumnReaders {
 
   public static abstract class VarLengthColumn<V extends ValueVector> extends 
ColumnReader {
@@ -40,8 +45,14 @@ public class VarLengthColumnReaders {
 
     VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
                     ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, V v,
-                    ConvertedType convertedType) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+                    SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+        usingDictionary = true;
+      }
+      else {
+        usingDictionary = false;
+      }
     }
 
     @Override
@@ -64,8 +75,8 @@ public class VarLengthColumnReaders {
 
     NullableVarLengthColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
                             ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, V v,
-                            ConvertedType convertedType ) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+                            SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
     }
 
     public abstract boolean setSafe(int index, byte[] value, int start, int 
length);
@@ -78,6 +89,125 @@ public class VarLengthColumnReaders {
     }
   }
 
+  public static class Decimal28Column extends 
VarLengthColumn<Decimal28SparseVector> {
+
+    protected Decimal28SparseVector decimal28Vector;
+
+    Decimal28Column(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                   ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, Decimal28SparseVector v,
+                   SchemaElement schemaElement) throws ExecutionSetupException 
{
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      this.decimal28Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal28SparseHolder.WIDTH;
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, 
schemaElement.getScale());
+      if (index >= decimal28Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, 
decimal28Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), 
Decimal28SparseHolder.nDecimalDigits);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return decimal28Vector.getData().capacity();
+    }
+  }
+
+  public static class NullableDecimal28Column extends 
NullableVarLengthColumn<NullableDecimal28SparseVector> {
+
+    protected NullableDecimal28SparseVector nullableDecimal28Vector;
+
+    NullableDecimal28Column(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableDecimal28SparseVector v,
+                    SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      nullableDecimal28Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal28SparseHolder.WIDTH;
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, 
schemaElement.getScale());
+      if (index >= nullableDecimal28Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, 
nullableDecimal28Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), 
Decimal28SparseHolder.nDecimalDigits);
+      nullableDecimal28Vector.getMutator().setIndexDefined(index);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableDecimal28Vector.getData().capacity();
+    }
+  }
+
+  public static class Decimal38Column extends 
VarLengthColumn<Decimal38SparseVector> {
+
+    protected Decimal38SparseVector decimal28Vector;
+
+    Decimal38Column(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, Decimal38SparseVector v,
+                    SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      decimal28Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal38SparseHolder.WIDTH;
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, 
schemaElement.getScale());
+      if (index >= decimal28Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, 
decimal28Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), 
Decimal38SparseHolder.nDecimalDigits);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return decimal28Vector.getData().capacity();
+    }
+  }
+
+  public static class NullableDecimal38Column extends 
NullableVarLengthColumn<NullableDecimal38SparseVector> {
+
+    protected NullableDecimal38SparseVector nullableDecimal38Vector;
+
+    NullableDecimal38Column(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableDecimal38SparseVector v,
+                            SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      nullableDecimal38Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal38SparseHolder.WIDTH;
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, 
schemaElement.getScale());
+      if (index >= nullableDecimal38Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, 
nullableDecimal38Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), 
Decimal38SparseHolder.nDecimalDigits);
+      nullableDecimal38Vector.getMutator().setIndexDefined(index);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableDecimal38Vector.getData().capacity();
+    }
+  }
+
+
   public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
 
     // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
@@ -85,14 +215,8 @@ public class VarLengthColumnReaders {
 
     VarCharColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
                   ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, VarCharVector v,
-                  ConvertedType convertedType) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-        usingDictionary = true;
-      }
-      else {
-        usingDictionary = false;
-      }
+                  SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
       varCharVector = v;
     }
 
@@ -129,15 +253,9 @@ public class VarLengthColumnReaders {
 
     NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
                           ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableVarCharVector v,
-                          ConvertedType convertedType ) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+                          SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
       nullableVarCharVector = v;
-      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-          usingDictionary = true;
-      }
-      else {
-        usingDictionary = false;
-      }
     }
 
     public boolean setSafe(int index, byte[] value, int start, int length) {
@@ -170,14 +288,8 @@ public class VarLengthColumnReaders {
 
     VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
                     ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, VarBinaryVector v,
-                    ConvertedType convertedType) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-        usingDictionary = true;
-      }
-      else {
-        usingDictionary = false;
-      }
+                    SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
       varBinaryVector = v;
     }
 
@@ -214,15 +326,9 @@ public class VarLengthColumnReaders {
 
     NullableVarBinaryColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
                             ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableVarBinaryVector v,
-                            ConvertedType convertedType ) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+                            SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
       nullableVarBinaryVector = v;
-      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-        usingDictionary = true;
-      }
-      else {
-        usingDictionary = false;
-      }
     }
 
     public boolean setSafe(int index, byte[] value, int start, int length) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
 
b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
new file mode 100644
index 0000000..54f647a
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import parquet.column.page.PageWriteStore;
+import parquet.hadoop.CodecFactory.BytesCompressor;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+
+public class ColumnChunkPageWriteStoreExposer {
+
+  public static ColumnChunkPageWriteStore 
newColumnChunkPageWriteStore(CompressionCodecName codec, int pageSize, 
MessageType schema, int initialSize) {
+    BytesCompressor compressor = new CodecFactory(new 
Configuration()).getCompressor(codec, pageSize);
+    return new ColumnChunkPageWriteStore(compressor, schema, initialSize);
+  }
+
+  public static void flushPageStore(PageWriteStore pageStore, 
ParquetFileWriter w) throws IOException {
+    ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
new file mode 100644
index 0000000..d9fa722
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.writer;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.*;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class TestParquetWriter extends BaseTestQuery {
+
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void initFs() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.name.default", "local");
+
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testSimple() throws Exception {
+    String selection = "*";
+    String inputTable = "cp.`employee.json`";
+    runTestAndValidate(selection, selection, inputTable);
+  }
+
+  @Test
+  public void testDecimal() throws Exception {
+    String selection = "cast(salary as decimal(8,2)) as decimal8, cast(salary 
as decimal(15,2)) as decimal15, " +
+            "cast(salary as decimal(24,2)) as decimal24, cast(salary as 
decimal(38,2)) as decimal38";
+    String validateSelection = "decimal8, decimal15, decimal24, decimal38";
+    String inputTable = "cp.`employee.json`";
+    runTestAndValidate(selection, validateSelection, inputTable);
+  }
+
+  @Test
+  @Ignore //this test currently fails. will file jira
+  public void testMulipleRowGroups() throws Exception {
+    try {
+      test(String.format("ALTER SESSION SET `%s` = %d", 
ExecConstants.PARQUET_BLOCK_SIZE, 512*1024));
+      String selection = "*";
+      String inputTable = "cp.`customer.json`";
+      runTestAndValidate(selection, selection, inputTable);
+    } finally {
+      test(String.format("ALTER SESSION SET `%s` = %d", 
ExecConstants.PARQUET_BLOCK_SIZE, 512*1024*1024));
+    }
+  }
+
+
+  @Test
+  @Ignore //enable once Date is enabled
+  public void testDate() throws Exception {
+    String selection = "cast(hire_date as DATE) as hire_date";
+    String validateSelection = "hire_date";
+    String inputTable = "cp.`employee.json`";
+    runTestAndValidate(selection, validateSelection, inputTable);
+  }
+
+  public void runTestAndValidate(String selection, String validationSelection, 
String inputTable) throws Exception {
+
+    Path path = new Path("/tmp/drilltest/employee_parquet");
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+    test("use dfs.tmp");
+    String query = String.format("SELECT %s FROM %s", selection, inputTable);
+    String create = "CREATE TABLE employee_parquet AS " + query;
+    String validateQuery = String.format("SELECT %s FROM employee_parquet", 
validationSelection);
+    test(create);
+    List<QueryResultBatch> results = testSqlWithResults(query);
+    List<QueryResultBatch> expected = testSqlWithResults(validateQuery);
+    compareResults(expected, results);
+  }
+
+  public void compareResults(List<QueryResultBatch> expected, 
List<QueryResultBatch> result) throws Exception {
+    Set<Object> expectedObjects = new HashSet();
+    Set<Object> actualObjects = new HashSet();
+
+    BatchSchema schema = null;
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+    for (QueryResultBatch batch : expected) {
+      loader.load(batch.getHeader().getDef(), batch.getData());
+      if (schema == null) {
+        schema = loader.getSchema();
+      }
+      for (VectorWrapper w : loader) {
+        for (int i = 0; i < loader.getRecordCount(); i++) {
+          Object obj = w.getValueVector().getAccessor().getObject(i);
+          if (obj != null) {
+            if (obj instanceof Text) {
+              expectedObjects.add(obj.toString());
+              if (obj.toString().equals("")) {
+                System.out.println(w.getField());
+              }
+            } else {
+              expectedObjects.add(obj);
+            }
+          }
+        }
+      }
+      loader.clear();
+    }
+    for (QueryResultBatch batch : result) {
+      loader.load(batch.getHeader().getDef(), batch.getData());
+      for (VectorWrapper w : loader) {
+        for (int i = 0; i < loader.getRecordCount(); i++) {
+          Object obj = w.getValueVector().getAccessor().getObject(i);
+          if (obj != null) {
+            if (obj instanceof Text) {
+              actualObjects.add(obj.toString());
+              if (obj.toString().equals(" ")) {
+                System.out.println("EMPTY STRING" + w.getField());
+              }
+            } else {
+              actualObjects.add(obj);
+            }
+          }
+        }
+      }
+      loader.clear();
+    }
+
+//    Assert.assertEquals("Different number of objects returned", 
expectedObjects.size(), actualObjects.size());
+
+    for (Object obj: expectedObjects) {
+      Assert.assertTrue(String.format("Expected object %s", obj), 
actualObjects.contains(obj));
+    }
+    for (Object obj: actualObjects) {
+      Assert.assertTrue(String.format("Unexpected object %s", obj), 
expectedObjects.contains(obj));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63b03467/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
index c8261aa..43e12d0 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -20,8 +20,11 @@ package org.apache.drill.exec.physical.impl.writer;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.VarCharVector;
@@ -40,6 +43,7 @@ import static org.junit.Assert.assertTrue;
 public class TestWriter extends BaseTestQuery {
 
   static FileSystem fs;
+  static String ALTER_SESSION = String.format("ALTER SESSION SET `%s` = 
'csv'", ExecConstants.OUTPUT_FORMAT_OPTION);
 
   @BeforeClass
   public static void initFs() throws Exception {
@@ -91,6 +95,7 @@ public class TestWriter extends BaseTestQuery {
   @Test
   public void simpleCTAS() throws Exception {
     testSqlWithResults("Use dfs.tmp");
+    testSqlWithResults(ALTER_SESSION);
 
     String testQuery = "CREATE TABLE simplectas AS SELECT * FROM 
cp.`employee.json`";
 
@@ -100,6 +105,7 @@ public class TestWriter extends BaseTestQuery {
   @Test
   public void complex1CTAS() throws Exception {
     testSqlWithResults("Use dfs.tmp");
+    testSqlWithResults(ALTER_SESSION);
     String testQuery = "CREATE TABLE complex1ctas AS SELECT first_name, 
last_name, position_id FROM cp.`employee.json`";
 
     ctasHelper("/tmp/drilltest/complex1ctas", testQuery, 1155);
@@ -108,6 +114,7 @@ public class TestWriter extends BaseTestQuery {
   @Test
   public void complex2CTAS() throws Exception {
     testSqlWithResults("Use dfs.tmp");
+    testSqlWithResults(ALTER_SESSION);
     String testQuery = "CREATE TABLE complex2ctas AS SELECT CAST(`birth_date` 
as Timestamp) FROM cp.`employee.json` GROUP BY birth_date";
 
     ctasHelper("/tmp/drilltest/complex2ctas", testQuery, 52);
@@ -115,11 +122,20 @@ public class TestWriter extends BaseTestQuery {
 
   @Test
   public void simpleCTASWithSchemaInTableName() throws Exception {
+    testSqlWithResults(ALTER_SESSION);
     String testQuery = "CREATE TABLE dfs.tmp.`/test/simplectas2` AS SELECT * 
FROM cp.`employee.json`";
 
     ctasHelper("/tmp/drilltest/test/simplectas2", testQuery, 1155);
   }
 
+  @Test
+  public void simpleParquetDecimal() throws Exception {
+//    String testQuery = "CREATE TABLE dfs.tmp.`simpleparquetdecimal` AS 
SELECT full_name FROM cp.`employee.json`";
+    String testQuery = "CREATE TABLE dfs.tmp.`simpleparquetdecimal` AS SELECT 
cast(salary as decimal(30,2)) * -1 as salary FROM cp.`employee.json`";
+//    String testQuery = "select * from dfs.tmp.`simpleparquetdecimal`";
+    ctasHelper("/tmp/drilltest/simpleparquetdecimal", testQuery, 1155);
+  }
+
   private void ctasHelper(String tableDir, String testQuery, int 
expectedOutputCount) throws Exception {
     Path tableLocation = new Path(tableDir);
     if (fs.exists(tableLocation)){
@@ -147,8 +163,9 @@ public class TestWriter extends BaseTestQuery {
     }
     batchLoader.clear();
 
-    assertTrue(fs.exists(tableLocation));
+//    assertTrue(fs.exists(tableLocation));
     assertEquals(expectedOutputCount, recordsWritten);
+    Thread.sleep(1000);
   }
 
 }

Reply via email to