Repository: tajo Updated Branches: refs/heads/master f3d63b46b -> a1711d16b
TAJO-931: Output file can be punctuated depending on the file size. (hyunsik) Closes #119 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a1711d16 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a1711d16 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a1711d16 Branch: refs/heads/master Commit: a1711d16be579082fb57e5abb43ff1872d424451 Parents: f3d63b4 Author: Hyunsik Choi <[email protected]> Authored: Thu Aug 21 13:35:29 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Thu Aug 21 13:35:29 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/catalog/TestKeyValueSet.java | 1 - .../org/apache/tajo/client/TajoGetConf.java | 12 -- .../java/org/apache/tajo/OverridableConf.java | 12 ++ .../java/org/apache/tajo/util/BitArray.java | 2 +- .../java/org/apache/tajo/util/KeyValueSet.java | 1 - .../engine/planner/PhysicalPlannerImpl.java | 6 + .../planner/logical/PersistentStoreNode.java | 2 +- .../planner/physical/ColPartitionStoreExec.java | 55 ++++-- .../HashBasedColPartitionStoreExec.java | 6 +- .../planner/physical/PhysicalPlanUtil.java | 63 +++++++ .../SortBasedColPartitionStoreExec.java | 40 +++-- .../engine/planner/physical/StoreTableExec.java | 64 ++++++- .../org/apache/tajo/master/session/Session.java | 8 +- .../planner/physical/TestPhysicalPlanner.java | 168 ++++++++++++++++++- .../java/org/apache/tajo/storage/Appender.java | 2 + .../org/apache/tajo/storage/FileAppender.java | 4 + .../tajo/storage/HashShuffleAppender.java | 5 + .../tajo/storage/parquet/ParquetAppender.java | 4 + .../parquet/InternalParquetRecordWriter.java | 4 + .../thirdparty/parquet/ParquetWriter.java | 4 + .../tajo/storage/TestCompressionStorages.java | 2 - 22 files changed, 401 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/CHANGES 
---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index acd7094..e87feaa 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,9 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-931: Output file can be punctuated depending on the file size. + (hyunsik) + TAJO-992: Reduce number of hash shuffle output file.(Hyoungjun Kim) TAJO-1008: Protocol buffer De/Serialization for EvalNode. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java index b317ba4..a09275d 100644 --- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java +++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java @@ -23,7 +23,6 @@ import org.apache.tajo.util.KeyValueSet; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class TestKeyValueSet { http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java index 52e1894..2377427 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java @@ -20,24 +20,12 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.cli.*; -import 
org.apache.commons.lang.StringUtils; -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; -import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.TajoIdUtils; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; -import java.net.InetSocketAddress; import java.sql.SQLException; -import java.text.DecimalFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.List; public class TajoGetConf { private static final org.apache.commons.cli.Options options; http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java index 220bd43..c9cf7fa 100644 --- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java @@ -99,6 +99,10 @@ public class OverridableConf extends KeyValueSet { return getBool(key, null); } + public void setInt(ConfigKey key, int val) { + setInt(key.keyname(), val); + } + public int getInt(ConfigKey key, Integer defaultVal) { assertRegisteredEnum(key); @@ -120,6 +124,10 @@ public class OverridableConf extends KeyValueSet { return getInt(key, null); } + public void setLong(ConfigKey key, long val) { + setLong(key.keyname(), val); + } + public long getLong(ConfigKey key, Long defaultVal) { assertRegisteredEnum(key); @@ -141,6 +149,10 @@ public class OverridableConf extends KeyValueSet { return getLong(key, null); } + public void setFloat(ConfigKey key, float val) { + setFloat(key.keyname(), val); + } + public float getFloat(ConfigKey key, Float defaultVal) { 
assertRegisteredEnum(key); http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java index 9905b6f..e62496a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java @@ -80,7 +80,7 @@ public class BitArray { public void fromByteBuffer(ByteBuffer byteBuffer) { clear(); int i = 0; - while(byteBuffer.hasRemaining()) { + while(i < data.length && byteBuffer.hasRemaining()) { data[i] = byteBuffer.get(); i++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java index 0c3db6d..6edb547 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java @@ -21,7 +21,6 @@ package org.apache.tajo.util; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.gson.annotations.Expose; -import org.apache.tajo.OverridableConf; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.CommonGsonHelper; import org.apache.tajo.json.GsonObject; http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java 
index 19a16d7..e34548c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -45,6 +45,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.storage.AbstractStorageManager; +import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; @@ -777,6 +778,11 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { return new RangeShuffleFileWriteExec(ctx, sm, subOp, plan.getInSchema(), plan.getInSchema(), sortSpecs); case NONE_SHUFFLE: + // if there is no given NULL CHAR property in the table property and the query is neither CTAS or INSERT, + // we set DEFAULT NULL CHAR to the table property. 
+ if (!ctx.getQueryContext().containsKey(SessionVars.NULL_CHAR)) { + plan.getOptions().set(StorageConstants.CSVFILE_NULL, TajoConf.ConfVars.$CSVFILE_NULL.defaultVal); + } return new StoreTableExec(ctx, plan, subOp); default: http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java index 8d1d90f..61507c8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java @@ -33,7 +33,7 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType; */ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable { @Expose protected StoreType storageType = StoreType.CSV; - @Expose protected KeyValueSet options; + @Expose protected KeyValueSet options = new KeyValueSet(); protected PersistentStoreNode(int pid, NodeType nodeType) { super(pid, nodeType); http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index e90baff..92738e5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -24,10 +24,12 @@ import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.logical.CreateTableNode; import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.NodeType; @@ -35,6 +37,7 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode; import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -50,6 +53,14 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { protected final int [] keyIds; protected final String [] keyNames; + protected Appender appender; + + // for file punctuation + protected TableStats aggregatedStats; // for aggregating all stats of written files + protected long maxPerFileSize = Long.MAX_VALUE; // default max file size is 2^63 + protected int writtenFileNum = 0; // how many file are written so far? 
+ protected Path lastFileName; // latest written file name + public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) { super(context, plan.getInSchema(), plan.getOutSchema(), child); this.plan = plan; @@ -67,6 +78,12 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { meta = CatalogUtil.newTableMeta(plan.getStorageType()); } + PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta); + + if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) { + maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB; + } + // Find column index to name subpartition directory path keyNum = this.plan.getPartitionMethod().getExpressionSchema().size(); @@ -107,33 +124,45 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { if (!fs.exists(storeTablePath.getParent())) { fs.mkdirs(storeTablePath.getParent()); } + + aggregatedStats = new TableStats(); } protected Path getDataFile(String partition) { return StorageUtil.concatPath(storeTablePath.getParent(), partition, storeTablePath.getName()); } - protected Appender makeAppender(String partition) throws IOException { - Path dataFile = getDataFile(partition); - FileSystem fs = dataFile.getFileSystem(context.getConf()); + protected Appender getNextPartitionAppender(String partition) throws IOException { + lastFileName = getDataFile(partition); + FileSystem fs = lastFileName.getFileSystem(context.getConf()); - if (fs.exists(dataFile.getParent())) { - LOG.info("Path " + dataFile.getParent() + " already exists!"); + if (fs.exists(lastFileName.getParent())) { + LOG.info("Path " + lastFileName.getParent() + " already exists!"); } else { - fs.mkdirs(dataFile.getParent()); - LOG.info("Add subpartition path directory :" + dataFile.getParent()); + fs.mkdirs(lastFileName.getParent()); + LOG.info("Add subpartition path directory :" + lastFileName.getParent()); } - if 
(fs.exists(dataFile)) { - LOG.info("File " + dataFile + " already exists!"); - FileStatus status = fs.getFileStatus(dataFile); + if (fs.exists(lastFileName)) { + LOG.info("File " + lastFileName + " already exists!"); + FileStatus status = fs.getFileStatus(lastFileName); LOG.info("File size: " + status.getLen()); } - Appender appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile); - appender.enableStats(); - appender.init(); + openAppender(0); return appender; } + + public void openAppender(int suffixId) throws IOException { + Path actualFilePath = lastFileName; + if (suffixId > 0) { + actualFilePath = new Path(lastFileName + "_" + suffixId); + } + + appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, actualFilePath); + + appender.enableStats(); + appender.init(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index 44d1270..e27a43d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -18,8 +18,6 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; @@ -39,8 +37,6 @@ import java.util.Map; * This class is a physical operator to store at column partitioned table. 
*/ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { - private static Log LOG = LogFactory.getLog(HashBasedColPartitionStoreExec.class); - private final Map<String, Appender> appenderMap = new HashMap<String, Appender>(); public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) @@ -56,7 +52,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { Appender appender = appenderMap.get(partition); if (appender == null) { - appender = makeAppender(partition); + appender = getNextPartitionAppender(partition); appenderMap.put(partition, appender); } else { appender = appenderMap.get(partition); http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index fdd1839..0909c76 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -18,6 +18,13 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.engine.planner.logical.NodeType; +import org.apache.tajo.engine.planner.logical.PersistentStoreNode; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.StorageConstants; + import java.util.Stack; public class PhysicalPlanUtil { @@ -36,4 +43,60 @@ public class PhysicalPlanUtil { } } } + + /** + * Set nullChar to TableMeta according to file format + * + * @param meta TableMeta + * @param nullChar A character for NULL representation + */ + private static void 
setNullCharForTextSerializer(TableMeta meta, String nullChar) { + switch (meta.getStoreType()) { + case CSV: + meta.putOption(StorageConstants.CSVFILE_NULL, nullChar); + break; + case RCFILE: + meta.putOption(StorageConstants.RCFILE_NULL, nullChar); + break; + case SEQUENCEFILE: + meta.putOption(StorageConstants.SEQUENCEFILE_NULL, nullChar); + break; + default: // nothing to do + } + } + + /** + * Check if TableMeta contains NULL char property according to file format + * + * @param meta Table Meta + * @return True if TableMeta contains NULL char property according to file format + */ + public static boolean containsNullChar(TableMeta meta) { + switch (meta.getStoreType()) { + case CSV: + return meta.containsOption(StorageConstants.CSVFILE_NULL); + case RCFILE: + return meta.containsOption(StorageConstants.RCFILE_NULL); + case SEQUENCEFILE: + return meta.containsOption(StorageConstants.SEQUENCEFILE_NULL); + default: // nothing to do + return false; + } + } + + /** + * Set session variable null char TableMeta if necessary + * + * @param context QueryContext + * @param plan StoreTableNode + * @param meta TableMeta + */ + public static void setNullCharIfNecessary(QueryContext context, PersistentStoreNode plan, TableMeta meta) { + if (plan.getType() != NodeType.INSERT) { + // table property in TableMeta is the first priority, and session is the second priority + if (!containsNullChar(meta) && context.containsKey(SessionVars.NULL_CHAR)) { + setNullCharForTextSerializer(meta, context.get(SessionVars.NULL_CHAR)); + } + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 
9ce455f..6084f0e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -21,13 +21,9 @@ */ package org.apache.tajo.engine.planner.physical; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.statistics.StatisticsUtil; -import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.logical.StoreTableNode; -import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; @@ -40,14 +36,9 @@ import java.io.IOException; * ascending or descending order of partition columns. */ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { - private static Log LOG = LogFactory.getLog(SortBasedColPartitionStoreExec.class); - private Tuple currentKey; private Tuple prevKey; - private Appender appender; - private TableStats aggregated; - public SortBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException { super(context, plan, child); @@ -57,12 +48,6 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { super.init(); currentKey = new VTuple(keyNum); - aggregated = new TableStats(); - } - - private Appender getAppender(String partition) throws IOException { - this.appender = makeAppender(partition); - return appender; } private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) { @@ -93,19 +78,30 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { fillKeyTuple(tuple, currentKey); if (prevKey == null) { - appender = getAppender(getSubdirectory(currentKey)); + appender = getNextPartitionAppender(getSubdirectory(currentKey)); prevKey = new VTuple(currentKey); } else { - if 
(!prevKey.equals(currentKey) && !getSubdirectory(prevKey).equalsIgnoreCase(getSubdirectory(currentKey))) { + if (!prevKey.equals(currentKey)) { appender.close(); - StatisticsUtil.aggregateTableStat(aggregated, appender.getStats()); + StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); - appender = getAppender(getSubdirectory(currentKey)); + appender = getNextPartitionAppender(getSubdirectory(currentKey)); prevKey = new VTuple(currentKey); + + // reset all states for file rotating + writtenFileNum = 0; } } appender.addTuple(tuple); + + if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) { + appender.close(); + writtenFileNum++; + StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); + + openAppender(writtenFileNum); + } } return null; @@ -115,8 +111,10 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { public void close() throws IOException { if (appender != null) { appender.close(); - StatisticsUtil.aggregateTableStat(aggregated, appender.getStats()); - context.setResultStats(aggregated); + + // Collect statistics data + StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); + context.setResultStats(aggregatedStats); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index e73cc2f..fd0c04f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -18,15 +18,20 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.commons.logging.Log; +import 
org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.StatisticsUtil; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.PersistentStoreNode; import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -35,10 +40,19 @@ import java.io.IOException; * This is a physical executor to store a table part into a specified storage. */ public class StoreTableExec extends UnaryPhysicalExec { + private static final Log LOG = LogFactory.getLog(StoreTableExec.class); + private PersistentStoreNode plan; + private TableMeta meta; private Appender appender; private Tuple tuple; + // for file punctuation + private TableStats sumStats; // for aggregating all stats of written files + private long maxPerFileSize = Long.MAX_VALUE; // default max file size is 2^63 + private int writtenFileNum = 0; // how many file are written so far? 
+ private Path lastFileName; // latest written file name + public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), child); this.plan = plan; @@ -47,26 +61,47 @@ public class StoreTableExec extends UnaryPhysicalExec { public void init() throws IOException { super.init(); - TableMeta meta; if (plan.hasOptions()) { meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions()); } else { meta = CatalogUtil.newTableMeta(plan.getStorageType()); } + PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta); + + if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) { + maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB; + } + + openNewFile(writtenFileNum); + sumStats = new TableStats(); + } + + public void openNewFile(int suffixId) throws IOException { + String prevFile = null; + + lastFileName = context.getOutputPath(); + if (suffixId > 0) { + prevFile = lastFileName.toString(); + + lastFileName = new Path(lastFileName + "_" + suffixId); + } + if (plan instanceof InsertNode) { InsertNode createTableNode = (InsertNode) plan; appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, createTableNode.getTableSchema(), context.getOutputPath()); } else { - String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR); - meta.putOption(StorageConstants.CSVFILE_NULL, nullChar); - appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, - context.getOutputPath()); + appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, lastFileName); } appender.enableStats(); appender.init(); + + if (suffixId > 0) { + LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " + + "The remain output will 
be written into " + lastFileName.toString()); + } } /* (non-Javadoc) @@ -76,8 +111,16 @@ public class StoreTableExec extends UnaryPhysicalExec { public Tuple next() throws IOException { while((tuple = child.next()) != null) { appender.addTuple(tuple); - } + if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) { + appender.close(); + + writtenFileNum++; + StatisticsUtil.aggregateTableStat(sumStats, appender.getStats()); + openNewFile(writtenFileNum); + } + } + return null; } @@ -93,7 +136,12 @@ public class StoreTableExec extends UnaryPhysicalExec { appender.flush(); appender.close(); // Collect statistics data - context.setResultStats(appender.getStats()); + if (sumStats == null) { + context.setResultStats(appender.getStats()); + } else { + StatisticsUtil.aggregateTableStat(sumStats, appender.getStats()); + context.setResultStats(sumStats); + } if (context.getTaskId() != null) { context.addShuffleFileOutput(0, context.getTaskId().toString()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java index cdf552d..1f21e2a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java @@ -28,7 +28,7 @@ import java.util.Map; import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto; -public class Session implements SessionConstants, ProtoObject<SessionProto> { +public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable { private final String sessionId; private final String userName; private String currentDatabase; @@ -133,4 +133,10 @@ public class Session implements SessionConstants, ProtoObject<SessionProto> { public String 
toString() { return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime(); } + + public Session clone() throws CloneNotSupportedException { + Session newSession = (Session) super.clone(); + newSession.sessionVariables.putAll(getAllVariables()); + return newSession; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index e4d8b0b..191db85 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -29,6 +29,7 @@ import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -50,6 +51,7 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; @@ -88,6 +90,7 @@ public class TestPhysicalPlanner { private static TableDesc employee = null; private static TableDesc score = null; + private static TableDesc largeScore = null; private static MasterPlan masterPlan; @@ -170,6 +173,45 @@ public class TestPhysicalPlanner { defaultContext = 
LocalTajoTestingUtility.createDummyContext(conf); masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); + + createLargeScoreTable(); + } + + public static void createLargeScoreTable() throws IOException { + // Preparing a large table + Path scoreLargePath = new Path(testDir, "score_large"); + CommonTestingUtil.cleanupTestDir(scoreLargePath.toString()); + + Schema scoreSchmea = score.getSchema(); + TableMeta scoreLargeMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet()); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreLargeMeta, scoreSchmea, + scoreLargePath); + appender.enableStats(); + appender.init(); + largeScore = new TableDesc( + CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score_large"), scoreSchmea, scoreLargeMeta, + scoreLargePath); + + Tuple tuple = new VTuple(scoreSchmea.size()); + int m = 0; + for (int i = 1; i <= 40000; i++) { + for (int k = 3; k < 5; k++) { // |{3,4}| = 2 + for (int j = 1; j <= 3; j++) { // |{1,2,3}| = 3 + tuple.put( + new Datum[] { + DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5) + DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2) + DatumFactory.createInt4(j), // 1 ~ 3 + m % 3 == 1 ? 
DatumFactory.createText("one") : NullDatum.get()}); appender.addTuple(tuple); m++; + } + } + } + appender.flush(); + appender.close(); + largeScore.setStats(appender.getStats()); + catalog.createTable(largeScore); } @AfterClass @@ -375,7 +417,10 @@ public class TestPhysicalPlanner { private String[] CreateTableAsStmts = { "create table grouped1 as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 0 "create table grouped2 using rcfile as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 1 - "create table grouped3 partition by column (dept text, class text) as select sum(score), max(score), min(score), deptName, class from score group by deptName, class", // 2 + "create table grouped3 partition by column (dept text, class text) as select sum(score), max(score), min(score), deptName, class from score group by deptName, class", // 2 + "create table score_large_output as select * from score_large", // 3 + "CREATE TABLE score_part (deptname text, score int4, nullable text) PARTITION BY COLUMN (class text) " + + "AS SELECT deptname, score, nullable, class from score_large" // 4 }; @Test @@ -421,6 +466,62 @@ public class TestPhysicalPlanner { } @Test + public final void testStorePlanWithMaxOutputFileSize() throws IOException, PlanningException, + CloneNotSupportedException { + + TableStats stats = largeScore.getStats(); + assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB); + + FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), + largeScore.getPath(), Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithMaxOutputFileSize"); + + QueryContext queryContext = new QueryContext(conf, session); + queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1); + + TaskAttemptContext ctx = new TaskAttemptContext( + queryContext, + 
LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + ctx.setOutputPath(new Path(workDir, "maxOutput")); + + Expr context = analyzer.parse(CreateTableAsStmts[3]); + + LogicalPlan plan = planner.createPlan(queryContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + // executing StoreTableExec + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + // checking the number of punctuated files (NOTE(review): the expected count is derived from the RAW input size, while the output files are read back as CSV below; confirm the truncating division is the intended bound) + int expectedFileNum = (int) (stats.getNumBytes() / (float) StorageUnit.MB); + FileSystem fs = ctx.getOutputPath().getFileSystem(conf); + FileStatus [] statuses = fs.listStatus(ctx.getOutputPath().getParent()); + assertEquals(expectedFileNum, statuses.length); + + // checking the contents of exactly the files counted above + long totalNum = 0; + for (FileStatus status : statuses) { + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner( + CatalogUtil.newTableMeta(StoreType.CSV), + rootNode.getOutSchema(), + status.getPath()); + + scanner.init(); + while (scanner.next() != null) { + totalNum++; + } + scanner.close(); + } + assertEquals((long) ctx.getResultStats().getNumRows(), totalNum); + } + + @Test public final void testStorePlanWithRCFile() throws IOException, PlanningException { FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); @@ -596,6 +697,71 @@ public class TestPhysicalPlanner { } @Test + public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, PlanningException { + + // Preparing working dir and input fragments + FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), + largeScore.getPath(), Integer.MAX_VALUE); + QueryUnitAttemptId id = 
LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); + Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlanWithMaxFileSize"); + + // Setting session variables + QueryContext queryContext = new QueryContext(conf, session); + queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1); + + // Preparing task context + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, id, new FileFragment[] { frags[0] }, workDir); + ctx.setOutputPath(new Path(workDir, "part-01-000000")); + // SortBasedColumnPartitionStoreExec will be chosen by default. + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(CreateTableAsStmts[4]); + LogicalPlan plan = planner.createPlan(queryContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + // Executing CREATE TABLE PARTITION BY + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + FileSystem fs = sm.getFileSystem(); + FileStatus [] list = fs.listStatus(workDir); + // checking the number of partitions + assertEquals(2, list.length); + + List<FileFragment> fragments = Lists.newArrayList(); + // each partition directory must hold more than 1 MB, split into several size-limited files + for (FileStatus status : list) { + assertTrue(status.isDirectory()); + + long fileVolumeSum = 0; + FileStatus [] fileStatuses = fs.listStatus(status.getPath()); + for (FileStatus fileStatus : fileStatuses) { + fileVolumeSum += fileStatus.getLen(); + fragments.add(new FileFragment("partition", fileStatus.getPath(), 0, fileStatus.getLen())); + } + assertTrue("checking the meaningfulness of test", fileVolumeSum > StorageUnit.MB && fileStatuses.length > 1); + + long expectedFileNum = (long) Math.ceil(fileVolumeSum / (double) StorageUnit.MB); + assertEquals(expectedFileNum, fileStatuses.length); + } + TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV); + Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, 
TUtil.newList(fragments)); + scanner.init(); + + long rowNum = 0; + while (scanner.next() != null) { + rowNum++; + } + scanner.close(); + + // checking the number of all written rows + assertEquals((long) largeScore.getStats().getNumRows(), rowNum); + } + + @Test public final void testPartitionedStorePlanWithEmptyGroupingSet() throws IOException, PlanningException { FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(), http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java index 5b42cbd..c5e96ac 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java @@ -31,6 +31,8 @@ public interface Appender extends Closeable { void flush() throws IOException; + /** Returns an estimate of the size of the output produced so far by this appender. */ long getEstimatedOutputSize() throws IOException; + void close() throws IOException; void enableStats(); http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java index 064841f..04278e9 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -57,5 +57,9 @@ public abstract class FileAppender implements Appender { this.enabledStats = true; } + public long getEstimatedOutputSize() throws IOException { + return getOffset(); + } + public abstract long getOffset() throws IOException; } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java index 934fd94..40cad32 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java @@ -145,6 +145,11 @@ public class HashShuffleAppender implements Appender { } @Override + public long getEstimatedOutputSize() throws IOException { + return (long) pageSize * pages.size(); // widen before multiplying so a large page count cannot overflow int arithmetic + } + + @Override public void close() throws IOException { synchronized(appender) { if (closed.get()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index 10b9331..ff9e43c 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -129,6 +129,10 @@ public class ParquetAppender extends FileAppender { writer.close(); } + public long getEstimatedOutputSize() throws IOException { + return writer.getEstimatedWrittenSize(); + } + /** * If table statistics is enabled, retrieve the table statistics. 
* http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java index 8ce4b1c..7410d2b 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java @@ -139,6 +139,10 @@ class InternalParquetRecordWriter<T> { } } + public long getEstimatedWrittenSize() throws IOException { + return w.getPos() + store.memSize(); /* current position in the output file plus what is still held in the in-memory store and not yet flushed */ + } + private void flushStore() throws IOException { LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java index 743d168..0447a47 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java @@ -210,6 +210,10 @@ public class ParquetWriter<T> implements Closeable { } } + public long getEstimatedWrittenSize() throws IOException { + return this.writer.getEstimatedWrittenSize(); /* delegates to the internal record writer's estimate */ + } + @Override public void close() throws IOException { try { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a1711d16/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index 61f4682..cae0357 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -21,7 +21,6 @@ package org.apache.tajo.storage; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; @@ -48,7 +47,6 @@ import java.util.Arrays; import java.util.Collection; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestCompressionStorages {
