Repository: tajo
Updated Branches:
  refs/heads/master ea5ce54d8 -> f3d63b46b


http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index a184a9a..e4d8b0b 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,15 +25,13 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
@@ -64,6 +62,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -534,9 +533,8 @@ public class TestPhysicalPlanner {
     FileFragment[] frags = StorageManager.splitNG(conf, "default.score", 
score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     QueryUnitAttemptId id = 
LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
-    Path workDir = 
CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        id, new FileFragment[] { frags[0] }, workDir);
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), 
id, new FileFragment[] { frags[0] },
+        
CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan"));
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(defaultContext, context);
@@ -553,27 +551,35 @@ public class TestPhysicalPlanner {
     TableMeta outputMeta = 
CatalogUtil.newTableMeta(dataChannel.getStoreType());
 
     FileSystem fs = sm.getFileSystem();
+    QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId();
+    ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId();
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
     exec.close();
+    ctx.getHashShuffleAppenderManager().close(ebId);
 
-    Path path = new Path(workDir, "output");
-    FileStatus [] list = fs.listStatus(path);
-    assertEquals(numPartitions, list.length);
+    String executionBlockBaseDir = queryId.toString() + "/output" + "/" + 
ebId.getId() + "/hash-shuffle";
+    Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) 
+ "/" + executionBlockBaseDir);
+    FileStatus [] list = fs.listStatus(queryLocalTmpDir);
 
-    FileFragment[] fragments = new FileFragment[list.length];
-    int i = 0;
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
     for (FileStatus status : list) {
-      fragments[i++] = new FileFragment("partition", status.getPath(), 0, 
status.getLen());
+      assertTrue(status.isDirectory());
+      FileStatus [] files = fs.listStatus(status.getPath());
+      for (FileStatus eachFile: files) {
+        fragments.add(new FileFragment("partition", eachFile.getPath(), 0, 
eachFile.getLen()));
+      }
     }
+
+    assertEquals(numPartitions, fragments.size());
     Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), 
outputMeta, TUtil.newList(fragments));
     scanner.init();
 
     Tuple tuple;
-    i = 0;
+    int i = 0;
     while ((tuple = scanner.next()) != null) {
       assertEquals(6, tuple.get(2).asInt4()); // sum
       assertEquals(3, tuple.get(3).asInt4()); // max
@@ -585,6 +591,8 @@ public class TestPhysicalPlanner {
 
     // Examine the statistics information
     assertEquals(10, ctx.getResultStats().getNumRows().longValue());
+
+    fs.delete(queryLocalTmpDir, true);
   }
 
   @Test
@@ -611,27 +619,36 @@ public class TestPhysicalPlanner {
 
     TableMeta outputMeta = 
CatalogUtil.newTableMeta(dataChannel.getStoreType());
 
+    FileSystem fs = sm.getFileSystem();
+    QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId();
+    ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId();
+
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
     exec.close();
+    ctx.getHashShuffleAppenderManager().close(ebId);
 
-    Path path = new Path(workDir, "output");
-    FileSystem fs = sm.getFileSystem();
-
-    FileStatus [] list = fs.listStatus(path);
-    assertEquals(numPartitions, list.length);
+    String executionBlockBaseDir = queryId.toString() + "/output" + "/" + 
ebId.getId() + "/hash-shuffle";
+    Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) 
+ "/" + executionBlockBaseDir);
+    FileStatus [] list = fs.listStatus(queryLocalTmpDir);
 
-    FileFragment[] fragments = new FileFragment[list.length];
-    int i = 0;
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
     for (FileStatus status : list) {
-      fragments[i++] = new FileFragment("partition", status.getPath(), 0, 
status.getLen());
+      assertTrue(status.isDirectory());
+      FileStatus [] files = fs.listStatus(status.getPath());
+      for (FileStatus eachFile: files) {
+        fragments.add(new FileFragment("partition", eachFile.getPath(), 0, 
eachFile.getLen()));
+      }
     }
+
+    assertEquals(numPartitions, fragments.size());
+
     Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), 
outputMeta, TUtil.newList(fragments));
     scanner.init();
     Tuple tuple;
