Repository: tajo Updated Branches: refs/heads/master 7c84aeb29 -> ea5ce54d8
Revert "TAJO-1008: Protocol buffer De/Serialization for EvalNode." This reverts commit 49d52553c5e4ef37a6af4605befbe36b0dd36c3c. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2d6bff7e Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2d6bff7e Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2d6bff7e Branch: refs/heads/master Commit: 2d6bff7e8ef2ca2a7dff39c70383f30b2e06e055 Parents: 7c84aeb Author: Hyunsik Choi <[email protected]> Authored: Wed Aug 20 16:44:37 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Wed Aug 20 16:44:37 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 - .../apache/tajo/catalog/TestKeyValueSet.java | 1 + .../org/apache/tajo/client/TajoGetConf.java | 12 ++ .../java/org/apache/tajo/OverridableConf.java | 12 -- .../java/org/apache/tajo/util/BitArray.java | 2 +- .../java/org/apache/tajo/util/KeyValueSet.java | 1 + .../planner/physical/ColPartitionStoreExec.java | 58 ++----- .../HashBasedColPartitionStoreExec.java | 6 +- .../SortBasedColPartitionStoreExec.java | 40 +++-- .../engine/planner/physical/StoreTableExec.java | 66 ++----- .../org/apache/tajo/master/session/Session.java | 8 +- .../planner/physical/TestPhysicalPlanner.java | 173 +------------------ .../java/org/apache/tajo/storage/Appender.java | 2 - .../org/apache/tajo/storage/FileAppender.java | 4 - .../org/apache/tajo/storage/StorageUtil.java | 21 --- .../tajo/storage/parquet/ParquetAppender.java | 4 - .../parquet/InternalParquetRecordWriter.java | 4 - .../thirdparty/parquet/ParquetWriter.java | 4 - .../tajo/storage/TestCompressionStorages.java | 2 + 19 files changed, 72 insertions(+), 351 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9a22e27..a4160aa 100644 --- a/CHANGES +++ b/CHANGES @@ -31,9 +31,6 @@ Release 0.9.0 - unreleased IMPROVEMENT - TAJO-1008: TAJO-1008: Protocol buffer De/Serialization for EvalNode. - (hyunsik) - TAJO-984: Improve the default data type handling in RowStoreUtil. (jihoon via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java index a09275d..b317ba4 100644 --- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java +++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java @@ -23,6 +23,7 @@ import org.apache.tajo.util.KeyValueSet; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class TestKeyValueSet { http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java index 2377427..52e1894 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java @@ -20,12 +20,24 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.cli.*; +import org.apache.commons.lang.StringUtils; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; +import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.TajoIdUtils; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; +import java.net.InetSocketAddress; import java.sql.SQLException; +import java.text.DecimalFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; public class TajoGetConf { private static final org.apache.commons.cli.Options options; http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java index c9cf7fa..220bd43 100644 --- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java @@ -99,10 +99,6 @@ public class OverridableConf extends KeyValueSet { return getBool(key, null); } - public void setInt(ConfigKey key, int val) { - setInt(key.keyname(), val); - } - public int getInt(ConfigKey key, Integer defaultVal) { assertRegisteredEnum(key); @@ -124,10 +120,6 @@ public class OverridableConf extends KeyValueSet { return getInt(key, null); } - public void setLong(ConfigKey key, long val) { - setLong(key.keyname(), val); - } - public long getLong(ConfigKey key, Long defaultVal) { assertRegisteredEnum(key); @@ -149,10 +141,6 @@ public class OverridableConf extends KeyValueSet { return getLong(key, null); } - public void setFloat(ConfigKey key, float val) { - setFloat(key.keyname(), val); - } - public float getFloat(ConfigKey key, Float defaultVal) { assertRegisteredEnum(key); http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java index e62496a..9905b6f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java @@ -80,7 +80,7 @@ public class BitArray { public void fromByteBuffer(ByteBuffer byteBuffer) { clear(); int i = 0; - while(i < data.length && byteBuffer.hasRemaining()) { + while(byteBuffer.hasRemaining()) { data[i] = byteBuffer.get(); i++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java index 6edb547..0c3db6d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java @@ -21,6 +21,7 @@ package org.apache.tajo.util; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.gson.annotations.Expose; +import org.apache.tajo.OverridableConf; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.CommonGsonHelper; import org.apache.tajo.json.GsonObject; http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 7d7d020..e90baff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -24,12 +24,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.logical.CreateTableNode; import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.NodeType; @@ -37,7 +35,6 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode; import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -53,14 +50,6 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { protected final int [] keyIds; protected final String [] keyNames; - protected Appender appender; - - // for file punctuation - protected TableStats aggregatedStats; // for aggregating all stats of written files - protected long maxPerFileSize = Long.MAX_VALUE; // default max file size is 2^63 - protected int writtenFileNum = 0; // how many file are written so far? - protected Path lastFileName; // latest written file name - public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) { super(context, plan.getInSchema(), plan.getOutSchema(), child); this.plan = plan; @@ -78,15 +67,6 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { meta = CatalogUtil.newTableMeta(plan.getStorageType()); } - if (!(plan instanceof InsertNode)) { - String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR); - StorageUtil.setNullCharForTextSerializer(meta, nullChar); - } - - if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) { - maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB; - } - // Find column index to name subpartition directory path keyNum = this.plan.getPartitionMethod().getExpressionSchema().size(); @@ -127,45 +107,33 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { if (!fs.exists(storeTablePath.getParent())) { fs.mkdirs(storeTablePath.getParent()); } - - aggregatedStats = new TableStats(); } protected Path getDataFile(String partition) { return StorageUtil.concatPath(storeTablePath.getParent(), partition, storeTablePath.getName()); } - protected Appender getNextPartitionAppender(String partition) throws IOException { - lastFileName = getDataFile(partition); - FileSystem fs = lastFileName.getFileSystem(context.getConf()); + protected Appender makeAppender(String partition) throws IOException { + Path dataFile = getDataFile(partition); + FileSystem fs = dataFile.getFileSystem(context.getConf()); - if (fs.exists(lastFileName.getParent())) { - LOG.info("Path " + lastFileName.getParent() + " already exists!"); + if (fs.exists(dataFile.getParent())) { + LOG.info("Path " + dataFile.getParent() + " already exists!"); } else { - fs.mkdirs(lastFileName.getParent()); - LOG.info("Add subpartition path directory :" + lastFileName.getParent()); + fs.mkdirs(dataFile.getParent()); + LOG.info("Add subpartition path directory :" + dataFile.getParent()); } - if (fs.exists(lastFileName)) { - LOG.info("File " + lastFileName + " already exists!"); - FileStatus status = fs.getFileStatus(lastFileName); + if (fs.exists(dataFile)) { + LOG.info("File " + dataFile + " already exists!"); + FileStatus status = fs.getFileStatus(dataFile); LOG.info("File size: " + status.getLen()); } - openAppender(0); - - return appender; - } - - public void openAppender(int suffixId) throws IOException { - Path actualFilePath = lastFileName; - if (suffixId > 0) { - actualFilePath = new Path(lastFileName + "_" + suffixId); - } - - appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, actualFilePath); - + Appender appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile); appender.enableStats(); appender.init(); + + return appender; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index e27a43d..44d1270 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -18,6 +18,8 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; @@ -37,6 +39,8 @@ import java.util.Map; * This class is a physical operator to store at column partitioned table. */ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { + private static Log LOG = LogFactory.getLog(HashBasedColPartitionStoreExec.class); + private final Map<String, Appender> appenderMap = new HashMap<String, Appender>(); public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) @@ -52,7 +56,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { Appender appender = appenderMap.get(partition); if (appender == null) { - appender = getNextPartitionAppender(partition); + appender = makeAppender(partition); appenderMap.put(partition, appender); } else { appender = appenderMap.get(partition); http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 6084f0e..9ce455f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -21,9 +21,13 @@ */ package org.apache.tajo.engine.planner.physical; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.statistics.StatisticsUtil; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.logical.StoreTableNode; +import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; @@ -36,9 +40,14 @@ import java.io.IOException; * ascending or descending order of partition columns. */ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { + private static Log LOG = LogFactory.getLog(SortBasedColPartitionStoreExec.class); + private Tuple currentKey; private Tuple prevKey; + private Appender appender; + private TableStats aggregated; + public SortBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException { super(context, plan, child); @@ -48,6 +57,12 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { super.init(); currentKey = new VTuple(keyNum); + aggregated = new TableStats(); + } + + private Appender getAppender(String partition) throws IOException { + this.appender = makeAppender(partition); + return appender; } private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) { @@ -78,30 +93,19 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { fillKeyTuple(tuple, currentKey); if (prevKey == null) { - appender = getNextPartitionAppender(getSubdirectory(currentKey)); + appender = getAppender(getSubdirectory(currentKey)); prevKey = new VTuple(currentKey); } else { - if (!prevKey.equals(currentKey)) { + if (!prevKey.equals(currentKey) && !getSubdirectory(prevKey).equalsIgnoreCase(getSubdirectory(currentKey))) { appender.close(); - StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); + StatisticsUtil.aggregateTableStat(aggregated, appender.getStats()); - appender = getNextPartitionAppender(getSubdirectory(currentKey)); + appender = getAppender(getSubdirectory(currentKey)); prevKey = new VTuple(currentKey); - - // reset all states for file rotating - writtenFileNum = 0; } } appender.addTuple(tuple); - - if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) { - appender.close(); - writtenFileNum++; - StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); - - openAppender(writtenFileNum); - } } return null; @@ -111,10 +115,8 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { public void close() throws IOException { if (appender != null) { appender.close(); - - // Collect statistics data - StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); - context.setResultStats(aggregatedStats); + StatisticsUtil.aggregateTableStat(aggregated, appender.getStats()); + context.setResultStats(aggregated); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 3753d26..e73cc2f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -18,18 +18,15 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.StatisticsUtil; -import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.PersistentStoreNode; -import org.apache.tajo.storage.*; -import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.StorageManagerFactory; +import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -38,50 +35,24 @@ import java.io.IOException; * This is a physical executor to store a table part into a specified storage. */ public class StoreTableExec extends UnaryPhysicalExec { - private static final Log LOG = LogFactory.getLog(StoreTableExec.class); - private PersistentStoreNode plan; - private TableMeta meta; private Appender appender; private Tuple tuple; - // for file punctuation - private TableStats sumStats; // for aggregating all stats of written files - private long maxPerFileSize = Long.MAX_VALUE; // default max file size is 2^63 - private int writtenFileNum = 0; // how many file are written so far? - private Path lastFileName; // latest written file name - public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), child); this.plan = plan; - - if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) { - maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB; - } } public void init() throws IOException { super.init(); + TableMeta meta; if (plan.hasOptions()) { meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions()); } else { meta = CatalogUtil.newTableMeta(plan.getStorageType()); } - openNewFile(writtenFileNum); - - sumStats = new TableStats(); - } - - public void openNewFile(int suffixId) throws IOException { - String prevFile = null; - - lastFileName = context.getOutputPath(); - if (suffixId > 0) { - prevFile = lastFileName.toString(); - - lastFileName = new Path(lastFileName + "_" + suffixId); - } if (plan instanceof InsertNode) { InsertNode createTableNode = (InsertNode) plan; @@ -89,17 +60,13 @@ public class StoreTableExec extends UnaryPhysicalExec { createTableNode.getTableSchema(), context.getOutputPath()); } else { String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR); - StorageUtil.setNullCharForTextSerializer(meta, nullChar); - appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, lastFileName); + meta.putOption(StorageConstants.CSVFILE_NULL, nullChar); + appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, + context.getOutputPath()); } appender.enableStats(); appender.init(); - - if (suffixId > 0) { - LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " + - "The remain output will be written into " + lastFileName.toString()); - } } /* (non-Javadoc) @@ -109,16 +76,8 @@ public class StoreTableExec extends UnaryPhysicalExec { public Tuple next() throws IOException { while((tuple = child.next()) != null) { appender.addTuple(tuple); - - if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) { - appender.close(); - - writtenFileNum++; - StatisticsUtil.aggregateTableStat(sumStats, appender.getStats()); - openNewFile(writtenFileNum); - } } - + return null; } @@ -134,12 +93,7 @@ public class StoreTableExec extends UnaryPhysicalExec { appender.flush(); appender.close(); // Collect statistics data - if (sumStats == null) { - context.setResultStats(appender.getStats()); - } else { - StatisticsUtil.aggregateTableStat(sumStats, appender.getStats()); - context.setResultStats(sumStats); - } + context.setResultStats(appender.getStats()); if (context.getTaskId() != null) { context.addShuffleFileOutput(0, context.getTaskId().toString()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java index 1f21e2a..cdf552d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java @@ -28,7 +28,7 @@ import java.util.Map; import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto; -public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable { +public class Session implements SessionConstants, ProtoObject<SessionProto> { private final String sessionId; private final String userName; private String currentDatabase; @@ -133,10 +133,4 @@ public class Session implements SessionConstants, ProtoObject<SessionProto>, Clo public String toString() { return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime(); } - - public Session clone() throws CloneNotSupportedException { - Session newSession = (Session) super.clone(); - newSession.sessionVariables.putAll(getAllVariables()); - return newSession; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index da66aa5..a184a9a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -25,11 +25,13 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.*; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; @@ -50,7 +52,6 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; @@ -88,7 +89,6 @@ public class TestPhysicalPlanner { private static TableDesc employee = null; private static TableDesc score = null; - private static TableDesc largeScore = null; private static MasterPlan masterPlan; @@ -171,45 +171,6 @@ public class TestPhysicalPlanner { defaultContext = LocalTajoTestingUtility.createDummyContext(conf); masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); - - createLargeScoreTable(); - } - - public static void createLargeScoreTable() throws IOException { - // Preparing a large table - Path scoreLargePath = new Path(testDir, "score_large"); - CommonTestingUtil.cleanupTestDir(scoreLargePath.toString()); - - Schema scoreSchmea = score.getSchema(); - TableMeta scoreLargeMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet()); - Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreLargeMeta, scoreSchmea, - scoreLargePath); - appender.enableStats(); - appender.init(); - largeScore = new TableDesc( - CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score_large"), scoreSchmea, scoreLargeMeta, - scoreLargePath); - - Tuple tuple = new VTuple(scoreSchmea.size()); - int m = 0; - for (int i = 1; i <= 35000; i++) { - for (int k = 3; k < 5; k++) { // |{3,4}| = 2 - for (int j = 1; j <= 3; j++) { // |{1,2,3}| = 3 - tuple.put( - new Datum[] { - DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5) - DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2) - DatumFactory.createInt4(j), // 1 ~ 3 - m % 3 == 1 ? DatumFactory.createText("one") : NullDatum.get()}); - appender.addTuple(tuple); - m++; - } - } - } - appender.flush(); - appender.close(); - largeScore.setStats(appender.getStats()); - catalog.createTable(largeScore); } @AfterClass @@ -415,10 +376,7 @@ public class TestPhysicalPlanner { private String[] CreateTableAsStmts = { "create table grouped1 as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 0 "create table grouped2 using rcfile as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 1 - "create table grouped3 partition by column (dept text, class text) as select sum(score), max(score), min(score), deptName, class from score group by deptName, class", // 2, - "create table score_large_output as select * from score_large", // 4 - "CREATE TABLE score_part (deptname text, score int4, nullable text) PARTITION BY COLUMN (class text) " + - "AS SELECT deptname, score, nullable, class from score_large" // 5 + "create table grouped3 partition by column (dept text, class text) as select sum(score), max(score), min(score), deptName, class from score group by deptName, class", // 2 }; @Test @@ -464,62 +422,6 @@ public class TestPhysicalPlanner { } @Test - public final void testStorePlanWithMaxOutputFileSize() throws IOException, PlanningException, - CloneNotSupportedException { - - TableStats stats = largeScore.getStats(); - assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB); - - FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), - largeScore.getPath(), Integer.MAX_VALUE); - Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithMaxOutputFileSize"); - - QueryContext queryContext = new QueryContext(conf, session); - queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1); - - TaskAttemptContext ctx = new TaskAttemptContext( - queryContext, - LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new FileFragment[] { frags[0] }, workDir); - ctx.setEnforcer(new Enforcer()); - ctx.setOutputPath(new Path(workDir, "maxOutput")); - - Expr context = analyzer.parse(CreateTableAsStmts[3]); - - LogicalPlan plan = planner.createPlan(queryContext, context); - LogicalNode rootNode = optimizer.optimize(plan); - - // executing StoreTableExec - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); - exec.next(); - exec.close(); - - // checking the number of punctuated files - int expectedFileNum = (int) Math.ceil(stats.getNumBytes() / (float) StorageUnit.MB); - FileSystem fs = ctx.getOutputPath().getFileSystem(conf); - FileStatus [] statuses = fs.listStatus(ctx.getOutputPath().getParent()); - assertEquals(expectedFileNum, statuses.length); - - // checking the file contents - long totalNum = 0; - for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) { - Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner( - CatalogUtil.newTableMeta(StoreType.CSV), - rootNode.getOutSchema(), - status.getPath()); - - scanner.init(); - while ((scanner.next()) != null) { - totalNum++; - } - scanner.close(); - } - assertTrue(totalNum == ctx.getResultStats().getNumRows()); - } - - @Test public final void testStorePlanWithRCFile() throws IOException, PlanningException { FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); @@ -686,71 +588,6 @@ public class TestPhysicalPlanner { } @Test - public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, PlanningException { - - // Preparing working dir and input fragments - FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), - largeScore.getPath(), Integer.MAX_VALUE); - QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); - Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlanWithMaxFileSize"); - - // Setting session variables - QueryContext queryContext = new QueryContext(conf, session); - queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1); - - // Preparing task context - TaskAttemptContext ctx = new TaskAttemptContext(queryContext, id, new FileFragment[] { frags[0] }, workDir); - ctx.setOutputPath(new Path(workDir, "part-01-000000")); - // SortBasedColumnPartitionStoreExec will be chosen by default. - ctx.setEnforcer(new Enforcer()); - Expr context = analyzer.parse(CreateTableAsStmts[4]); - LogicalPlan plan = planner.createPlan(queryContext, context); - LogicalNode rootNode = optimizer.optimize(plan); - - // Executing CREATE TABLE PARTITION BY - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - exec.init(); - exec.next(); - exec.close(); - - FileSystem fs = sm.getFileSystem(); - FileStatus [] list = fs.listStatus(workDir); - // checking the number of partitions - assertEquals(2, list.length); - - List<FileFragment> fragments = Lists.newArrayList(); - int i = 0; - for (FileStatus status : list) { - assertTrue(status.isDirectory()); - - long fileVolumSum = 0; - FileStatus [] fileStatuses = fs.listStatus(status.getPath()); - for (FileStatus fileStatus : fileStatuses) { - fileVolumSum += fileStatus.getLen(); - fragments.add(new FileFragment("partition", fileStatus.getPath(), 0, fileStatus.getLen())); - } - assertTrue("checking the meaningfulness of test", fileVolumSum > StorageUnit.MB && fileStatuses.length > 1); - - long expectedFileNum = (long) Math.ceil(fileVolumSum / (float)StorageUnit.MB); - assertEquals(expectedFileNum, fileStatuses.length); - } - TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV); - Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); - scanner.init(); - - long rowNum = 0; - while (scanner.next() != null) { - rowNum++; - } - - // checking the number of all written rows - assertTrue(largeScore.getStats().getNumRows() == rowNum); - - scanner.close(); - } - - @Test public final void testPartitionedStorePlanWithEmptyGroupingSet() throws IOException, PlanningException { FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(), http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java index c5e96ac..5b42cbd 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java @@ -31,8 +31,6 @@ public interface Appender extends Closeable { void flush() throws IOException; - long getEstimatedOutputSize() throws IOException; - void close() throws IOException; void enableStats(); http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java index 04278e9..064841f 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -57,9 +57,5 @@ public abstract class FileAppender implements Appender { this.enabledStats = true; } - public long getEstimatedOutputSize() throws IOException { - return getOffset(); - } - public abstract long getOffset() throws IOException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java index 98eaafc..07fa16b 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -240,25 +240,4 @@ public class StorageUtil extends StorageConstants { amt -= ret; } } - - /** - * Set nullChar to TableMeta according to file format - * - * @param meta TableMeta - * @param nullChar A character for NULL representation - */ - public static void setNullCharForTextSerializer(TableMeta meta, String nullChar) { - switch (meta.getStoreType()) { - case CSV: - meta.putOption(StorageConstants.CSVFILE_NULL, nullChar); - break; - case RCFILE: - meta.putOption(StorageConstants.RCFILE_NULL, nullChar); - break; - case SEQUENCEFILE: - meta.putOption(StorageConstants.SEQUENCEFILE_NULL, nullChar); - break; - default: // nothing to do - } - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index ff9e43c..10b9331 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -129,10 +129,6 @@ public class ParquetAppender extends FileAppender { writer.close(); } - public long getEstimatedOutputSize() throws IOException { - return writer.getEstimatedWrittenSize(); - } - /** * If table statistics is enabled, retrieve the table statistics. * http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java index 7410d2b..8ce4b1c 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java @@ -139,10 +139,6 @@ class InternalParquetRecordWriter<T> { } } - public long getEstimatedWrittenSize() throws IOException { - return w.getPos() + store.memSize(); - } - private void flushStore() throws IOException { LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java index 0447a47..743d168 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java @@ -210,10 +210,6 @@ public class ParquetWriter<T> implements Closeable { } } - public long getEstimatedWrittenSize() throws IOException { - return this.writer.getEstimatedWrittenSize(); - } - @Override public void close() throws IOException { try { http://git-wip-us.apache.org/repos/asf/tajo/blob/2d6bff7e/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index cae0357..61f4682 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -21,6 +21,7 @@ package org.apache.tajo.storage; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; @@ -47,6 +48,7 @@ import java.util.Arrays; import java.util.Collection; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestCompressionStorages {
