Repository: tajo
Updated Branches:
  refs/heads/master 7c84aeb29 -> ea5ce54d8


Revert "TAJO-1008: Protocol buffer De/Serialization for EvalNode."

This reverts commit 49d52553c5e4ef37a6af4605befbe36b0dd36c3c.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2d6bff7e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2d6bff7e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2d6bff7e

Branch: refs/heads/master
Commit: 2d6bff7e8ef2ca2a7dff39c70383f30b2e06e055
Parents: 7c84aeb
Author: Hyunsik Choi <[email protected]>
Authored: Wed Aug 20 16:44:37 2014 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Wed Aug 20 16:44:37 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 -
 .../apache/tajo/catalog/TestKeyValueSet.java    |   1 +
 .../org/apache/tajo/client/TajoGetConf.java     |  12 ++
 .../java/org/apache/tajo/OverridableConf.java   |  12 --
 .../java/org/apache/tajo/util/BitArray.java     |   2 +-
 .../java/org/apache/tajo/util/KeyValueSet.java  |   1 +
 .../planner/physical/ColPartitionStoreExec.java |  58 ++-----
 .../HashBasedColPartitionStoreExec.java         |   6 +-
 .../SortBasedColPartitionStoreExec.java         |  40 +++--
 .../engine/planner/physical/StoreTableExec.java |  66 ++-----
 .../org/apache/tajo/master/session/Session.java |   8 +-
 .../planner/physical/TestPhysicalPlanner.java   | 173 +------------------
 .../java/org/apache/tajo/storage/Appender.java  |   2 -
 .../org/apache/tajo/storage/FileAppender.java   |   4 -
 .../org/apache/tajo/storage/StorageUtil.java    |  21 ---
 .../tajo/storage/parquet/ParquetAppender.java   |   4 -
 .../parquet/InternalParquetRecordWriter.java    |   4 -
 .../thirdparty/parquet/ParquetWriter.java       |   4 -
 .../tajo/storage/TestCompressionStorages.java   |   2 +
 19 files changed, 72 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9a22e27..a4160aa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,9 +31,6 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
