Repository: incubator-drill
Updated Branches:
  refs/heads/master 379f1f387 -> 6dad59032


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 5806ca7..cf61158 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -31,6 +31,7 @@ import net.hydromatic.optiq.Schema;
 import net.hydromatic.optiq.SchemaPlus;
 
 import net.hydromatic.optiq.Table;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 
 public abstract class AbstractSchema implements Schema{
@@ -55,6 +56,10 @@ public abstract class AbstractSchema implements Schema{
     return schemaPath;
   }
 
+  public CreateTableEntry createNewTable(String tableName) {
+    throw new UnsupportedOperationException("New tables are not allowed in this schema");
+  }
+
   @Override
   public Collection<Function> getFunctions(String name) {
     return Collections.emptyList();
@@ -94,7 +99,4 @@ public abstract class AbstractSchema implements Schema{
   public Expression getExpression(SchemaPlus parentSchema, String name) {
     return EXPRESSION;
   }
-  
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 1b64257..b328245 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -57,5 +57,4 @@ public abstract class AbstractStoragePlugin implements 
StoragePlugin{
   public AbstractGroupScan getPhysicalScan(JSONOptions selection, 
List<SchemaPath> columns) throws IOException {
     throw new UnsupportedOperationException();
   }
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
new file mode 100644
index 0000000..8c79fed
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.store.writer.RecordWriterTemplate;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+public class RecordWriterRegistry {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RecordWriterRegistry.class);
+
+  // Contains mapping of "format" to RecordWriter implementation class.
+  private static Map<String, Class<? extends RecordWriter>> formatRegistry;
+
+  // Contains mapping of RecordWriter class to standard constructor that is used to instantiate the RecordWriter object.
+  private static Map<Class<? extends RecordWriter>, Constructor<? extends 
RecordWriter>> constructorMap;
+
+  static {
+    formatRegistry = Maps.newHashMap();
+    constructorMap = Maps.newHashMap();
+
+    Class<?>[] rwc = PathScanner.scanForImplementationsArr(RecordWriter.class, 
null);
+
+    for(Class<?> clazz : rwc) {
+      RecordWriterTemplate template = 
clazz.getAnnotation(RecordWriterTemplate.class);
+      if(template == null){
+        logger.warn("{} doesn't have {} annotation. Skipping.", 
clazz.getCanonicalName(), RecordWriterTemplate.class);
+        continue;
+      }
+
+      if (template.format() == null || template.format().isEmpty()) {
+        logger.warn("{} annotation doesn't have valid format field. Skipping.", RecordWriterTemplate.class);
+        continue;
+      }
+
+      // Find the standard empty parameter constructor and store it in map.
+      Constructor<?> validConstructor = null;
+      for(Constructor<?> c : clazz.getConstructors()) {
+        if (c.getParameterTypes().length == 0) {
+          validConstructor = c;
+          break;
+        }
+      }
+
+      if (validConstructor != null) {
+        formatRegistry.put(template.format(), (Class<? extends 
RecordWriter>)clazz);
+        constructorMap.put((Class<? extends RecordWriter>)clazz, 
(Constructor<? extends RecordWriter>)validConstructor);
+      } else {
+        logger.info("Skipping RecordWriter class '{}' since it doesn't implement a constructor [{}()]",
+            clazz.getCanonicalName(), clazz.getName());
+      }
+    }
+  }
+
+  public static RecordWriter get(String format, Map<String, String> options) 
throws IOException {
+
+    if (formatRegistry.containsKey(format)) {
+      try {
+        RecordWriter writer = 
constructorMap.get(formatRegistry.get(format)).newInstance();
+        writer.init(options);
+        return writer;
+      } catch(Exception e) {
+        logger.debug("Failed to create RecordWriter. Received format: {}, options: {}", format, options, e);
+        throw new IOException(
+            String.format("Failed to create RecordWriter for format '%s' with options '%s'", format, options), e);
+      }
+    }
+
+    logger.error("Unknown format '{}' received", format);
+    throw new IOException(String.format("Unknown format '%s' received", 
format));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index ad9a7db..1459cfa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -153,6 +153,7 @@ public class StoragePluginRegistry implements 
Iterable<Map.Entry<String, Storage
       }
       logger.debug("using plugin config: {}", cachedPlugins);
     }catch(IOException e){
+      logger.error("Failure while reading storage plugins data.", e);
       throw new IllegalStateException("Failure while reading storage plugins data.", e);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 14c5ad8..709fac9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -123,7 +123,7 @@ public class FileSelection {
     return statuses;
   }
 
-  public static FileSelection create(DrillFileSystem fs, Path parent, String 
path) throws IOException {
+  public static FileSelection create(DrillFileSystem fs, String parent, String 
path) throws IOException {
     if ( !(path.contains("*") || path.contains("?")) ) {
       Path p = new Path(parent, path);
       FileStatus status = fs.getFileStatus(p);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index e392fa5..6254dfb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -23,13 +23,14 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.WorkspaceConfig;
 
 @JsonTypeName("file")
 public class FileSystemConfig implements StoragePluginConfig{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FileSystemConfig.class);
   
   public String connection;
-  public Map<String, String> workspaces;
+  public Map<String, WorkspaceConfig> workspaces;
   public Map<String, FormatPluginConfig> formats;
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 6ade4ee..6ddc84a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -30,6 +30,7 @@ import 
org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.WorkspaceConfig;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.rpc.user.DrillUser;
@@ -81,18 +82,19 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
         formatPluginsByConfig.put(p.getConfig(), p);
       }
 
-      List<WorkspaceSchemaFactory> factories = null;
+      List<WorkspaceSchemaFactory> factories;
       if(config.workspaces == null || config.workspaces.isEmpty()){
-        factories = Collections.singletonList(new WorkspaceSchemaFactory(this, 
"default", name, fs, "/", matchers, true/*TODO*/));
+        factories = Collections.singletonList(
+            new WorkspaceSchemaFactory(this, "default", name, fs, 
WorkspaceConfig.DEFAULT, matchers));
       }else{
         factories = Lists.newArrayList();
-        for(Map.Entry<String, String> space : config.workspaces.entrySet()){
-          factories.add(new WorkspaceSchemaFactory(this, space.getKey(), name, 
fs, space.getValue(), matchers, true/*TODO*/));
+        for(Map.Entry<String, WorkspaceConfig> space : 
config.workspaces.entrySet()){
+          factories.add(new WorkspaceSchemaFactory(this, space.getKey(), name, 
fs, space.getValue(), matchers));
         }
 
         // if the "default" workspace is not given add one.
         if (!config.workspaces.containsKey("default")) {
-          factories.add(new WorkspaceSchemaFactory(this, "default", name, fs, 
"/", matchers, true/*TODO*/));
+          factories.add(new WorkspaceSchemaFactory(this, "default", name, fs, 
WorkspaceConfig.DEFAULT, matchers));
         }
       }
       this.schemaFactory = new FileSystemSchemaFactory(name, factories);
@@ -145,5 +147,4 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
       return formatPluginsByConfig.get(config);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index efd19c5..d904297 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -22,19 +22,22 @@ import java.util.List;
 import java.util.Set;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.Table;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.WorkspaceConfig;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.hadoop.fs.Path;
 
-import com.beust.jcommander.internal.Lists;
+import org.apache.hadoop.fs.Path;
 
 public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
@@ -42,18 +45,18 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
   private final List<FormatMatcher> fileMatchers;
   private final List<FormatMatcher> dirMatchers;
 
-  private final Path root;
+  private final WorkspaceConfig config;
   private final DrillFileSystem fs;
   private final String storageEngineName;
   private final String schemaName;
   private final FileSystemPlugin plugin;
-  private final boolean isMutable;
 
-  public WorkspaceSchemaFactory(FileSystemPlugin plugin, String schemaName, 
String storageEngineName, DrillFileSystem fileSystem, String path,
-      List<FormatMatcher> formatMatchers, boolean isMutable) throws 
ExecutionSetupException {
+  public WorkspaceSchemaFactory(FileSystemPlugin plugin, String schemaName, 
String storageEngineName,
+      DrillFileSystem fileSystem, WorkspaceConfig config,
+      List<FormatMatcher> formatMatchers) throws ExecutionSetupException {
     this.fs = fileSystem;
     this.plugin = plugin;
-    this.root = new Path(path);
+    this.config = config;
     this.fileMatchers = Lists.newArrayList();
     this.dirMatchers = Lists.newArrayList();
     for (FormatMatcher m : formatMatchers) {
@@ -64,18 +67,17 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
     }
     this.storageEngineName = storageEngineName;
     this.schemaName = schemaName;
-    this.isMutable = isMutable;
   }
 
   public WorkspaceSchema createSchema(List<String> parentSchemaPath, 
UserSession session) {
-    return new WorkspaceSchema(parentSchemaPath, schemaName, isMutable, 
session);
+    return new WorkspaceSchema(parentSchemaPath, schemaName, session);
   }
 
   @Override
   public DrillTable create(String key) {
     try {
 
-      FileSelection fileSelection = FileSelection.create(fs, root, key);
+      FileSelection fileSelection = FileSelection.create(fs, 
config.getLocation(), key);
       if(fileSelection == null) return null;
       
       if (fileSelection.containsDirectories(fs)) {
@@ -83,7 +85,7 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
           try {
             Object selection = m.isReadable(fileSelection);
             if (selection != null)
-              return new DynamicDrillTable(plugin, storageEngineName, 
selection, m.getFormatPlugin().getStorageConfig());
+              return new DynamicDrillTable(plugin, storageEngineName, 
selection);
           } catch (IOException e) {
             logger.debug("File read failed.", e);
           }
@@ -94,12 +96,12 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
       for (FormatMatcher m : fileMatchers) {
         Object selection = m.isReadable(fileSelection);
         if (selection != null)
-          return new DynamicDrillTable(plugin, storageEngineName, selection, 
m.getFormatPlugin().getStorageConfig());
+          return new DynamicDrillTable(plugin, storageEngineName, selection);
       }
       return null;
 
     } catch (IOException e) {
-      logger.debug("Failed to create DrillTable with root {} and name {}", 
root, key, e);
+      logger.debug("Failed to create DrillTable with root {} and name {}", 
config.getLocation(), key, e);
     }
 
     return null;
@@ -112,12 +114,10 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
   public class WorkspaceSchema extends AbstractSchema implements 
HasFileSystemSchema {
 
     private ExpandingConcurrentMap<String, DrillTable> tables = new 
ExpandingConcurrentMap<String, DrillTable>(WorkspaceSchemaFactory.this);
-    private boolean isMutable;
-    private UserSession session;
+    private final UserSession session;
 
-    public WorkspaceSchema(List<String> parentSchemaPath, String name, boolean 
isMutable, UserSession session) {
+    public WorkspaceSchema(List<String> parentSchemaPath, String name, 
UserSession session) {
       super(parentSchemaPath, name);
-      this.isMutable = isMutable;
       this.session = session;
     }
 
@@ -136,7 +136,13 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
 
     @Override
     public boolean isMutable() {
-      return isMutable;
+      return config.isWritable();
+    }
+
+    @Override
+    public CreateTableEntry createNewTable(String tableName) {
+      return new 
FileSystemCreateTableEntry((FileSystemConfig)plugin.getConfig(), 
config.getStorageFormat(),
+          config.getLocation() + Path.SEPARATOR + tableName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
new file mode 100644
index 0000000..be7b873
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.writer;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface RecordWriterTemplate {
+
+  /**
+   * Output format identifier used to identify the RecordWriter implementation.
+   * @return
+   */
+  String format();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
new file mode 100644
index 0000000..8e7c58c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.writer.csv;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.drill.exec.store.writer.RecordWriterTemplate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@RecordWriterTemplate(format = "csv")
+public class CSVRecordWriter extends StringOutputRecordWriter {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CSVRecordWriter.class);
+
+  private String location;      // directory where to write the CSV files
+  private String prefix;        // prefix to output file names.
+  private int index;
+
+  private PrintStream stream = null;
+  private FileSystem fs = null;
+
+  // Record write status
+  private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
+  private StringBuilder currentRecord;    // contains the current record separated by commas
+
+  private static String eol = System.getProperty("line.separator");
+
+  @Override
+  public void init(Map<String, String> writerOptions) throws IOException {
+    this.location = writerOptions.get("location");
+    this.prefix = writerOptions.get("prefix");
+    this.index = 0;
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, 
writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+    this.fs = FileSystem.get(conf);
+
+    currentRecord = new StringBuilder();
+  }
+
+  @Override
+  public void startNewSchema(List<String> columnNames) throws IOException {
+    // wrap up the current file
+    cleanup();
+
+    // open a new file for writing data with new schema
+    Path fileName = new Path(location, prefix + "_" + index + ".csv");
+    try {
+      DataOutputStream fos = fs.create(fileName);
+      stream = new PrintStream(fos);
+      logger.debug("CSVWriter: created file: {}", fileName);
+    } catch (IOException ex) {
+      logger.error("Unable to create file: " + fileName, ex);
+      throw ex;
+    }
+    index++;
+
+    stream.println(Joiner.on(",").join(columnNames));
+  }
+
+  @Override
+  public void addField(int fieldId, String value) throws IOException {
+    currentRecord.append(value + ",");
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    if (fRecordStarted)
+      throw new IOException("Previous record is not written completely");
+
+    fRecordStarted = true;
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    if (!fRecordStarted)
+      throw new IOException("No record is in writing");
+
+    // remove the extra "," at the end
+    currentRecord.deleteCharAt(currentRecord.length()-1);
+
+    stream.println(currentRecord.toString());
+
+    // reset current record status
+    currentRecord.delete(0, currentRecord.length());
+    fRecordStarted = false;
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    super.cleanup();
+    if (stream != null) {
+      stream.close();
+      stream = null;
+      logger.debug("CSVWriter: closing file");
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    cleanup();
+    try {
+      fs.delete(new Path(location), true);
+    } catch (IOException ex) {
+      logger.error("Abort failed. There could be leftover output files");
+      throw ex;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
new file mode 100644
index 0000000..a9a277b
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.writer;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestWriter extends BaseTestQuery {
+
+  @Test
+  public void testSimpleCsv() throws Exception {
+    // before executing the test deleting the existing CSV files in /tmp/csvtest
+    Configuration conf = new Configuration();
+    conf.set("fs.name.default", "local");
+
+    FileSystem fs = FileSystem.get(conf);
+    Path path = new Path("/tmp/csvtest");
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+    String plan = 
Files.toString(FileUtils.getResourceAsFile("/writer/simple_csv_writer.json"), 
Charsets.UTF_8);
+
+    List<QueryResultBatch> results = testPhysicalWithResults(plan);
+
+    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+
+    QueryResultBatch batch = results.get(0);
+    assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+    VarCharVector fragmentIdV = (VarCharVector) 
batchLoader.getValueAccessorById(0, VarCharVector.class).getValueVector();
+    BigIntVector recordWrittenV = (BigIntVector) 
batchLoader.getValueAccessorById(1, BigIntVector.class).getValueVector();
+
+    // expected only one row in output
+    assertEquals(1, batchLoader.getRecordCount());
+
+    assertEquals("0_0", fragmentIdV.getAccessor().getObject(0).toString());
+    assertEquals(132000, recordWrittenV.getAccessor().get(0));
+
+    // now verify csv files are written to disk
+    assertTrue(fs.exists(path));
+
+    // expect two files
+    FileStatus[] fileStatuses = fs.globStatus(new Path(path.toString(), 
"*.csv"));
+    assertTrue(2 == fileStatuses.length);
+
+    for(QueryResultBatch b : results){
+      b.release();
+    }
+    batchLoader.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/test/resources/join/hj_exchanges.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hj_exchanges.json 
b/exec/java-exec/src/test/resources/join/hj_exchanges.json
index 2ad1cb4..41360d1 100644
--- a/exec/java-exec/src/test/resources/join/hj_exchanges.json
+++ b/exec/java-exec/src/test/resources/join/hj_exchanges.json
@@ -18,8 +18,14 @@
       "type" : "file",
       "connection" : "classpath:///",
       "workspaces" : {
-        "default" : "/",
-        "home" : "/"
+        "default" : {
+          location: "/",
+          writable: false
+        },
+        "home" : {
+          location: "/",
+          writable: false
+        }
       },
       "formats" : null
     },
@@ -41,8 +47,14 @@
       "type" : "file",
       "connection" : "classpath:///",
       "workspaces" : {
-        "default" : "/",
-        "home" : "/"
+        "default" : {
+          location: "/",
+          writable: false
+        },
+        "home" : {
+          location: "/",
+          writable: false
+        }
       },
       "formats" : null
     },

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json 
b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
new file mode 100644
index 0000000..f726ce8
--- /dev/null
+++ b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
@@ -0,0 +1,66 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+       @id:1,
+       pop:"mock-scan",
+       url: "http://apache.org",
+       entries:[
+         {records: 66000, types: [
+           {name: "red", type: "INT", mode: "REQUIRED"},
+           {name: "green", type: "INT", mode: "REQUIRED"},
+           {name: "blue", type: "INT", mode: "REQUIRED"}
+         ]},
+         {records: 66000, types: [
+           {name: "blue", type: "INT", mode: "REQUIRED"},
+           {name: "green", type: "INT", mode: "REQUIRED"},
+           {name: "red", type: "INT", mode: "REQUIRED"}
+         ]}
+       ]
+    }, {
+      @id:2,
+      child: 1,
+      pop:"project",
+      exprs: [
+        { ref: "col1", expr:"red" },
+        { ref: "col2", expr:"green" },
+        { ref: "col3", expr:"blue" }
+      ]
+    }, {
+      @id: 3,
+      child: 2,
+      pop: "writer",
+      createTableEntry: {
+        "type" : "filesystem",
+        "config" : {
+          "type" : "file",
+          "connection" : "file:///",
+          "workspaces" : {
+            "root" : {
+              "location" : "/",
+              "writable" : false,
+              "storageformat" : null
+            },
+            "tmp" : {
+              "location" : "/tmp",
+              "writable" : true,
+              "storageformat" : "csv"
+            }
+          }
+        },
+        "format" : "csv",
+        "location" : "/tmp/csvtest"
+      }
+    }, {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index a149ad2..94fa157 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -327,6 +327,7 @@ public class TestJdbcQuery extends JdbcTest{
         "SCHEMA_NAME=hive\n" +
         "SCHEMA_NAME=dfs.home\n" +
         "SCHEMA_NAME=dfs.default\n" +
+        "SCHEMA_NAME=dfs.tmp\n" +
         "SCHEMA_NAME=dfs\n" +
         "SCHEMA_NAME=cp.default\n" +
         "SCHEMA_NAME=cp\n" +
@@ -513,12 +514,12 @@ public class TestJdbcQuery extends JdbcTest{
           Statement statement = connection.createStatement();
 
           // change default schema
-          statement.executeQuery("USE dfs.`default`");
+          statement.executeQuery("USE dfs.tmp");
 
           // create view
           ResultSet resultSet = statement.executeQuery(viewCreate);
           String result = JdbcAssert.toString(resultSet).trim();
-          String viewCreateResult = "ok=true; summary=View '" + viewName + "' created successfully in 'dfs.default' schema";
+          String viewCreateResult = "ok=true; summary=View '" + viewName + "' created successfully in 'dfs.tmp' schema";
           assertTrue(String.format("Generated string:\n%s\ndoes not match:\n%s", result, viewCreateResult),
               viewCreateResult.equals(result));
 
@@ -678,7 +679,7 @@ public class TestJdbcQuery extends JdbcTest{
           Statement statement = connection.createStatement();
 
           // change default schema
-          statement.executeQuery("USE dfs.`default`");
+          statement.executeQuery("USE dfs.tmp");
 
           // create view
           statement.executeQuery(
@@ -693,7 +694,7 @@ public class TestJdbcQuery extends JdbcTest{
 
           resultSet = statement.executeQuery("DROP VIEW testview3");
           result = JdbcAssert.toString(resultSet).trim();
-          expected = "ok=true; summary=View 'testview3' deleted successfully from 'dfs.default' schema";
+          expected = "ok=true; summary=View 'testview3' deleted successfully from 'dfs.tmp' schema";
           assertTrue(String.format("Generated string:\n%s\ndoes not match:\n%s", result, expected),
               expected.equals(result));
 
@@ -779,4 +780,4 @@ public class TestJdbcQuery extends JdbcTest{
   }
 
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e1960659/exec/jdbc/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/resources/storage-plugins.json b/exec/jdbc/src/test/resources/storage-plugins.json
index 60efa50..3861317 100644
--- a/exec/jdbc/src/test/resources/storage-plugins.json
+++ b/exec/jdbc/src/test/resources/storage-plugins.json
@@ -4,7 +4,15 @@
       type: "file",
       connection: "file:///",
       workspaces: {
-        home: "/"
+        "home" : {
+          location: "/",
+          writable: false
+        },
+        "tmp" : {
+          location: "/tmp/drilltest",
+          writable: true,
+          storageformat: "csv"
+        }
       },
       formats: {
         "psv" : {

Reply via email to