Repository: incubator-drill
Updated Branches:
  refs/heads/master 379f1f387 -> 6dad59032
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java index 5806ca7..cf61158 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java @@ -31,6 +31,7 @@ import net.hydromatic.optiq.Schema; import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.Table; +import org.apache.drill.exec.planner.logical.CreateTableEntry; import org.apache.drill.exec.planner.logical.DrillTable; public abstract class AbstractSchema implements Schema{ @@ -55,6 +56,10 @@ public abstract class AbstractSchema implements Schema{ return schemaPath; } + public CreateTableEntry createNewTable(String tableName) { + throw new UnsupportedOperationException("New tables are not allowed in this schema"); + } + @Override public Collection<Function> getFunctions(String name) { return Collections.emptyList(); @@ -94,7 +99,4 @@ public abstract class AbstractSchema implements Schema{ public Expression getExpression(SchemaPlus parentSchema, String name) { return EXPRESSION; } - - - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java index 1b64257..b328245 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java @@ -57,5 +57,4 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { throw new UnsupportedOperationException(); } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java new file mode 100644 index 0000000..8c79fed --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import com.google.common.collect.Maps; +import org.apache.drill.common.util.PathScanner; +import org.apache.drill.exec.store.writer.RecordWriterTemplate; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Map; + +public class RecordWriterRegistry { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordWriterRegistry.class); + + // Contains mapping of "format" to RecordWriter implementation class. + private static Map<String, Class<? extends RecordWriter>> formatRegistry; + + // Contains mapping of RecordWriter class to standard constructor that is used to instantiate the RecordWriter object. + private static Map<Class<? extends RecordWriter>, Constructor<? extends RecordWriter>> constructorMap; + + static { + formatRegistry = Maps.newHashMap(); + constructorMap = Maps.newHashMap(); + + Class<?>[] rwc = PathScanner.scanForImplementationsArr(RecordWriter.class, null); + + for(Class<?> clazz : rwc) { + RecordWriterTemplate template = clazz.getAnnotation(RecordWriterTemplate.class); + if(template == null){ + logger.warn("{} doesn't have {} annotation. Skipping.", clazz.getCanonicalName(), RecordWriterTemplate.class); + continue; + } + + if (template.format() == null || template.format().isEmpty()) { + logger.warn("{} annotation doesn't have valid format field. Skipping.", RecordWriterTemplate.class); + continue; + } + + // Find the standard empty parameter constructor and store it in map. + Constructor<?> validConstructor = null; + for(Constructor<?> c : clazz.getConstructors()) { + if (c.getParameterTypes().length == 0) { + validConstructor = c; + break; + } + } + + if (validConstructor != null) { + formatRegistry.put(template.format(), (Class<? extends RecordWriter>)clazz); + constructorMap.put((Class<? extends RecordWriter>)clazz, (Constructor<? extends RecordWriter>)validConstructor); + } else { + logger.info("Skipping RecordWriter class '{}' since it doesn't implement a constructor [{}()]", + clazz.getCanonicalName(), clazz.getName()); + } + } + } + + public static RecordWriter get(String format, Map<String, String> options) throws IOException { + + if (formatRegistry.containsKey(format)) { + try { + RecordWriter writer = constructorMap.get(formatRegistry.get(format)).newInstance(); + writer.init(options); + return writer; + } catch(Exception e) { + logger.debug("Failed to create RecordWriter. 
Received format: {}, options: {}", format, options, e); + throw new IOException( + String.format("Failed to create RecordWriter for format '%s' with options '%s'", format, options), e); + } + } + + logger.error("Unknown format '{}' received", format); + throw new IOException(String.format("Unknown format '%s' received", format)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index ad9a7db..1459cfa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -153,6 +153,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage } logger.debug("using plugin config: {}", cachedPlugins); }catch(IOException e){ + logger.error("Failure while reading storage plugins data.", e); throw new IllegalStateException("Failure while reading storage plugins data.", e); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java index 14c5ad8..709fac9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java @@ -123,7 +123,7 @@ public class FileSelection { return statuses; } - public static FileSelection create(DrillFileSystem fs, Path parent, String path) throws IOException { + public static FileSelection create(DrillFileSystem fs, String parent, String path) throws IOException { if ( !(path.contains("*") || path.contains("?")) ) { Path p = new Path(parent, path); FileStatus status = fs.getFileStatus(p); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java index e392fa5..6254dfb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java @@ -23,13 +23,14 @@ import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.WorkspaceConfig; @JsonTypeName("file") public class FileSystemConfig implements StoragePluginConfig{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemConfig.class); public String connection; - public Map<String, String> workspaces; + public Map<String, WorkspaceConfig> workspaces; public Map<String, FormatPluginConfig> formats; @Override 
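For reference, a caller obtains a writer from the RecordWriterRegistry above through its static get() method. A minimal usage sketch follows; the class name RecordWriterLookupExample is hypothetical, and the option keys are assumed from what CSVRecordWriter.init() reads later in this patch:

    import java.io.IOException;
    import java.util.Map;

    import com.google.common.collect.Maps;
    import org.apache.drill.exec.store.RecordWriter;
    import org.apache.drill.exec.store.RecordWriterRegistry;
    import org.apache.hadoop.fs.FileSystem;

    public class RecordWriterLookupExample {
      public static void main(String[] args) throws IOException {
        Map<String, String> options = Maps.newHashMap();
        options.put("location", "/tmp/csvtest");                 // output directory
        options.put("prefix", "out");                            // output file name prefix
        options.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); // filesystem to write to
        // get() looks up the class registered via @RecordWriterTemplate(format = "csv"),
        // instantiates it through its no-arg constructor and calls init(options) on it.
        // An unknown format, or a failing constructor/init, surfaces as an IOException.
        RecordWriter writer = RecordWriterRegistry.get("csv", options);
      }
    }
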
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index 6ade4ee..6ddc84a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -30,6 +30,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.logical.WorkspaceConfig; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.rpc.user.DrillUser; @@ -81,18 +82,19 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ formatPluginsByConfig.put(p.getConfig(), p); } - List<WorkspaceSchemaFactory> factories = null; + List<WorkspaceSchemaFactory> factories; if(config.workspaces == null || config.workspaces.isEmpty()){ - factories = Collections.singletonList(new WorkspaceSchemaFactory(this, "default", name, fs, "/", matchers, true/*TODO*/)); + factories = Collections.singletonList( + new WorkspaceSchemaFactory(this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers)); }else{ factories = Lists.newArrayList(); - for(Map.Entry<String, String> space : config.workspaces.entrySet()){ - factories.add(new WorkspaceSchemaFactory(this, space.getKey(), name, fs, space.getValue(), matchers, true/*TODO*/)); + for(Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()){ + factories.add(new WorkspaceSchemaFactory(this, space.getKey(), name, fs, space.getValue(), matchers)); } // if the "default" workspace is not given add one. 
if (!config.workspaces.containsKey("default")) { - factories.add(new WorkspaceSchemaFactory(this, "default", name, fs, "/", matchers, true/*TODO*/)); + factories.add(new WorkspaceSchemaFactory(this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers)); } } this.schemaFactory = new FileSystemSchemaFactory(name, factories); @@ -145,5 +147,4 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ return formatPluginsByConfig.get(config); } } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java index efd19c5..d904297 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java @@ -22,19 +22,22 @@ import java.util.List; import java.util.Set; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.Table; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.logical.WorkspaceConfig; +import org.apache.drill.exec.planner.logical.CreateTableEntry; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable; +import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry; import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; -import org.apache.hadoop.fs.Path; -import com.beust.jcommander.internal.Lists; +import org.apache.hadoop.fs.Path; public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class); @@ -42,18 +45,18 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa private final List<FormatMatcher> fileMatchers; private final List<FormatMatcher> dirMatchers; - private final Path root; + private final WorkspaceConfig config; private final DrillFileSystem fs; private final String storageEngineName; private final String schemaName; private final FileSystemPlugin plugin; - private final boolean isMutable; - public WorkspaceSchemaFactory(FileSystemPlugin plugin, String schemaName, String storageEngineName, DrillFileSystem fileSystem, String path, - List<FormatMatcher> formatMatchers, boolean isMutable) throws ExecutionSetupException { + public WorkspaceSchemaFactory(FileSystemPlugin plugin, String schemaName, String storageEngineName, + DrillFileSystem fileSystem, WorkspaceConfig config, + List<FormatMatcher> formatMatchers) throws ExecutionSetupException { this.fs = fileSystem; this.plugin = plugin; - this.root = new Path(path); + this.config = config; this.fileMatchers = Lists.newArrayList(); this.dirMatchers = Lists.newArrayList(); for (FormatMatcher m : formatMatchers) { @@ -64,18 +67,17 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa } this.storageEngineName = 
storageEngineName; this.schemaName = schemaName; - this.isMutable = isMutable; } public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) { - return new WorkspaceSchema(parentSchemaPath, schemaName, isMutable, session); + return new WorkspaceSchema(parentSchemaPath, schemaName, session); } @Override public DrillTable create(String key) { try { - FileSelection fileSelection = FileSelection.create(fs, root, key); + FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key); if(fileSelection == null) return null; if (fileSelection.containsDirectories(fs)) { @@ -83,7 +85,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa try { Object selection = m.isReadable(fileSelection); if (selection != null) - return new DynamicDrillTable(plugin, storageEngineName, selection, m.getFormatPlugin().getStorageConfig()); + return new DynamicDrillTable(plugin, storageEngineName, selection); } catch (IOException e) { logger.debug("File read failed.", e); } @@ -94,12 +96,12 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa for (FormatMatcher m : fileMatchers) { Object selection = m.isReadable(fileSelection); if (selection != null) - return new DynamicDrillTable(plugin, storageEngineName, selection, m.getFormatPlugin().getStorageConfig()); + return new DynamicDrillTable(plugin, storageEngineName, selection); } return null; } catch (IOException e) { - logger.debug("Failed to create DrillTable with root {} and name {}", root, key, e); + logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e); } return null; @@ -112,12 +114,10 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa public class WorkspaceSchema extends AbstractSchema implements HasFileSystemSchema { private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(WorkspaceSchemaFactory.this); - private boolean isMutable; - private UserSession session; + private final UserSession session; - public WorkspaceSchema(List<String> parentSchemaPath, String name, boolean isMutable, UserSession session) { + public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) { super(parentSchemaPath, name); - this.isMutable = isMutable; this.session = session; } @@ -136,7 +136,13 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa @Override public boolean isMutable() { - return isMutable; + return config.isWritable(); + } + + @Override + public CreateTableEntry createNewTable(String tableName) { + return new FileSystemCreateTableEntry((FileSystemConfig)plugin.getConfig(), config.getStorageFormat(), + config.getLocation() + Path.SEPARATOR + tableName); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java new file mode 100644 index 0000000..be7b873 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.writer; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface RecordWriterTemplate { + + /** + * Output format identifier used to identify the RecordWriter implementation. + * @return + */ + String format(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java new file mode 100644 index 0000000..8e7c58c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.writer.csv; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Joiner; +import org.apache.drill.exec.store.StringOutputRecordWriter; +import org.apache.drill.exec.store.writer.RecordWriterTemplate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +@RecordWriterTemplate(format = "csv") +public class CSVRecordWriter extends StringOutputRecordWriter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CSVRecordWriter.class); + + private String location; // directory where to write the CSV files + private String prefix; // prefix to output file names. 
+ private int index; + + private PrintStream stream = null; + private FileSystem fs = null; + + // Record write status + private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called + private StringBuilder currentRecord; // contains the current record separated by commas + + private static String eol = System.getProperty("line.separator"); + + @Override + public void init(Map<String, String> writerOptions) throws IOException { + this.location = writerOptions.get("location"); + this.prefix = writerOptions.get("prefix"); + this.index = 0; + + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY)); + this.fs = FileSystem.get(conf); + + currentRecord = new StringBuilder(); + } + + @Override + public void startNewSchema(List<String> columnNames) throws IOException { + // wrap up the current file + cleanup(); + + // open a new file for writing data with new schema + Path fileName = new Path(location, prefix + "_" + index + ".csv"); + try { + DataOutputStream fos = fs.create(fileName); + stream = new PrintStream(fos); + logger.debug("CSVWriter: created file: {}", fileName); + } catch (IOException ex) { + logger.error("Unable to create file: " + fileName, ex); + throw ex; + } + index++; + + stream.println(Joiner.on(",").join(columnNames)); + } + + @Override + public void addField(int fieldId, String value) throws IOException { + currentRecord.append(value + ","); + } + + @Override + public void startRecord() throws IOException { + if (fRecordStarted) + throw new IOException("Previous record is not written completely"); + + fRecordStarted = true; + } + + @Override + public void endRecord() throws IOException { + if (!fRecordStarted) + throw new IOException("No record is in writing"); + + // remove the extra "," at the end + currentRecord.deleteCharAt(currentRecord.length()-1); + + stream.println(currentRecord.toString()); + + // reset current record status + currentRecord.delete(0, currentRecord.length()); + fRecordStarted = false; + } + + @Override + public void cleanup() throws IOException { + super.cleanup(); + if (stream != null) { + stream.close(); + stream = null; + logger.debug("CSVWriter: closing file"); + } + } + + @Override + public void abort() throws IOException { + cleanup(); + try { + fs.delete(new Path(location), true); + } catch (IOException ex) { + logger.error("Abort failed. There could be leftover output files"); + throw ex; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java new file mode 100644 index 0000000..a9a277b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.writer; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.VarCharVector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestWriter extends BaseTestQuery { + + @Test + public void testSimpleCsv() throws Exception { + // before executing the test deleting the existing CSV files in /tmp/csvtest + Configuration conf = new Configuration(); + conf.set("fs.name.default", "local"); + + FileSystem fs = FileSystem.get(conf); + Path path = new Path("/tmp/csvtest"); + if (fs.exists(path)) { + fs.delete(path, true); + } + + String plan = Files.toString(FileUtils.getResourceAsFile("/writer/simple_csv_writer.json"), Charsets.UTF_8); + + List<QueryResultBatch> results = testPhysicalWithResults(plan); + + RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); + + QueryResultBatch batch = results.get(0); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + + VarCharVector fragmentIdV = (VarCharVector) batchLoader.getValueAccessorById(0, VarCharVector.class).getValueVector(); + BigIntVector recordWrittenV = (BigIntVector) batchLoader.getValueAccessorById(1, BigIntVector.class).getValueVector(); + + // expected only one row in output + assertEquals(1, batchLoader.getRecordCount()); + + assertEquals("0_0", fragmentIdV.getAccessor().getObject(0).toString()); + assertEquals(132000, recordWrittenV.getAccessor().get(0)); + + // now verify csv files are written to disk + assertTrue(fs.exists(path)); + + // expect two files + FileStatus[] fileStatuses = fs.globStatus(new Path(path.toString(), "*.csv")); + assertTrue(2 == fileStatuses.length); + + for(QueryResultBatch b : results){ + b.release(); + } + batchLoader.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/test/resources/join/hj_exchanges.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/join/hj_exchanges.json b/exec/java-exec/src/test/resources/join/hj_exchanges.json index 2ad1cb4..41360d1 100644 --- a/exec/java-exec/src/test/resources/join/hj_exchanges.json +++ b/exec/java-exec/src/test/resources/join/hj_exchanges.json @@ -18,8 +18,14 @@ "type" : "file", "connection" : "classpath:///", "workspaces" : { - "default" : "/", - "home" : "/" + "default" : { + location: "/", + writable: false + }, + "home" : { + location: "/", + writable: false + } }, "formats" : null }, @@ -41,8 +47,14 @@ "type" : "file", "connection" : "classpath:///", "workspaces" : { - 
"default" : "/", - "home" : "/" + "default" : { + location: "/", + writable: false + }, + "home" : { + location: "/", + writable: false + } }, "formats" : null }, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/test/resources/writer/simple_csv_writer.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json new file mode 100644 index 0000000..f726ce8 --- /dev/null +++ b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json @@ -0,0 +1,66 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + entries:[ + {records: 66000, types: [ + {name: "red", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"}, + {name: "blue", type: "INT", mode: "REQUIRED"} + ]}, + {records: 66000, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "INT", mode: "REQUIRED"} + ]} + ] + }, { + @id:2, + child: 1, + pop:"project", + exprs: [ + { ref: "col1", expr:"red" }, + { ref: "col2", expr:"green" }, + { ref: "col3", expr:"blue" } + ] + }, { + @id: 3, + child: 2, + pop: "writer", + createTableEntry: { + "type" : "filesystem", + "config" : { + "type" : "file", + "connection" : "file:///", + "workspaces" : { + "root" : { + "location" : "/", + "writable" : false, + "storageformat" : null + }, + "tmp" : { + "location" : "/tmp", + "writable" : true, + "storageformat" : "csv" + } + } + }, + "format" : "csv", + "location" : "/tmp/csvtest" + } + }, { + @id: 4, + child: 3, + pop: "screen" + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java index a149ad2..94fa157 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java @@ -327,6 +327,7 @@ public class TestJdbcQuery extends JdbcTest{ "SCHEMA_NAME=hive\n" + "SCHEMA_NAME=dfs.home\n" + "SCHEMA_NAME=dfs.default\n" + + "SCHEMA_NAME=dfs.tmp\n" + "SCHEMA_NAME=dfs\n" + "SCHEMA_NAME=cp.default\n" + "SCHEMA_NAME=cp\n" + @@ -513,12 +514,12 @@ public class TestJdbcQuery extends JdbcTest{ Statement statement = connection.createStatement(); // change default schema - statement.executeQuery("USE dfs.`default`"); + statement.executeQuery("USE dfs.tmp"); // create view ResultSet resultSet = statement.executeQuery(viewCreate); String result = JdbcAssert.toString(resultSet).trim(); - String viewCreateResult = "ok=true; summary=View '" + viewName + "' created successfully in 'dfs.default' schema"; + String viewCreateResult = "ok=true; summary=View '" + viewName + "' created successfully in 'dfs.tmp' schema"; assertTrue(String.format("Generated string:\n%s\ndoes not match:\n%s", result, viewCreateResult), viewCreateResult.equals(result)); @@ -678,7 +679,7 @@ public class TestJdbcQuery extends JdbcTest{ Statement statement = connection.createStatement(); // change default schema - statement.executeQuery("USE dfs.`default`"); + statement.executeQuery("USE 
dfs.tmp"); // create view statement.executeQuery( @@ -693,7 +694,7 @@ public class TestJdbcQuery extends JdbcTest{ resultSet = statement.executeQuery("DROP VIEW testview3"); result = JdbcAssert.toString(resultSet).trim(); - expected = "ok=true; summary=View 'testview3' deleted successfully from 'dfs.default' schema"; + expected = "ok=true; summary=View 'testview3' deleted successfully from 'dfs.tmp' schema"; assertTrue(String.format("Generated string:\n%s\ndoes not match:\n%s", result, expected), expected.equals(result)); @@ -779,4 +780,4 @@ public class TestJdbcQuery extends JdbcTest{ } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/jdbc/src/test/resources/storage-plugins.json ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/test/resources/storage-plugins.json b/exec/jdbc/src/test/resources/storage-plugins.json index 60efa50..3861317 100644 --- a/exec/jdbc/src/test/resources/storage-plugins.json +++ b/exec/jdbc/src/test/resources/storage-plugins.json @@ -4,7 +4,15 @@ type: "file", connection: "file:///", workspaces: { - home: "/" + "home" : { + location: "/", + writable: false + }, + "tmp" : { + location: "/tmp/drilltest", + writable: true, + storageformat: "csv" + } }, formats: { "psv" : {
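With the workspace changes above, whether a schema accepts new tables is driven by the workspace's "writable" flag rather than a hard-coded boolean. A minimal sketch of the resulting contract; the helper class and method are hypothetical, for illustration only:

    import org.apache.drill.exec.planner.logical.CreateTableEntry;
    import org.apache.drill.exec.store.AbstractSchema;

    public class CreateTableExample {
      public static CreateTableEntry resolveNewTable(AbstractSchema schema, String tableName) {
        // For a WorkspaceSchema, isMutable() now reflects the workspace's "writable"
        // flag, and createNewTable() returns a FileSystemCreateTableEntry targeting
        // <workspace location>/<tableName> in the workspace's "storageformat"
        // ("csv" for the dfs.tmp workspace defined above).
        // For read-only schemas the AbstractSchema default applies, and the call throws
        // UnsupportedOperationException("New tables are not allowed in this schema").
        return schema.createNewTable(tableName);
      }
    }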