-    TAJO-1008: TAJO-1008: Protocol buffer De/Serialization for EvalNode.
-    (hyunsik)
-
     TAJO-984: Improve the default data type handling in RowStoreUtil.
     (jihoon via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
 
b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
index a09275d..b317ba4 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
@@ -23,6 +23,7 @@ import org.apache.tajo.util.KeyValueSet;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class TestKeyValueSet {

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
index 2377427..52e1894 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
@@ -20,12 +20,24 @@ package org.apache.tajo.client;
 
 import com.google.protobuf.ServiceException;
 import org.apache.commons.cli.*;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
+import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Writer;
+import java.net.InetSocketAddress;
 import java.sql.SQLException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
 
 public class TajoGetConf {
   private static final org.apache.commons.cli.Options options;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java 
b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
index c9cf7fa..220bd43 100644
--- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
@@ -99,10 +99,6 @@ public class OverridableConf extends KeyValueSet {
     return getBool(key, null);
   }
 
-  public void setInt(ConfigKey key, int val) {
-    setInt(key.keyname(), val);
-  }
-
   public int getInt(ConfigKey key, Integer defaultVal) {
     assertRegisteredEnum(key);
 
@@ -124,10 +120,6 @@ public class OverridableConf extends KeyValueSet {
     return getInt(key, null);
   }
 
-  public void setLong(ConfigKey key, long val) {
-    setLong(key.keyname(), val);
-  }
-
   public long getLong(ConfigKey key, Long defaultVal) {
     assertRegisteredEnum(key);
 
@@ -149,10 +141,6 @@ public class OverridableConf extends KeyValueSet {
     return getLong(key, null);
   }
 
-  public void setFloat(ConfigKey key, float val) {
-    setFloat(key.keyname(), val);
-  }
-
   public float getFloat(ConfigKey key, Float defaultVal) {
     assertRegisteredEnum(key);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java 
b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
index e62496a..9905b6f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
@@ -80,7 +80,7 @@ public class BitArray {
   public void fromByteBuffer(ByteBuffer byteBuffer) {
     clear();
     int i = 0;
-    while(i < data.length && byteBuffer.hasRemaining()) {
+    while(byteBuffer.hasRemaining()) {
       data[i] = byteBuffer.get();
       i++;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java 
b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 6edb547..0c3db6d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -21,6 +21,7 @@ package org.apache.tajo.util;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.OverridableConf;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.json.CommonGsonHelper;
 import org.apache.tajo.json.GsonObject;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 7d7d020..e90baff 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -24,12 +24,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.CreateTableNode;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
@@ -37,7 +35,6 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -53,14 +50,6 @@ public abstract class ColPartitionStoreExec extends 
UnaryPhysicalExec {
   protected final int [] keyIds;
   protected final String [] keyNames;
 
-  protected Appender appender;
-
-  // for file punctuation
-  protected TableStats aggregatedStats;                  // for aggregating 
all stats of written files
-  protected long maxPerFileSize = Long.MAX_VALUE; // default max file size is 
2^63
-  protected int writtenFileNum = 0;               // how many file are written 
so far?
-  protected Path lastFileName;                    // latest written file name
-
   public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode 
plan, PhysicalExec child) {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
@@ -78,15 +67,6 @@ public abstract class ColPartitionStoreExec extends 
UnaryPhysicalExec {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
 
-    if (!(plan instanceof InsertNode)) {
-      String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR);
-      StorageUtil.setNullCharForTextSerializer(meta, nullChar);
-    }
-
-    if 
(context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
-      maxPerFileSize = 
context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * 
StorageUnit.MB;
-    }
-
     // Find column index to name subpartition directory path
     keyNum = this.plan.getPartitionMethod().getExpressionSchema().size();
 
@@ -127,45 +107,33 @@ public abstract class ColPartitionStoreExec extends 
UnaryPhysicalExec {
     if (!fs.exists(storeTablePath.getParent())) {
       fs.mkdirs(storeTablePath.getParent());
     }
-
-    aggregatedStats = new TableStats();
   }
 
   protected Path getDataFile(String partition) {
     return StorageUtil.concatPath(storeTablePath.getParent(), partition, 
storeTablePath.getName());
   }
 
-  protected Appender getNextPartitionAppender(String partition) throws 
IOException {
-    lastFileName = getDataFile(partition);
-    FileSystem fs = lastFileName.getFileSystem(context.getConf());
+  protected Appender makeAppender(String partition) throws IOException {
+    Path dataFile = getDataFile(partition);
+    FileSystem fs = dataFile.getFileSystem(context.getConf());
 
-    if (fs.exists(lastFileName.getParent())) {
-      LOG.info("Path " + lastFileName.getParent() + " already exists!");
+    if (fs.exists(dataFile.getParent())) {
+      LOG.info("Path " + dataFile.getParent() + " already exists!");
     } else {
-      fs.mkdirs(lastFileName.getParent());
-      LOG.info("Add subpartition path directory :" + lastFileName.getParent());
+      fs.mkdirs(dataFile.getParent());
+      LOG.info("Add subpartition path directory :" + dataFile.getParent());
     }
 
-    if (fs.exists(lastFileName)) {
-      LOG.info("File " + lastFileName + " already exists!");
-      FileStatus status = fs.getFileStatus(lastFileName);
+    if (fs.exists(dataFile)) {
+      LOG.info("File " + dataFile + " already exists!");
+      FileStatus status = fs.getFileStatus(dataFile);
       LOG.info("File size: " + status.getLen());
     }
 
-    openAppender(0);
-
-    return appender;
-  }
-
-  public void openAppender(int suffixId) throws IOException {
-    Path actualFilePath = lastFileName;
-    if (suffixId > 0) {
-      actualFilePath = new Path(lastFileName + "_" + suffixId);
-    }
-
-    appender = 
StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, 
outSchema, actualFilePath);
-
+    Appender appender = 
StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, 
outSchema, dataFile);
     appender.enableStats();
     appender.init();
+
+    return appender;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index e27a43d..44d1270 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
@@ -37,6 +39,8 @@ import java.util.Map;
  * This class is a physical operator to store at column partitioned table.
  */
 public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
+  private static Log LOG = 
LogFactory.getLog(HashBasedColPartitionStoreExec.class);
+
   private final Map<String, Appender> appenderMap = new HashMap<String, 
Appender>();
 
   public HashBasedColPartitionStoreExec(TaskAttemptContext context, 
StoreTableNode plan, PhysicalExec child)
@@ -52,7 +56,7 @@ public class HashBasedColPartitionStoreExec extends 
ColPartitionStoreExec {
     Appender appender = appenderMap.get(partition);
 
     if (appender == null) {
-      appender = getNextPartitionAppender(partition);
+      appender = makeAppender(partition);
       appenderMap.put(partition, appender);
     } else {
       appender = appenderMap.get(partition);

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 6084f0e..9ce455f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -21,9 +21,13 @@
  */
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.StringUtils;
@@ -36,9 +40,14 @@ import java.io.IOException;
  * ascending or descending order of partition columns.
  */
 public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
+  private static Log LOG = 
LogFactory.getLog(SortBasedColPartitionStoreExec.class);
+
   private Tuple currentKey;
   private Tuple prevKey;
 
+  private Appender appender;
+  private TableStats aggregated;
+
   public SortBasedColPartitionStoreExec(TaskAttemptContext context, 
StoreTableNode plan, PhysicalExec child)
       throws IOException {
     super(context, plan, child);
@@ -48,6 +57,12 @@ public class SortBasedColPartitionStoreExec extends 
ColPartitionStoreExec {
     super.init();
 
     currentKey = new VTuple(keyNum);
+    aggregated = new TableStats();
+  }
+
+  private Appender getAppender(String partition) throws IOException {
+    this.appender = makeAppender(partition);
+    return appender;
   }
 
   private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) {
@@ -78,30 +93,19 @@ public class SortBasedColPartitionStoreExec extends 
ColPartitionStoreExec {
       fillKeyTuple(tuple, currentKey);
 
       if (prevKey == null) {
-        appender = getNextPartitionAppender(getSubdirectory(currentKey));
+        appender = getAppender(getSubdirectory(currentKey));
         prevKey = new VTuple(currentKey);
       } else {
-        if (!prevKey.equals(currentKey)) {
+        if (!prevKey.equals(currentKey) && 
!getSubdirectory(prevKey).equalsIgnoreCase(getSubdirectory(currentKey))) {
           appender.close();
-          StatisticsUtil.aggregateTableStat(aggregatedStats, 
appender.getStats());
+          StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
 
-          appender = getNextPartitionAppender(getSubdirectory(currentKey));
+          appender = getAppender(getSubdirectory(currentKey));
           prevKey = new VTuple(currentKey);
-
-          // reset all states for file rotating
-          writtenFileNum = 0;
         }
       }
 
       appender.addTuple(tuple);
-
-      if (maxPerFileSize > 0 && maxPerFileSize <= 
appender.getEstimatedOutputSize()) {
-        appender.close();
-        writtenFileNum++;
-        StatisticsUtil.aggregateTableStat(aggregatedStats, 
appender.getStats());
-
-        openAppender(writtenFileNum);
-      }
     }
 
     return null;
@@ -111,10 +115,8 @@ public class SortBasedColPartitionStoreExec extends 
ColPartitionStoreExec {
   public void close() throws IOException {
     if (appender != null) {
       appender.close();
-
-      // Collect statistics data
-      StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats());
-      context.setResultStats(aggregatedStats);
+      StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
+      context.setResultStats(aggregated);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 3753d26..e73cc2f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -18,18 +18,15 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -38,50 +35,24 @@ import java.io.IOException;
  * This is a physical executor to store a table part into a specified storage.
  */
 public class StoreTableExec extends UnaryPhysicalExec {
-  private static final Log LOG = LogFactory.getLog(StoreTableExec.class);
-
   private PersistentStoreNode plan;
-  private TableMeta meta;
   private Appender appender;
   private Tuple tuple;
 
-  // for file punctuation
-  private TableStats sumStats;                  // for aggregating all stats 
of written files
-  private long maxPerFileSize = Long.MAX_VALUE; // default max file size is 
2^63
-  private int writtenFileNum = 0;               // how many file are written 
so far?
-  private Path lastFileName;                    // latest written file name
-
   public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, 
PhysicalExec child) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
-
-    if 
(context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
-      maxPerFileSize = 
context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * 
StorageUnit.MB;
-    }
   }
 
   public void init() throws IOException {
     super.init();
 
+    TableMeta meta;
     if (plan.hasOptions()) {
       meta = CatalogUtil.newTableMeta(plan.getStorageType(), 
plan.getOptions());
     } else {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
-    openNewFile(writtenFileNum);
-
-    sumStats = new TableStats();
-  }
-
-  public void openNewFile(int suffixId) throws IOException {
-    String prevFile = null;
-
-    lastFileName = context.getOutputPath();
-    if (suffixId > 0) {
-      prevFile = lastFileName.toString();
-
-      lastFileName = new Path(lastFileName + "_" + suffixId);
-    }
 
     if (plan instanceof InsertNode) {
       InsertNode createTableNode = (InsertNode) plan;
@@ -89,17 +60,13 @@ public class StoreTableExec extends UnaryPhysicalExec {
           createTableNode.getTableSchema(), context.getOutputPath());
     } else {
       String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR);
-      StorageUtil.setNullCharForTextSerializer(meta, nullChar);
-      appender = 
StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, 
outSchema, lastFileName);
+      meta.putOption(StorageConstants.CSVFILE_NULL, nullChar);
+      appender = 
StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, 
outSchema,
+          context.getOutputPath());
     }
 
     appender.enableStats();
     appender.init();
-
-    if (suffixId > 0) {
-      LOG.info(prevFile + " exceeds " + 
SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " +
-          "The remain output will be written into " + lastFileName.toString());
-    }
   }
 
   /* (non-Javadoc)
@@ -109,16 +76,8 @@ public class StoreTableExec extends UnaryPhysicalExec {
   public Tuple next() throws IOException {
     while((tuple = child.next()) != null) {
       appender.addTuple(tuple);
-
-      if (maxPerFileSize > 0 && maxPerFileSize <= 
appender.getEstimatedOutputSize()) {
-        appender.close();
-
-        writtenFileNum++;
-        StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
-        openNewFile(writtenFileNum);
-      }
     }
-        
+
     return null;
   }
 
@@ -134,12 +93,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
       appender.flush();
       appender.close();
       // Collect statistics data
-      if (sumStats == null) {
-        context.setResultStats(appender.getStats());
-      } else {
-        StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
-        context.setResultStats(sumStats);
-      }
+      context.setResultStats(appender.getStats());
       if (context.getTaskId() != null) {
         context.addShuffleFileOutput(0, context.getTaskId().toString());
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java 
b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
index 1f21e2a..cdf552d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
@@ -28,7 +28,7 @@ import java.util.Map;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
 
-public class Session implements SessionConstants, ProtoObject<SessionProto>, 
Cloneable {
+public class Session implements SessionConstants, ProtoObject<SessionProto> {
   private final String sessionId;
   private final String userName;
   private String currentDatabase;
@@ -133,10 +133,4 @@ public class Session implements SessionConstants, 
ProtoObject<SessionProto>, Clo
   public String toString() {
     return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + 
getLastAccessTime();
   }
-
-  public Session clone() throws CloneNotSupportedException {
-    Session newSession = (Session) super.clone();
-    newSession.sessionVariables.putAll(getAllVariables());
-    return newSession;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index da66aa5..a184a9a 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,11 +25,13 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.*;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
@@ -50,7 +52,6 @@ import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
@@ -88,7 +89,6 @@ public class TestPhysicalPlanner {
 
   private static TableDesc employee = null;
   private static TableDesc score = null;
-  private static TableDesc largeScore = null;
 
   private static MasterPlan masterPlan;
 
@@ -171,45 +171,6 @@ public class TestPhysicalPlanner {
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
     masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, 
null);
-
-    createLargeScoreTable();
-  }
-
-  public static void createLargeScoreTable() throws IOException {
-    // Preparing a large table
-    Path scoreLargePath = new Path(testDir, "score_large");
-    CommonTestingUtil.cleanupTestDir(scoreLargePath.toString());
-
-    Schema scoreSchmea = score.getSchema();
-    TableMeta scoreLargeMeta = CatalogUtil.newTableMeta(StoreType.RAW, new 
KeyValueSet());
-    Appender appender = 
StorageManagerFactory.getStorageManager(conf).getAppender(scoreLargeMeta, 
scoreSchmea,
-        scoreLargePath);
-    appender.enableStats();
-    appender.init();
-    largeScore = new TableDesc(
-        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, 
"score_large"), scoreSchmea, scoreLargeMeta,
-        scoreLargePath);
-
-    Tuple tuple = new VTuple(scoreSchmea.size());
-    int m = 0;
-    for (int i = 1; i <= 35000; i++) {
-      for (int k = 3; k < 5; k++) { // |{3,4}| = 2
-        for (int j = 1; j <= 3; j++) { // |{1,2,3}| = 3
-          tuple.put(
-              new Datum[] {
-                  DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 
5)
-                  DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2)
-                  DatumFactory.createInt4(j), // 1 ~ 3
-                  m % 3 == 1 ? DatumFactory.createText("one") : 
NullDatum.get()});
-          appender.addTuple(tuple);
-          m++;
-        }
-      }
-    }
-    appender.flush();
-    appender.close();
-    largeScore.setStats(appender.getStats());
-    catalog.createTable(largeScore);
   }
 
   @AfterClass
@@ -415,10 +376,7 @@ public class TestPhysicalPlanner {
   private String[] CreateTableAsStmts = {
       "create table grouped1 as select deptName, class, sum(score), 
max(score), min(score) from score group by deptName, class", // 0
       "create table grouped2 using rcfile as select deptName, class, 
sum(score), max(score), min(score) from score group by deptName, class", // 1
-      "create table grouped3 partition by column (dept text,  class text) as 
select sum(score), max(score), min(score), deptName, class from score group by 
deptName, class", // 2,
-      "create table score_large_output as select * from score_large", // 4
-      "CREATE TABLE score_part (deptname text, score int4, nullable text) 
PARTITION BY COLUMN (class text) " +
-          "AS SELECT deptname, score, nullable, class from score_large" // 5
+      "create table grouped3 partition by column (dept text,  class text) as 
select sum(score), max(score), min(score), deptName, class from score group by 
deptName, class", // 2
   };
 
   @Test
@@ -464,62 +422,6 @@ public class TestPhysicalPlanner {
   }
 
   @Test
-  public final void testStorePlanWithMaxOutputFileSize() throws IOException, 
PlanningException,
-      CloneNotSupportedException {
-
-    TableStats stats = largeScore.getStats();
-    assertTrue("Checking meaningfulness of test", stats.getNumBytes() > 
StorageUnit.MB);
-
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", 
largeScore.getMeta(),
-        largeScore.getPath(), Integer.MAX_VALUE);
-    Path workDir = 
CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithMaxOutputFileSize");
-
-    QueryContext queryContext = new QueryContext(conf, session);
-    queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1);
-
-    TaskAttemptContext ctx = new TaskAttemptContext(
-        queryContext,
-        LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new FileFragment[] { frags[0] }, workDir);
-    ctx.setEnforcer(new Enforcer());
-    ctx.setOutputPath(new Path(workDir, "maxOutput"));
-
-    Expr context = analyzer.parse(CreateTableAsStmts[3]);
-
-    LogicalPlan plan = planner.createPlan(queryContext, context);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    // executing StoreTableExec
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-    exec.init();
-    exec.next();
-    exec.close();
-
-    // checking the number of punctuated files
-    int expectedFileNum = (int) Math.ceil(stats.getNumBytes() / (float) 
StorageUnit.MB);
-    FileSystem fs = ctx.getOutputPath().getFileSystem(conf);
-    FileStatus [] statuses = fs.listStatus(ctx.getOutputPath().getParent());
-    assertEquals(expectedFileNum, statuses.length);
-
-    // checking the file contents
-    long totalNum = 0;
-    for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
-      Scanner scanner = 
StorageManagerFactory.getStorageManager(conf).getFileScanner(
-          CatalogUtil.newTableMeta(StoreType.CSV),
-          rootNode.getOutSchema(),
-          status.getPath());
-
-      scanner.init();
-      while ((scanner.next()) != null) {
-        totalNum++;
-      }
-      scanner.close();
-    }
-    assertTrue(totalNum == ctx.getResultStats().getNumRows());
-  }
-
-  @Test
   public final void testStorePlanWithRCFile() throws IOException, 
PlanningException {
     FileFragment[] frags = StorageManager.splitNG(conf, "default.score", 
score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
@@ -686,71 +588,6 @@ public class TestPhysicalPlanner {
   }
 
   @Test
-  public final void testPartitionedStorePlanWithMaxFileSize() throws 
IOException, PlanningException {
-
-    // Preparing working dir and input fragments
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", 
largeScore.getMeta(),
-        largeScore.getPath(), Integer.MAX_VALUE);
-    QueryUnitAttemptId id = 
LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
-    Path workDir = 
CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlanWithMaxFileSize");
-
-    // Setting session variables
-    QueryContext queryContext = new QueryContext(conf, session);
-    queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1);
-
-    // Preparing task context
-    TaskAttemptContext ctx = new TaskAttemptContext(queryContext, id, new 
FileFragment[] { frags[0] }, workDir);
-    ctx.setOutputPath(new Path(workDir, "part-01-000000"));
-    // SortBasedColumnPartitionStoreExec will be chosen by default.
-    ctx.setEnforcer(new Enforcer());
-    Expr context = analyzer.parse(CreateTableAsStmts[4]);
-    LogicalPlan plan = planner.createPlan(queryContext, context);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    // Executing CREATE TABLE PARTITION BY
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-    exec.init();
-    exec.next();
-    exec.close();
-
-    FileSystem fs = sm.getFileSystem();
-    FileStatus [] list = fs.listStatus(workDir);
-    // checking the number of partitions
-    assertEquals(2, list.length);
-
-    List<FileFragment> fragments = Lists.newArrayList();
-    int i = 0;
-    for (FileStatus status : list) {
-      assertTrue(status.isDirectory());
-
-      long fileVolumSum = 0;
-      FileStatus [] fileStatuses = fs.listStatus(status.getPath());
-      for (FileStatus fileStatus : fileStatuses) {
-        fileVolumSum += fileStatus.getLen();
-        fragments.add(new FileFragment("partition", fileStatus.getPath(), 0, 
fileStatus.getLen()));
-      }
-      assertTrue("checking the meaningfulness of test", fileVolumSum > 
StorageUnit.MB && fileStatuses.length > 1);
-
-      long expectedFileNum = (long) Math.ceil(fileVolumSum / 
(float)StorageUnit.MB);
-      assertEquals(expectedFileNum, fileStatuses.length);
-    }
-    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
-    Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), 
outputMeta, TUtil.newList(fragments));
-    scanner.init();
-
-    long rowNum = 0;
-    while (scanner.next() != null) {
-      rowNum++;
-    }
-
-    // checking the number of all written rows
-    assertTrue(largeScore.getStats().getNumRows() == rowNum);
-
-    scanner.close();
-  }
-
-  @Test
   public final void testPartitionedStorePlanWithEmptyGroupingSet()
       throws IOException, PlanningException {
     FileFragment[] frags = StorageManager.splitNG(conf, "default.score", 
score.getMeta(), score.getPath(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
index c5e96ac..5b42cbd 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
@@ -31,8 +31,6 @@ public interface Appender extends Closeable {
   
   void flush() throws IOException;
 
-  long getEstimatedOutputSize() throws IOException;
-  
   void close() throws IOException;
 
   void enableStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
index 04278e9..064841f 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -57,9 +57,5 @@ public abstract class FileAppender implements Appender {
     this.enabledStats = true;
   }
 
-  public long getEstimatedOutputSize() throws IOException {
-    return getOffset();
-  }
-
   public abstract long getOffset() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 98eaafc..07fa16b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -240,25 +240,4 @@ public class StorageUtil extends StorageConstants {
       amt -= ret;
     }
   }
-
-  /**
-   * Set nullChar to TableMeta according to file format
-   *
-   * @param meta TableMeta
-   * @param nullChar A character for NULL representation
-   */
-  public static void setNullCharForTextSerializer(TableMeta meta, String 
nullChar) {
-    switch (meta.getStoreType()) {
-    case CSV:
-      meta.putOption(StorageConstants.CSVFILE_NULL, nullChar);
-      break;
-    case RCFILE:
-      meta.putOption(StorageConstants.RCFILE_NULL, nullChar);
-      break;
-    case SEQUENCEFILE:
-      meta.putOption(StorageConstants.SEQUENCEFILE_NULL, nullChar);
-      break;
-    default: // nothing to do
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index ff9e43c..10b9331 100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -129,10 +129,6 @@ public class ParquetAppender extends FileAppender {
     writer.close();
   }
 
-  public long getEstimatedOutputSize() throws IOException {
-    return writer.getEstimatedWrittenSize();
-  }
-
   /**
    * If table statistics is enabled, retrieve the table statistics.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
index 7410d2b..8ce4b1c 100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -139,10 +139,6 @@ class InternalParquetRecordWriter<T> {
     }
   }
 
-  public long getEstimatedWrittenSize() throws IOException {
-    return w.getPos() + store.memSize();
-  }
-
   private void flushStore()
       throws IOException {
     LOG.info(format("Flushing mem store to file. allocated memory: %,d", 
store.allocatedSize()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
index 0447a47..743d168 100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
@@ -210,10 +210,6 @@ public class ParquetWriter<T> implements Closeable {
     }
   }
 
-  public long getEstimatedWrittenSize() throws IOException {
-    return this.writer.getEstimatedWrittenSize();
-  }
-
   @Override
   public void close() throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
 
b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index cae0357..61f4682 100644
--- 
a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ 
b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -47,6 +48,7 @@ import java.util.Arrays;
 import java.util.Collection;
 
 import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class TestCompressionStorages {

Reply via email to