-    i = 0;
+    int i = 0;
     while ((tuple = scanner.next()) != null) {
       assertEquals(60, tuple.get(0).asInt4()); // sum
       assertEquals(3, tuple.get(1).asInt4()); // max
@@ -643,6 +660,7 @@ public class TestPhysicalPlanner {
 
     // Examine the statistics information
     assertEquals(1, ctx.getResultStats().getNumRows().longValue());
+    fs.delete(queryLocalTmpDir, true);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index ffb1915..fccec26 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -687,12 +687,8 @@ public class TestGroupByQuery extends QueryTestCaseBase {
       assertTrue(!subQueries.isEmpty());
       for (SubQuery subQuery: subQueries) {
         if (subQuery.getId().toStringNoPrefix().endsWith("_000001")) {
-          QueryUnit[] queryUnits = subQuery.getQueryUnits();
-          assertNotNull(queryUnits);
-          for (QueryUnit eachQueryUnit: queryUnits) {
-            for (ShuffleFileOutput output: 
eachQueryUnit.getShuffleFileOutputs()) {
-              partitionIds.add(output.getPartId());
-            }
+          for (QueryUnit.IntermediateEntry eachInterm: 
subQuery.getHashShuffleIntermediateEntries()) {
+            partitionIds.add(eachInterm.getPartId());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index d80fdb5..5bf2944 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -31,25 +31,36 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.master.querymaster.Query;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static 
org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE;
 import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 public class TestTablePartitions extends QueryTestCaseBase {
 
@@ -788,6 +799,67 @@ public class TestTablePartitions extends QueryTestCaseBase 
{
   }
 
   @Test
+  public void testScatteredHashShuffle() throws Exception {
+    // Shrink the table-partition volume and the hash-appender page size so that one large
+    // partition has to be written by (and scattered across) several tasks.
+    testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, "2");
+    testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname, "1");
+    try {
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("col1", TajoDataTypes.Type.TEXT);
+      schema.addColumn("col2", TajoDataTypes.Type.TEXT);
+
+      String col2Data = "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" +
+          "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" +
+          "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2";
+
+      // Every row uses the same partition key, so all (> 4MB of) data lands in a single
+      // partition. That is what forces the scattered hash shuffle.
+      String col1Value = "1";
+      List<String> data = new ArrayList<String>();
+      int totalBytes = 0;
+      int index = 0;
+      do {
+        String str = col1Value + "|col2-" + index + "-" + col2Data;
+        data.add(str);
+        totalBytes += str.getBytes().length;
+        index++;
+      } while (totalBytes <= 4 * 1024 * 1024);
+
+      TajoTestingCluster.createTable("testscatteredhashshuffle", schema, tableOptions, data.toArray(new String[]{}), 3);
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable("default", "testscatteredhashshuffle"));
+
+      executeString("create table test_partition (col2 text) partition by column (col1 text)").close();
+      executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close();
+
+      // Every source row must be readable back from the partitioned table.
+      ResultSet res = executeString("select col1 from test_partition");
+      int numRows = 0;
+      try {
+        while (res.next()) {
+          numRows++;
+        }
+      } finally {
+        res.close();
+      }
+      assertEquals(data.size(), numRows);
+    } finally {
+      // Restore the defaults so later tests in this class are unaffected.
+      testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname,
+          TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal);
+      testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname,
+          TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.defaultVal);
+      executeString("DROP TABLE test_partition PURGE").close();
+      executeString("DROP TABLE testScatteredHashShuffle PURGE").close();
+    }
+  }
+
+  @Test
   public final void TestSpecialCharPartitionKeys1() throws Exception {
     // See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due 
to special characters.
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java 
b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 29aeccd..f969a08 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -26,9 +26,9 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TestTajoIds;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.master.querymaster.Repartitioner;
 import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.junit.Test;
@@ -48,45 +48,62 @@ public class TestRepartitioner {
     String hostName = "tajo1";
     int port = 1234;
     ExecutionBlockId sid = new ExecutionBlockId(q1, 2);
-    int partitionId = 2;
+    int numPartition = 10;
 
-    List<QueryUnit.IntermediateEntry> intermediateEntries = TUtil.newList();
+    Map<Integer, List<IntermediateEntry>> intermediateEntries = new 
HashMap<Integer, List<IntermediateEntry>>();
+    for (int i = 0; i < numPartition; i++) {
+      intermediateEntries.put(i, new ArrayList<IntermediateEntry>());
+    }
     for (int i = 0; i < 1000; i++) {
-      intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, 
partitionId, new QueryUnit.PullHost(hostName, port)));
+      int partitionId = i % numPartition;
+      IntermediateEntry entry = new IntermediateEntry(i, 0, partitionId, new 
QueryUnit.PullHost(hostName, port));
+      entry.setEbId(sid);
+      entry.setVolume(10);
+      intermediateEntries.get(partitionId).add(entry);
     }
 
-    FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), 
ShuffleType.HASH_SHUFFLE,
-        sid, partitionId, intermediateEntries);
+    Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries =
+        new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
+
+    for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: 
intermediateEntries.entrySet()) {
+      FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), 
TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE,
+          sid, eachEntry.getKey(), eachEntry.getValue());
+
+      fetch.setName(sid.toString());
+
+      TajoWorkerProtocol.FetchProto proto = fetch.getProto();
+      fetch = new FetchImpl(proto);
+      assertEquals(proto, fetch.getProto());
 
-    fetch.setName(sid.toString());
+      Map<ExecutionBlockId, List<IntermediateEntry>> ebEntries = new 
HashMap<ExecutionBlockId, List<IntermediateEntry>>();
+      ebEntries.put(sid, eachEntry.getValue());
 
-    TajoWorkerProtocol.FetchProto proto = fetch.getProto();
-    fetch = new FetchImpl(proto);
-    assertEquals(proto, fetch.getProto());
-    List<URI> uris = fetch.getURIs();
+      hashEntries.put(eachEntry.getKey(), ebEntries);
 
-    List<String> taList = TUtil.newList();
-    for (URI uri : uris) {
+      List<URI> uris = fetch.getURIs();
+      assertEquals(1, uris.size());   //In Hash Suffle, Fetcher return only 
one URI per partition.
+
+      URI uri = uris.get(0);
       final Map<String, List<String>> params =
           new QueryStringDecoder(uri).getParameters();
-      taList.addAll(splitMaps(params.get("ta")));
-    }
 
-    int checkTaskId = 0;
-    for (String ta : taList) {
-      assertEquals(checkTaskId++, Integer.parseInt(ta.split("_")[0]));
+      assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
+      assertEquals("h", params.get("type").get(0));
+      assertEquals("" + sid.getId(), params.get("sid").get(0));
     }
-  }
 
