http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
new file mode 100644
index 0000000..864ae48
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("fs-writer")
+public class EasyWriter extends AbstractWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class);
+
+  private final String location;
+  private final EasyFormatPlugin<?> formatPlugin;
+
+  @JsonCreator
+  public EasyWriter(
+      @JsonProperty("child") PhysicalOperator child,
+      @JsonProperty("location") String location,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+
+    super(child);
+    this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+    Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+    this.location = location;
+  }
+
+  public EasyWriter(PhysicalOperator child,
+                    String location,
+                    EasyFormatPlugin<?> formatPlugin) {
+
+    super(child);
+    this.formatPlugin = formatPlugin;
+    this.location = location;
+  }
+
+  @JsonProperty("location")
+  public String getLocation() {
+    return location;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig(){
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig(){
+    return formatPlugin.getConfig();
+  }
+
+  @JsonIgnore
+  public EasyFormatPlugin getFormatPlugin(){
+    return formatPlugin;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new EasyWriter(child, location, formatPlugin);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    // TODO:
+    return new OperatorCost(1,1,1,1);
+  }
+}
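
A note for reviewers: the annotations above are what let the new "fs-writer" operator round-trip through the JSON physical plan (see simple_csv_writer.json at the end of this commit). A minimal, self-contained sketch of the same Jackson binding pattern, using a hypothetical demo class rather than Drill's types (the @JacksonInject StoragePluginRegistry is supplied by Drill's configured ObjectMapper and is omitted here):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FsWriterBindingSketch {
  private final String location;

  @JsonCreator
  public FsWriterBindingSketch(@JsonProperty("location") String location) {
    this.location = location;  // bound from the "location" attribute of the plan entry
  }

  @JsonProperty("location")
  public String getLocation() { return location; }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    FsWriterBindingSketch w =
        mapper.readValue("{\"location\":\"/tmp/csvtest\"}", FsWriterBindingSketch.class);
    System.out.println(w.getLocation());  // prints /tmp/csvtest
  }
}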
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
new file mode 100644
index 0000000..c91ceba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children != null && children.size() == 1;
+    return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 8bdb1ee..872052c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.easy.json;
 
+import java.io.IOException;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -29,7 +30,9 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
@@ -52,6 +55,11 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
   }
 
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    throw new UnsupportedOperationException("Json Writer is not supported currently.");
+  }
+
   @JsonTypeName("json")
   public static class JSONFormatConfig implements FormatPluginConfig {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 0dbed89..f6cc58e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -17,34 +17,37 @@
  */
 package org.apache.drill.exec.store.easy.text;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Maps;
 import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.FileSplit;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
@@ -71,6 +74,27 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project?
   }
 
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    Map<String, String> options = Maps.newHashMap();
+
+    options.put("location", writer.getLocation());
+
+    FragmentHandle handle = context.getHandle();
+    String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+    options.put("prefix", fragmentId);
+
+    options.put("separator", ((TextFormatConfig)getConfig()).getDelimiter());
+    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
+
+    options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
+
+    RecordWriter recordWriter = new DrillTextRecordWriter();
+    recordWriter.init(options);
+
+    return recordWriter;
+  }
+
   @JsonTypeName("text")
   public static class TextFormatConfig implements FormatPluginConfig {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index ec6456b..a10d30f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -24,6 +24,8 @@ import java.util.regex.Pattern;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
@@ -101,6 +103,11 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
+    throw new UnsupportedOperationException("Parquet Writer is not supported currently.");
+  }
+
+  @Override
   public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
     return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
   }
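
A hedged sketch (not Drill code; values are examples) of the writer-options map that TextFormatPlugin.getRecordWriter assembles above before handing it to DrillTextRecordWriter.init():

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;

public class WriterOptionsSketch {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<String, String>();
    options.put("location", "/tmp/csvtest");                  // writer.getLocation()
    options.put("prefix", "1_0");                             // "<majorFragmentId>_<minorFragmentId>"
    options.put("separator", ",");                            // TextFormatConfig.getDelimiter()
    options.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");  // FileSystemConfig.connection
    options.put("extension", "csv");                          // first configured extension
    System.out.println(options);
  }
}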
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
new file mode 100644
index 0000000..b6840f8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import com.google.common.base.Joiner;
+import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+public class DrillTextRecordWriter extends StringOutputRecordWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
+
+  private String location;
+  private String prefix;
+
+  private String fieldDelimiter;
+  private String extension;
+
+  private static String eol = System.getProperty("line.separator");
+  private int index;
+  private PrintStream stream = null;
+  private FileSystem fs = null;
+
+  // Record write status
+  private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
+  private StringBuilder currentRecord; // contains the current record separated by field delimiter
+
+  @Override
+  public void init(Map<String, String> writerOptions) throws IOException {
+    this.location = writerOptions.get("location");
+    this.prefix = writerOptions.get("prefix");
+    this.fieldDelimiter = writerOptions.get("separator");
+    this.extension = writerOptions.get("extension");
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+    this.fs = FileSystem.get(conf);
+
+    this.currentRecord = new StringBuilder();
+    this.index = 0;
+  }
+
+  @Override
+  public void startNewSchema(List<String> columnNames) throws IOException {
+    // wrap up the current file
+    cleanup();
+
+    // open a new file for writing data with new schema
+    Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+    try {
+      DataOutputStream fos = fs.create(fileName);
+      stream = new PrintStream(fos);
+      logger.debug("Created file: {}", fileName);
+    } catch (IOException ex) {
+      logger.error("Unable to create file: " + fileName, ex);
+      throw ex;
+    }
+    index++;
+
+    stream.println(Joiner.on(fieldDelimiter).join(columnNames));
+  }
+
+  @Override
+  public void addField(int fieldId, String value) throws IOException {
+    currentRecord.append(value + fieldDelimiter);
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    if (fRecordStarted)
+      throw new IOException("Previous record is not written completely");
+
+    fRecordStarted = true;
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    if (!fRecordStarted)
+      throw new IOException("No record is in writing");
+
+    // remove the extra delimiter at the end
+    currentRecord.deleteCharAt(currentRecord.length()-fieldDelimiter.length());
+
+    stream.println(currentRecord.toString());
+
+    // reset current record status
+    currentRecord.delete(0, currentRecord.length());
+    fRecordStarted = false;
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    super.cleanup();
+    if (stream != null) {
+      stream.close();
+      stream = null;
+      logger.debug("closing file");
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    cleanup();
+    try {
+      fs.delete(new Path(location), true);
+    } catch (IOException ex) {
+      logger.error("Abort failed. There could be leftover output files");
+      throw ex;
+    }
+  }
+}
\ No newline at end of file
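
A standalone sketch of the addField()/endRecord() buffering implemented above: each field is appended followed by the delimiter, and endRecord() trims the extra trailing delimiter before the line is flushed:

public class DelimiterTrimSketch {
  public static void main(String[] args) {
    String fieldDelimiter = ",";
    StringBuilder currentRecord = new StringBuilder();
    for (String value : new String[] {"red", "green", "blue"}) {
      currentRecord.append(value).append(fieldDelimiter);  // addField()
    }
    // endRecord(): remove the extra delimiter at the end
    currentRecord.deleteCharAt(currentRecord.length() - fieldDelimiter.length());
    System.out.println(currentRecord);  // prints: red,green,blue
  }
}

Note that deleteCharAt removes exactly one character, so the trim is exact only for one-character delimiters such as "," or "|"; a multi-character delimiter would need StringBuilder.delete(start, end) instead.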
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
deleted file mode 100644
index be7b873..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.writer;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
-public @interface RecordWriterTemplate {
-
-  /**
-   * Output format identifier used to identify the RecordWriter implementation.
-   * @return
-   */
-  String format();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
deleted file mode 100644
index 8e7c58c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.writer.csv;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import org.apache.drill.exec.store.StringOutputRecordWriter;
-import org.apache.drill.exec.store.writer.RecordWriterTemplate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-@RecordWriterTemplate(format = "csv")
-public class CSVRecordWriter extends StringOutputRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CSVRecordWriter.class);
-
-  private String location; // directory where to write the CSV files
-  private String prefix; // prefix to output file names.
-  private int index;
-
-  private PrintStream stream = null;
-  private FileSystem fs = null;
-
-  // Record write status
-  private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
-  private StringBuilder currentRecord; // contains the current record separated by commas
-
-  private static String eol = System.getProperty("line.separator");
-
-  @Override
-  public void init(Map<String, String> writerOptions) throws IOException {
-    this.location = writerOptions.get("location");
-    this.prefix = writerOptions.get("prefix");
-    this.index = 0;
-
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
-
-    currentRecord = new StringBuilder();
-  }
-
-  @Override
-  public void startNewSchema(List<String> columnNames) throws IOException {
-    // wrap up the current file
-    cleanup();
-
-    // open a new file for writing data with new schema
-    Path fileName = new Path(location, prefix + "_" + index + ".csv");
-    try {
-      DataOutputStream fos = fs.create(fileName);
-      stream = new PrintStream(fos);
-      logger.debug("CSVWriter: created file: {}", fileName);
-    } catch (IOException ex) {
-      logger.error("Unable to create file: " + fileName, ex);
-      throw ex;
-    }
-    index++;
-
-    stream.println(Joiner.on(",").join(columnNames));
-  }
-
-  @Override
-  public void addField(int fieldId, String value) throws IOException {
-    currentRecord.append(value + ",");
-  }
-
-  @Override
-  public void startRecord() throws IOException {
-    if (fRecordStarted)
-      throw new IOException("Previous record is not written completely");
-
-    fRecordStarted = true;
-  }
-
-  @Override
-  public void endRecord() throws IOException {
-    if (!fRecordStarted)
-      throw new IOException("No record is in writing");
-
-    // remove the extra "," at the end
-    currentRecord.deleteCharAt(currentRecord.length()-1);
-
-    stream.println(currentRecord.toString());
-
-    // reset current record status
-    currentRecord.delete(0, currentRecord.length());
-    fRecordStarted = false;
-  }
-
-  @Override
-  public void cleanup() throws IOException {
-    super.cleanup();
-    if (stream != null) {
-      stream.close();
-      stream = null;
-      logger.debug("CSVWriter: closing file");
-    }
-  }
-
-  @Override
-  public void abort() throws IOException {
-    cleanup();
-    try {
-      fs.delete(new Path(location), true);
-    } catch (IOException ex) {
-      logger.error("Abort failed. There could be leftover output files");
-      throw ex;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
index a9a277b..698186c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.List;
@@ -38,13 +39,19 @@ import static org.junit.Assert.assertTrue;
 
 public class TestWriter extends BaseTestQuery {
 
-  @Test
-  public void testSimpleCsv() throws Exception {
-    // before executing the test deleting the existing CSV files in /tmp/csvtest
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void initFs() throws Exception {
     Configuration conf = new Configuration();
     conf.set("fs.name.default", "local");
-    FileSystem fs = FileSystem.get(conf);
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void simpleCsv() throws Exception {
+    // before executing the test deleting the existing CSV files in /tmp/csvtest
     Path path = new Path("/tmp/csvtest");
     if (fs.exists(path)) {
       fs.delete(path, true);
@@ -80,4 +87,43 @@ public class TestWriter extends BaseTestQuery {
     }
     batchLoader.clear();
   }
+
+  @Test
+  public void simpleCTAS() throws Exception {
+    String testQuery = "USE dfs.tmp;" +
+        "CREATE TABLE simplectas AS SELECT * FROM cp.`employee.json`;";
+
+    ctasHelper("/tmp/drilltest/simplectas", testQuery, 1);
+  }
+
+  @Test
+  public void complex1CTAS() throws Exception {
+    String testQuery = "USE dfs.tmp;" +
+        "CREATE TABLE complex1ctas AS SELECT first_name, last_name, position_id FROM cp.`employee.json`;";
+
+    ctasHelper("/tmp/drilltest/complex1ctas", testQuery, 1);
+  }
+
+  @Test
+  public void complex2CTAS() throws Exception {
+    String testQuery = "USE dfs.tmp;" +
+        "CREATE TABLE complex2ctas AS SELECT CAST(`birth_date` as Timestamp) FROM cp.`employee.json` GROUP BY birth_date;";
+
+    ctasHelper("/tmp/drilltest/complex2ctas", testQuery, 3);
+  }
+
+
+  private void ctasHelper(String tableDir, String testQuery, int numExpectedFiles) throws Exception {
+    Path tableLocation = new Path(tableDir);
+    if (fs.exists(tableLocation)){
+      fs.delete(tableLocation, true);
+    }
+
+    test(testQuery);
+
+    assertTrue(fs.exists(tableLocation));
+    FileStatus[] fileStatuses = fs.globStatus(new Path(tableLocation.toString(), "*.csv"));
+    assertEquals(numExpectedFiles, fileStatuses.length);
+  }
+
 }
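
For reference, a minimal local-filesystem sketch (paths taken from the tests above, otherwise assumed) of the glob check that ctasHelper performs to count the CSV files a CTAS produced:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GlobCountSketch {
  public static void main(String[] args) throws Exception {
    // Defaults to the local file system unless core-site.xml configures otherwise.
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus[] statuses = fs.globStatus(new Path("/tmp/drilltest/simplectas", "*.csv"));
    System.out.println(statuses == null ? 0 : statuses.length);
  }
}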
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/storage-plugins.json b/exec/java-exec/src/test/resources/storage-plugins.json
index 33f4fac..020805e 100644
--- a/exec/java-exec/src/test/resources/storage-plugins.json
+++ b/exec/java-exec/src/test/resources/storage-plugins.json
@@ -3,7 +3,18 @@
   dfs: {
     type: "file",
     connection: "file:///",
-    formats: {
+    workspaces: {
+      "home" : {
+        location: "/",
+        writable: false
+      },
+      "tmp" : {
+        location: "/tmp/drilltest",
+        writable: true,
+        storageformat: "csv"
+      }
+    },
+    formats: {
       "psv" : {
         type: "text",
         extensions: [ "tbl" ],
@@ -37,4 +48,4 @@
     }
   }
 }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
index f726ce8..ff670d5 100644
--- a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
+++ b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
@@ -14,52 +14,67 @@
   entries:[
     {records: 66000, types: [
       {name: "red", type: "INT", mode: "REQUIRED"},
-      {name: "green", type: "INT", mode: "REQUIRED"},
-      {name: "blue", type: "INT", mode: "REQUIRED"}
+      {name: "green", type: "BIGINT", mode: "OPTIONAL"},
+      {name: "blue", type: "VARCHAR", mode: "REQUIRED"}
     ]},
     {records: 66000, types: [
-      {name: "blue", type: "INT", mode: "REQUIRED"},
-      {name: "green", type: "INT", mode: "REQUIRED"},
-      {name: "red", type: "INT", mode: "REQUIRED"}
+      {name: "blue", type: "BIT", mode: "REQUIRED"},
+      {name: "green", type: "DECIMAL18", mode: "REQUIRED"},
+      {name: "red", type: "FLOAT8", mode: "OPTIONAL"}
     ]}
   ]
 }, {
-  @id:2,
+  @id: 2,
   child: 1,
-  pop:"project",
-  exprs: [
-    { ref: "col1", expr:"red" },
-    { ref: "col2", expr:"green" },
-    { ref: "col3", expr:"blue" }
-  ]
-}, {
-  @id: 3,
-  child: 2,
-  pop: "writer",
-  createTableEntry: {
-    "type" : "filesystem",
-    "config" : {
-      "type" : "file",
-      "connection" : "file:///",
-      "workspaces" : {
-        "root" : {
-          "location" : "/",
-          "writable" : false,
-          "storageformat" : null
-        },
-        "tmp" : {
-          "location" : "/tmp",
-          "writable" : true,
-          "storageformat" : "csv"
-        }
+  pop: "fs-writer",
+  "location" : "/tmp/csvtest",
+  "storage" : {
+    "type" : "file",
+    "connection" : "file:///",
+    "workspaces" : {
+      "root" : {
+        "location" : "/",
+        "writable" : false,
+        "storageformat" : null
+      },
+      "tmp" : {
+        "location" : "/tmp",
+        "writable" : true,
+        "storageformat" : "csv"
       }
     },
-    "format" : "csv",
-    "location" : "/tmp/csvtest"
+    "formats" : {
+      "psv" : {
+        "type" : "text",
+        "extensions" : [ "tbl" ],
+        "delimiter" : "|"
+      },
+      "csv" : {
+        "type" : "text",
+        "extensions" : [ "csv" ],
+        "delimiter" : ","
+      },
+      "tsv" : {
+        "type" : "text",
+        "extensions" : [ "tsv" ],
+        "delimiter" : "\t"
+      },
+      "parquet" : {
+        "type" : "parquet"
+      },
+      "json" : {
+        "type" : "json"
+      }
+    }
+  },
+  "format" : {
+    "type" : "text",
+    "extensions" : [ "csv" ],
+    "delimiter" : ","
   }
 }, {
-  @id: 4,
-  child: 3,
+  @id: 3,
+  child: 2,
   pop: "screen"
 }
]