-  private List<String> splitMaps(List<String> mapq) {
-    if (null == mapq) {
-      return null;
-    }
-    final List<String> ret = new ArrayList<String>();
-    for (String s : mapq) {
-      Collections.addAll(ret, s.split(","));
+    Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> 
mergedHashEntries =
+        Repartitioner.mergeIntermediateByPullHost(hashEntries);
+
+    assertEquals(numPartition, mergedHashEntries.size());
+    for (int i = 0; i < numPartition; i++) {
+      // Check every partition (not just partition 0): each one must hold exactly one merged
+      // entry for the single pull host.
+      Map<ExecutionBlockId, List<IntermediateEntry>> eachEntry = mergedHashEntries.get(i);
+      assertEquals(1, eachEntry.size());
+      List<IntermediateEntry> interEntry = eachEntry.get(sid);
+      assertEquals(1, interEntry.size());
+
+      // 1000 entries / numPartition partitions = 100 entries per partition, volume 10 each.
+      assertEquals(1000, interEntry.get(0).getVolume());
     }
-    return ret;
   }
 
   @Test
@@ -148,6 +165,244 @@ public class TestRepartitioner {
     }
   }
 
+  @Test
+  public void testMergeIntermediates() {
+    // 20 intermediates of 35MB each (pages of 10MB, 10MB, 10MB, 5MB) must be merged into fetch
+    // groups that stay below the 128MB split volume, and all but the last reach at least 100MB.
+    List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
+
+    int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024};   //35 MB
+    long expectedTotalLength = 0;
+    for (int i = 0; i < 20; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      long offset = 0;
+      for (int pageLength : pageLengths) {
+        pages.add(new Pair<Long, Integer>(offset, pageLength));
+        offset += pageLength;
+        expectedTotalLength += pageLength;
+      }
+      IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new QueryUnit.PullHost("" + i, i));
+      interm.setPages(pages);
+      interm.setVolume(offset);
+      intermediateEntries.add(interm);
+    }
+
+    long splitVolume = 128 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+        splitVolume, 10 * 1024 * 1024);
+    assertEquals(6, fetches.size());
+
+    int totalInterms = 0;
+    int index = 0;
+    int numZeroPosFetcher = 0;
+    long totalLength = 0;
+    for (List<FetchImpl> eachFetchList: fetches) {
+      totalInterms += eachFetchList.size();
+      long eachFetchVolume = 0;
+      for (FetchImpl eachFetch: eachFetchList) {
+        eachFetchVolume += eachFetch.getLength();
+        if (eachFetch.getOffset() == 0) {
+          numZeroPosFetcher++;
+        }
+        totalLength += eachFetch.getLength();
+      }
+      assertTrue(eachFetchVolume + " should be smaller than splitVolume", eachFetchVolume < splitVolume);
+      if (index < fetches.size() - 1) {
+        assertTrue(eachFetchVolume + " should be greater than 100MB", eachFetchVolume >= 100 * 1024 * 1024);
+      }
+      index++;
+    }
+    assertEquals(23, totalInterms);
+    // Every one of the 20 intermediates is fetched from its beginning exactly once.
+    assertEquals(20, numZeroPosFetcher);
+    assertEquals(expectedTotalLength, totalLength);
+  }
+
+  @Test
+  public void testSplitIntermediates() {
+    // 20 intermediates of 195MB each (19 pages of 10MB + one of 5MB) must each be split into
+    // fetches that stay below the 128MB split volume.
+    List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
+
+    int[] pageLengths = new int[20];  //195MB
+    for (int i = 0; i < pageLengths.length; i++) {
+      pageLengths[i] = 10 * 1024 * 1024;
+    }
+    pageLengths[pageLengths.length - 1] = 5 * 1024 * 1024;
+
+    long expectedTotalLength = 0;
+    for (int i = 0; i < 20; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      long offset = 0;
+      for (int pageLength : pageLengths) {
+        pages.add(new Pair<Long, Integer>(offset, pageLength));
+        offset += pageLength;
+        expectedTotalLength += pageLength;
+      }
+      IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new QueryUnit.PullHost("" + i, i));
+      interm.setPages(pages);
+      interm.setVolume(offset);
+      intermediateEntries.add(interm);
+    }
+
+    long splitVolume = 128 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+        splitVolume, 10 * 1024 * 1024);
+    assertEquals(32, fetches.size());
+
+    int index = 0;
+    int numZeroPosFetcher = 0;
+    long totalLength = 0;
+    Set<String> uniqPullHost = new HashSet<String>();
+
+    for (List<FetchImpl> eachFetchList: fetches) {
+      long length = 0;
+      for (FetchImpl eachFetch: eachFetchList) {
+        if (eachFetch.getOffset() == 0) {
+          numZeroPosFetcher++;
+        }
+        totalLength += eachFetch.getLength();
+        length += eachFetch.getLength();
+        uniqPullHost.add(eachFetch.getPullHost().toString());
+      }
+      assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
+      if (index < fetches.size() - 1) {
+        assertTrue(length + " should be greater than 100MB (fetches=" + fetches.size() + ", index=" + index + ")",
+            length >= 100 * 1024 * 1024);
+      }
+      index++;
+    }
+    assertEquals(20, numZeroPosFetcher);
+    assertEquals(20, uniqPullHost.size());
+    assertEquals(expectedTotalLength, totalLength);
+  }
+
+  @Test
+  public void testSplitIntermediates2() {
+    // Real-world page layout (offset, length) of one ~810MB intermediate, duplicated on two hosts.
+    long[][] pageDatas = {
+        {0, 10538717},
+        {10538717, 10515884},
+        {21054601, 10514343},
+        {31568944, 10493988},
+        {42062932, 10560639},
+        {52623571, 10548486},
+        {63172057, 10537811},
+        {73709868, 10571060},
+        {84280928, 10515062},
+        {94795990, 10502964},
+        {105298954, 10514011},
+        {115812965, 10532154},
+        {126345119, 10534133},
+        {136879252, 10549749},
+        {147429001, 10566547},
+        {157995548, 10543700},
+        {168539248, 10490324},
+        {179029572, 10500720},
+        {189530292, 10505425},
+        {200035717, 10548418},
+        {210584135, 10562887},
+        {221147022, 10554967},
+        {231701989, 10507297},
+        {242209286, 10515612},
+        {252724898, 10491274},
+        {263216172, 10512956},
+        {273729128, 10490736},
+        {284219864, 10501878},
+        {294721742, 10564568},
+        {305286310, 10488896},
+        {315775206, 10516308},
+        {326291514, 10517965},
+        {336809479, 10487038},
+        {347296517, 10603472},
+        {357899989, 10507330},
+        {368407319, 10549429},
+        {378956748, 10533443},
+        {389490191, 10530852},
+        {400021043, 11036431},
+        {411057474, 10541007},
+        {421598481, 10600477},
+        {432198958, 10519805},
+        {442718763, 10500769},
+        {453219532, 10507192},
+        {463726724, 10540424},
+        {474267148, 10509129},
+        {484776277, 10527100},
+        {495303377, 10720789},
+        {506024166, 10568542},
+        {516592708, 11046886},
+        {527639594, 10580358},
+        {538219952, 10508940},
+        {548728892, 10523968},
+        {559252860, 10580626},
+        {569833486, 10539361},
+        {580372847, 10496662},
+        {590869509, 10505280},
+        {601374789, 10564655},
+        {611939444, 10505842},
+        {622445286, 10523889},
+        {632969175, 10553186},
+        {643522361, 10535866},
+        {654058227, 10501796},
+        {664560023, 10530358},
+        {675090381, 10585340},
+        {685675721, 10602017},
+        {696277738, 10546614},
+        {706824352, 10511511},
+        {717335863, 11019221},
+        {728355084, 10558143},
+        {738913227, 10516245},
+        {749429472, 10502613},
+        {759932085, 10522145},
+        {770454230, 10489373},
+        {780943603, 10520973},
+        {791464576, 11021218},
+        {802485794, 10496362},
+        {812982156, 10502354},
+        {823484510, 10515932},
+        {834000442, 10591044},
+        {844591486, 5523957}
+    };
+
+    List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>();
+    for (int i = 0; i < 2; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      for (long[] pageData : pageDatas) {
+        pages.add(new Pair<Long, Integer>(pageData[0], (int) pageData[1]));
+      }
+      IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new QueryUnit.PullHost("host" + i , 9000));
+      entry.setPages(pages);
+
+      entries.add(entry);
+    }
+
+    long splitVolume = 256 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume,
+        10 * 1024 * 1024);
+
+    // Expected (offset, length) of every fetch, in order. The tail of host0 and the head of
+    // host1 are merged into the fourth fetch list.
+    long[][] expected = {
+        {0,263216172},
+        {263216172,264423422},
+        {527639594,263824982},
+        {791464576,58650867},
+        {0,200035717},
+        {200035717,263691007},
+        {463726724,264628360},
+        {728355084,121760359},
+    };
+    int listIndex = 0;
+    int index = 0;
+    for (List<FetchImpl> eachFetchList: fetches) {
+      if (listIndex == 3) {
+        assertEquals(2, eachFetchList.size());
+      } else {
+        assertEquals(1, eachFetchList.size());
+      }
+      for (FetchImpl eachFetch: eachFetchList) {
+        assertEquals(expected[index][0], eachFetch.getOffset());
+        assertEquals(expected[index][1], eachFetch.getLength());
+        index++;
+      }
+      listIndex++;
+    }
+    // Guard against a short result silently passing the loop above.
+    assertEquals(expected.length, index);
+  }
+
   private static void assertFetchImpl(FetchImpl [] expected, Map<String, 
List<FetchImpl>>[] result) {
     Set<FetchImpl> expectedURLs = Sets.newHashSet();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java
 
b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java
new file mode 100644
index 0000000..114b232
--- /dev/null
+++ 
b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.tajo.util.Pair;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestIntermediateEntry {
+  /**
+   * Splits three pages (1441275, 1447446 and 1442507 bytes) with a 3MB split size: the first
+   * two pages fit together in one split and the third page forms a second split.
+   */
+  @Test
+  public void testPage() {
+    QueryUnit.IntermediateEntry interm = new QueryUnit.IntermediateEntry(-1, -1, 1, null);
+
+    List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+    pages.add(new Pair<Long, Integer>(0L, 1441275));
+    pages.add(new Pair<Long, Integer>(1441275L, 1447446));
+    pages.add(new Pair<Long, Integer>(2888721L, 1442507));
+
+    interm.setPages(pages);
+
+    long splitBytes = 3 * 1024 * 1024;
+
+    List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes);
+
+    long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} };
+    assertEquals(expected.length, splits.size());
+    for (int i = 0; i < expected.length; i++) {
+      Pair<Long, Long> eachSplit = splits.get(i);
+      assertEquals(expected[i][0], eachSplit.getFirst().longValue());
+      assertEquals(expected[i][1], eachSplit.getSecond().longValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
 
b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
index b4be00b..d64e4c7 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
@@ -57,7 +57,7 @@ public class TestQueryUnitStatusUpdate extends 
QueryTestCaseBase {
       // tpch/lineitem.tbl
       long[] expectedNumRows = new long[]{5, 2, 2, 2};
       long[] expectedNumBytes = new long[]{604, 18, 18, 8};
-      long[] expectedReadBytes = new long[]{604, 0, 18, 0};
+      long[] expectedReadBytes = new long[]{604, 604, 18, 0};
 
       assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
@@ -75,7 +75,7 @@ public class TestQueryUnitStatusUpdate extends 
QueryTestCaseBase {
       // tpch/lineitem.tbl
       long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2};
       long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194};
-      long[] expectedReadBytes = new long[]{604, 0, 162, 0, 138, 0};
+      long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0};
 
       assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
@@ -106,7 +106,7 @@ public class TestQueryUnitStatusUpdate extends 
QueryTestCaseBase {
       // in/out * subquery(4)
       long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2};
       long[] expectedNumBytes = new long[]{8, 34, 20, 75, 109, 34, 34, 18};
-      long[] expectedReadBytes = new long[]{8, 0, 20, 0, 109, 0, 34, 0};
+      long[] expectedReadBytes = new long[]{8, 8, 20, 20, 109, 0, 34, 0};
 
       assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index c13842b..eb63c27 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -23,8 +23,10 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.junit.After;
@@ -73,11 +75,13 @@ public class TestFetcher {
     Random rnd = new Random();
     QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
     String sid = "1";
-    String ta = "1_0";
     String partId = "1";
 
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + 
"/" +ta + "/output/" + partId;
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, 
sid, partId, "h", ta);
+    int partParentId = 
HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
+       queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId 
+ "/" + partId;
+
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, 
partId, "h");
 
     Path inputPath = new Path(dataPath);
     FSDataOutputStream stream =  LocalFileSystem.get(conf).create(inputPath, 
true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
index a2dda76..c6be73b 100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -172,7 +172,6 @@ public abstract class AbstractStorageManager {
     return appender;
   }
 
-
   public TableMeta getTableMeta(Path tablePath) throws IOException {
     TableMeta meta;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
new file mode 100644
index 0000000..934fd94
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Appender for one hash-shuffle partition of one execution block.
+ *
+ * <p>A single wrapped {@link FileAppender} receives the rows of every caller of
+ * {@link #addTuples(QueryUnitAttemptId, List)}. All access to it, and to the page / per-task
+ * index bookkeeping kept here, is serialized by synchronizing on that wrapped appender.
+ * {@link #addTuple(Tuple)} is not supported.
+ *
+ * <p>While writing, this class records
+ * <ul>
+ *   <li>pages: (start offset, length) byte ranges of the output file. After each batch, the
+ *       current page is closed if the bytes written since its start exceed {@code pageSize}, so
+ *       a page may overshoot {@code pageSize} by up to one batch. The last open page is closed
+ *       by {@link #close()};</li>
+ *   <li>per-task tuple indexes: for each {@code addTuples} call, the start offset of the page
+ *       it wrote into and the [start, end) row numbers inside that page. A task's entries are
+ *       removed by {@link #taskFinished(QueryUnitAttemptId)}.</li>
+ * </ul>
+ */
+public class HashShuffleAppender implements Appender {
+  private static final Log LOG = LogFactory.getLog(HashShuffleAppender.class);
+
+  /** Underlying file writer; also the monitor guarding all mutable state of this class. */
+  private final FileAppender appender;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final int partId;
+
+  /** Stats of the underlying appender, captured in {@link #close()}. */
+  private TableStats tableStats;
+
+  /** taskId -> list of (page start offset, (row start inclusive, row end exclusive)). */
+  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+
+  /** Completed pages as (page start offset, page length in bytes). */
+  private final List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+
+  /** The page being filled: (start offset, length); the length is set when it is closed. */
+  private Pair<Long, Integer> currentPage;
+
+  /**
+   * Page size threshold. It is compared with byte offsets returned by the wrapped appender's
+   * {@code getOffset()}, so it is a number of bytes (not MB).
+   */
+  private final int pageSize;
+
+  /** Number of rows written into {@link #currentPage} so far. */
+  private int rowNumInPage;
+
+  /** Total number of rows written through this appender. */
+  private int totalRows;
+
+  /** File length recorded at close time; returned by {@link #getOffset()} afterwards. */
+  private long offset;
+
+  private final ExecutionBlockId ebId;
+
+  /**
+   * @param ebId execution block whose shuffle output is written (used in log messages)
+   * @param partId partition id (used in log messages)
+   * @param pageSize page size threshold in bytes
+   * @param appender the file appender that actually writes the rows. This class never calls
+   *                 its {@code init()}, so the caller must have initialized it.
+   */
+  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
+    this.ebId = ebId;
+    this.partId = partId;
+    this.appender = appender;
+    this.pageSize = pageSize;
+  }
+
+  /** Resets the page and per-task index bookkeeping. Must be called before {@link #addTuples}. */
+  @Override
+  public void init() throws IOException {
+    currentPage = new Pair<Long, Integer>(0L, 0);
+    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
+    rowNumInPage = 0;
+  }
+
+  /**
+   * Writes a batch of tuples produced by one task and records where they landed.
+   * The whole batch goes into the current page. If that page then holds more than
+   * {@code pageSize} bytes, it is closed and a new page starts at the current file offset.
+   *
+   * @param taskId the task that produced the tuples
+   * @param tuples the tuples to write
+   * @return number of bytes written, or 0 if this appender is already closed
+   * @throws IOException if the underlying appender fails
+   */
+  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
+    synchronized (appender) {
+      if (closed.get()) {
+        // Writes arriving after close are dropped.
+        return 0;
+      }
+      long currentPos = appender.getOffset();
+
+      for (Tuple eachTuple : tuples) {
+        appender.addTuple(eachTuple);
+      }
+      long posAfterWritten = appender.getOffset();
+
+      int writtenBytes = (int) (posAfterWritten - currentPos);
+
+      int nextRowNum = rowNumInPage + tuples.size();
+      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+      if (taskIndexes == null) {
+        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+        taskTupleIndexes.put(taskId, taskIndexes);
+      }
+      taskIndexes.add(new Pair<Long, Pair<Integer, Integer>>(
+          currentPage.getFirst(), new Pair<Integer, Integer>(rowNumInPage, nextRowNum)));
+      rowNumInPage = nextRowNum;
+
+      if (posAfterWritten - currentPage.getFirst() > pageSize) {
+        nextPage(posAfterWritten);
+        rowNumInPage = 0;
+      }
+
+      totalRows += tuples.size();
+      return writtenBytes;
+    }
+  }
+
+  /**
+   * Returns the current length of the output file, or the final length once closed.
+   * Synchronized with writers so that the wrapped appender is never read mid-write.
+   */
+  public long getOffset() throws IOException {
+    synchronized (appender) {
+      if (closed.get()) {
+        return offset;
+      }
+      return appender.getOffset();
+    }
+  }
+
+  /** Closes the current page at {@code pos} and opens a new one there. Caller holds the lock. */
+  private void nextPage(long pos) {
+    currentPage.setSecond((int) (pos - currentPage.getFirst()));
+    pages.add(currentPage);
+    currentPage = new Pair<Long, Integer>(pos, 0);
+  }
+
+  /** Unsupported: rows must be written with {@link #addTuples(QueryUnitAttemptId, List)}. */
+  @Override
+  public void addTuple(Tuple t) throws IOException {
+    throw new IOException("Not support addTuple, use addTuples()");
+  }
+
+  /** Flushes the wrapped appender; a no-op once closed. */
+  @Override
+  public void flush() throws IOException {
+    synchronized (appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+    }
+  }
+
+  /**
+   * Flushes and closes the wrapped appender. Records the final file length, closes the last
+   * page, and snapshots the stats. Calling it again is a no-op.
+   */
+  @Override
+  public void close() throws IOException {
+    synchronized (appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+      offset = appender.getOffset();
+      // Close the last, partially filled page if anything was written into it.
+      if (offset > currentPage.getFirst()) {
+        nextPage(offset);
+      }
+      appender.close();
+      if (LOG.isDebugEnabled()) {
+        if (!pages.isEmpty()) {
+          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
+              + ", lastPage=" + pages.get(pages.size() - 1));
+        } else {
+          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
+        }
+      }
+      tableStats = appender.getStats();
+      closed.set(true);
+    }
+  }
+
+  /** No-op: stats come from the wrapped appender, which is created and configured by the caller. */
+  @Override
+  public void enableStats() {
+  }
+
+  /** Returns the live stats while open, or the snapshot captured by {@link #close()} afterwards. */
+  @Override
+  public TableStats getStats() {
+    synchronized (appender) {
+      if (closed.get()) {
+        return tableStats;
+      }
+      return appender.getStats();
+    }
+  }
+
+  /**
+   * Returns the completed pages as (start offset, length). This is the live internal list;
+   * it includes the last page only after {@link #close()}.
+   */
+  public List<Pair<Long, Integer>> getPages() {
+    return pages;
+  }
+
+  /**
+   * Returns the live per-task index map. It is mutated under the appender lock, so callers
+   * must not iterate it while other threads may still call {@link #addTuples} or
+   * {@link #taskFinished}. Use {@link #getMergedTupleIndexes()} for a safe snapshot.
+   */
+  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
+    return taskTupleIndexes;
+  }
+
+  /**
+   * Returns a new list holding the tuple indexes of every task that has not been passed to
+   * {@link #taskFinished(QueryUnitAttemptId)}.
+   */
+  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
+    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+    synchronized (appender) {
+      for (List<Pair<Long, Pair<Integer, Integer>>> eachTaskIndexes : taskTupleIndexes.values()) {
+        merged.addAll(eachTaskIndexes);
+      }
+    }
+    return merged;
+  }
+
+  /**
+   * Drops the tuple indexes recorded for {@code taskId}. Locked because it may run concurrently
+   * with {@link #addTuples}, which mutates the same (non-thread-safe) map.
+   */
+  public void taskFinished(QueryUnitAttemptId taskId) {
+    synchronized (appender) {
+      taskTupleIndexes.remove(taskId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
new file mode 100644
index 0000000..f0699b7
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Creates and tracks one {@link HashShuffleAppender} per (execution block, partition), so that
+ * all callers writing the same partition of an execution block share a single shuffle file.
+ * It also closes those appenders when the execution block is done.
+ *
+ * <p>Shuffle files are created under a directory chosen by {@link LocalDirAllocator} from
+ * {@code WORKER_TEMPORAL_DIR}, at the relative path
+ * {@code <queryId>/output/<ebId.getId()>/hash-shuffle/<getPartParentId(partId)>/<partId>}.
+ */
+public class HashShuffleAppenderManager {
+  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
+
+  // ebId -> (partId -> appender meta). Creation and removal are guarded by synchronizing on it.
+  private final Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
+      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
+  private final TajoConf systemConf;
+  private final FileSystem defaultFS;
+  private final FileSystem localFS;
+  private final LocalDirAllocator lDirAllocator;
+  // Page size in bytes: SHUFFLE_HASH_APPENDER_PAGE_VOLUME is multiplied by 1024 * 1024 (MB -> bytes).
+  private final int pageSize;
+
+  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
+    this.systemConf = systemConf;
+
+    // initialize LocalDirAllocator
+    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+    // initialize DFS and LocalFileSystems
+    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+    localFS = FileSystem.getLocal(systemConf);
+    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+  }
+
+  /**
+   * Returns the appender for partition {@code partId} of {@code ebId}. On first use it creates
+   * the partition's shuffle file, its parent directory and its appender. Later calls for the
+   * same (ebId, partId) return the same instance.
+   *
+   * <p>A shuffle file that already exists is only reported in the log; this method does not
+   * delete it.
+   *
+   * @param tajoConf configuration used to look up the storage manager that creates the file appender
+   * @param ebId execution block that produces the shuffle output
+   * @param partId partition id
+   * @param meta table meta of the shuffle output
+   * @param outSchema schema of the rows to be written
+   * @throws IOException if the directory, file or appender cannot be created
+   */
+  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
+                              TableMeta meta, Schema outSchema) throws IOException {
+    synchronized (appenderMap) {
+      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+
+      if (partitionAppenderMap == null) {
+        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
+        appenderMap.put(ebId, partitionAppenderMap);
+      }
+
+      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
+      if (partitionAppenderMeta == null) {
+        Path dataFile = getDataFile(ebId, partId);
+        FileSystem fs = dataFile.getFileSystem(systemConf);
+        if (fs.exists(dataFile)) {
+          FileStatus status = fs.getFileStatus(dataFile);
+          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
+        }
+
+        if (!fs.exists(dataFile.getParent())) {
+          fs.mkdirs(dataFile.getParent());
+        }
+        FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(
+            tajoConf).getAppender(meta, outSchema, dataFile);
+        appender.enableStats();
+        appender.init();
+
+        partitionAppenderMeta = new PartitionAppenderMeta();
+        partitionAppenderMeta.partId = partId;
+        partitionAppenderMeta.dataFile = dataFile;
+        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
+        partitionAppenderMeta.appender.init();
+        partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
+      }
+
+      return partitionAppenderMeta.appender;
+    }
+  }
+
+  /**
+   * Returns the index of the sub-directory that holds partition {@code partId}:
+   * {@code partId % HASH_SHUFFLE_PARENT_DIRS}. NOTE(review): assumes partId >= 0 and a
+   * positive HASH_SHUFFLE_PARENT_DIRS. A negative partId yields a negative index, and a
+   * zero setting throws ArithmeticException.
+   */
+  public static int getPartParentId(int partId, TajoConf tajoConf) {
+    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
+  }
+
+  /** Resolves the local path of the shuffle file for (ebId, partId); see the class comment. */
+  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
+    try {
+      // the base dir for an output dir
+      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
+      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
+
+      // Spread partitions over sub-directories so that an execution block with many
+      // partitions does not put all of its shuffle files into a single directory.
+      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Closes every partition appender of an execution block and describes the resulting files.
+   *
+   * <p>Every appender is closed even if some of them fail, so no file is left open. Each
+   * failure is logged, and the first one is rethrown after all appenders were attempted.
+   *
+   * @param ebId the execution block to close
+   * @return one entry per partition, or {@code null} if no appender was ever created for {@code ebId}
+   * @throws IOException the first error raised while closing an appender
+   */
+  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
+    Map<Integer, PartitionAppenderMeta> partitionAppenderMap;
+    synchronized (appenderMap) {
+      partitionAppenderMap = appenderMap.remove(ebId);
+    }
+
+    if (partitionAppenderMap == null) {
+      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
+      return null;
+    }
+
+    // Collect one intermediate descriptor (file length, pages, pending tuple indexes) per partition.
+    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
+    IOException firstFailure = null;
+    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
+      try {
+        eachMeta.appender.close();
+        HashShuffleIntermediate intermediate =
+            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
+                eachMeta.appender.getPages(),
+                eachMeta.appender.getMergedTupleIndexes());
+        intermEntries.add(intermediate);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        // Keep closing the remaining appenders; report the first failure afterwards.
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+
+    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
+
+    return intermEntries;
+  }
+
+  /** Drops the tuple indexes of {@code taskId} from every partition appender of its execution block. */
+  public void finalizeTask(QueryUnitAttemptId taskId) {
+    synchronized (appenderMap) {
+      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
+        appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
+      if (partitionAppenderMap == null) {
+        return;
+      }
+
+      for (PartitionAppenderMeta eachAppender : partitionAppenderMap.values()) {
+        eachAppender.appender.taskFinished(taskId);
+      }
+    }
+  }
+
+  /** Summary of one closed partition file: its id, total length in bytes, pages and tuple indexes. */
+  public static class HashShuffleIntermediate {
+    private final int partId;
+
+    private final long volume;
+
+    //[<page start offset,<task start, task end>>]
+    private final Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
+
+    //[<page start offset, length>]
+    private final List<Pair<Long, Integer>> pages;
+
+    public HashShuffleIntermediate(int partId, long volume,
+                                   List<Pair<Long, Integer>> pages,
+                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
+      this.partId = partId;
+      this.volume = volume;
+      this.failureTskTupleIndexes = failureTskTupleIndexes;
+      this.pages = pages;
+    }
+
+    public int getPartId() {
+      return partId;
+    }
+
+    public long getVolume() {
+      return volume;
+    }
+
+    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
+      return failureTskTupleIndexes;
+    }
+
+    public List<Pair<Long, Integer>> getPages() {
+      return pages;
+    }
+  }
+
+  /** Bookkeeping for one partition: its id, appender and shuffle file path. */
+  static class PartitionAppenderMeta {
+    int partId;
+    HashShuffleAppender appender;
+    Path dataFile;
+
+    public int getPartId() {
+      return partId;
+    }
+
+    public HashShuffleAppender getAppender() {
+      return appender;
+    }
+
+    public Path getDataFile() {
+      return dataFile;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 41d1e05..1f57675 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -729,6 +729,7 @@ public class RawFile {
     @Override
     public TableStats getStats() {
       if (enabledStats) {
+        stats.setNumBytes(pos);
         return stats.getTableStat();
       } else {
         return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 3b0ee1f..e68e351 100644
--- 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -47,6 +47,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.listener.FileCloseListener;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.Tuple;
@@ -207,10 +208,12 @@ public class TajoPullServerService extends 
AbstractService {
       selector = 
RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
 
       localFS = new LocalFileSystem();
-      super.init(new Configuration(conf));
+      super.init(conf);
 
       this.getConfig().setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
           , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
+
+      LOG.info("Tajo PullServer initialized: readaheadLength=" + 
readaheadLength);
     } catch (Throwable t) {
       LOG.error(t);
     }
@@ -228,9 +231,11 @@ public class TajoPullServerService extends AbstractService 
{
       throw new RuntimeException(ex);
     }
     bootstrap.setPipelineFactory(pipelineFact);
+
     port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
         ConfVars.PULLSERVER_PORT.defaultIntVal);
     Channel ch = bootstrap.bind(new InetSocketAddress(port));
+
     accepted.add(ch);
     port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
     conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
@@ -373,10 +378,11 @@ public class TajoPullServerService extends 
AbstractService {
       final List<String> taskIdList = params.get("ta");
       final List<String> subQueryIds = params.get("sid");
       final List<String> partIds = params.get("p");
+      final List<String> offsetList = params.get("offset");
+      final List<String> lengthList = params.get("length");
 
-      if (types == null || taskIdList == null || subQueryIds == null || qids 
== null
-          || partIds == null) {
-        sendError(ctx, "Required queryId, type, taskIds, subquery Id, and part 
id",
+      if (types == null || subQueryIds == null || qids == null || partIds == 
null) {
+        sendError(ctx, "Required queryId, type, subquery Id, and part id",
             BAD_REQUEST);
         return;
       }
@@ -387,12 +393,18 @@ public class TajoPullServerService extends 
AbstractService {
         return;
       }
 
-      final List<FileChunk> chunks = Lists.newArrayList();
-
+      String partId = partIds.get(0);
       String queryId = qids.get(0);
       String shuffleType = types.get(0);
       String sid = subQueryIds.get(0);
-      String partId = partIds.get(0);
+
+      long offset = (offsetList != null && !offsetList.isEmpty()) ? 
Long.parseLong(offsetList.get(0)) : -1L;
+      long length = (lengthList != null && !lengthList.isEmpty()) ? 
Long.parseLong(lengthList.get(0)) : -1L;
+
+      if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList 
== null) {
+        sendError(ctx, "Required taskIds", BAD_REQUEST);
+      }
+
       List<String> taskIds = splitMaps(taskIdList);
 
       LOG.info("PullServer request param: shuffleType=" + shuffleType +
@@ -403,6 +415,8 @@ public class TajoPullServerService extends AbstractService {
 
       LOG.info("PullServer baseDir: " + 
conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
 
+      final List<FileChunk> chunks = Lists.newArrayList();
+
       // if a subquery requires a range shuffle
       if (shuffleType.equals("r")) {
         String ta = taskIds.get(0);
@@ -431,18 +445,29 @@ public class TajoPullServerService extends 
AbstractService {
 
         // if a subquery requires a hash shuffle or a scattered hash shuffle
       } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-        for (String ta : taskIds) {
-          if (!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + 
"/output/" + partId, conf)) {
-            LOG.warn(e);
-            sendError(ctx, NO_CONTENT);
-            return;
-          }
-          Path path = localFS.makeQualified(
-              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta 
+ "/output/" + partId, conf));
-          File file = new File(path.toUri());
-          FileChunk chunk = new FileChunk(file, 0, file.length());
-          chunks.add(chunk);
+        int partParentId = 
HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) 
conf);
+        String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + 
partParentId + "/" + partId;
+        if (!lDirAlloc.ifExists(partPath, conf)) {
+          LOG.warn("Partition shuffle file not exists: " + partPath);
+          sendError(ctx, NO_CONTENT);
+          return;
+        }
+
+        Path path = 
localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf));
+
+        File file = new File(path.toUri());
+        long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+        long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+        if (startPos >= file.length()) {
+          String errorMessage = "Start pos[" + startPos + "] great than file 
length [" + file.length() + "]";
+          LOG.error(errorMessage);
+          sendError(ctx, errorMessage, BAD_REQUEST);
+          return;
         }
+        LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + 
file.length());
+        FileChunk chunk = new FileChunk(file, startPos, readLen);
+        chunks.add(chunk);
       } else {
         LOG.error("Unknown shuffle type: " + shuffleType);
         sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);

Reply via email to