diff --git tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 64475fe..7310736 100644
--- tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.*;
@@ -52,11 +53,12 @@ public class Fetcher {
   private final static Log LOG = LogFactory.getLog(Fetcher.class);
 
   private final URI uri;
-  private final File file;
+  private final FileChunk fileChunk;
   private final TajoConf conf;
 
   private final String host;
   private int port;
+  private final boolean useLocalFile;
 
   private long startTime;
   private long finishTime;
@@ -67,11 +69,12 @@ public class Fetcher {
 
   private ClientBootstrap bootstrap;
 
-  public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory) {
+  public Fetcher(TajoConf conf, URI uri, FileChunk chunk, ClientSocketChannelFactory factory, boolean useLocalFile) {
     this.uri = uri;
-    this.file = file;
+    this.fileChunk = chunk;
     this.state = TajoProtos.FetcherState.FETCH_INIT;
     this.conf = conf;
+    this.useLocalFile = useLocalFile;
 
     String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
     this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -84,13 +87,15 @@ public class Fetcher {
       }
     }
 
-    bootstrap = new ClientBootstrap(factory);
-    bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
-    bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
-    bootstrap.setOption("tcpNoDelay", true);
-
-    ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
-    bootstrap.setPipelineFactory(pipelineFactory);
+    if(!useLocalFile) {
+      bootstrap = new ClientBootstrap(factory);
+      bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+      bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+      bootstrap.setOption("tcpNoDelay", true);
+  
+      ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(fileChunk.getFile());
+      bootstrap.setPipelineFactory(pipelineFactory);
+    }
   }
 
   public long getStartTime() {
@@ -113,7 +118,17 @@ public class Fetcher {
     return messageReceiveCount;
   }
 
-  public File get() throws IOException {
+  public FileChunk get() throws IOException {
+    if (useLocalFile) {
+      LOG.info("Get pseudo fetch from local host");
+      startTime = System.currentTimeMillis();
+      finishTime = System.currentTimeMillis();
+      state = TajoProtos.FetcherState.FETCH_FINISHED;
+      return fileChunk;
+    }
+
+    LOG.info("Get real fetch from remote host");
+    
     this.startTime = System.currentTimeMillis();
     this.state = TajoProtos.FetcherState.FETCH_FETCHING;
     ChannelFuture future = null;
@@ -145,7 +160,8 @@ public class Fetcher {
 
       channelFuture.addListener(ChannelFutureListener.CLOSE);
 
-      return file;
+      fileChunk.setLength(fileChunk.getFile().length());
+      return fileChunk;
     } finally {
       if(future != null){
         // Close the channel to exit.
diff --git tajo-core/src/main/java/org/apache/tajo/worker/Task.java tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index d0665ae..318170f 100644
--- tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,15 +47,21 @@ import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
 import java.text.NumberFormat;
 import java.util.*;
@@ -92,6 +99,7 @@ public class Task {
   private long finishTime;
 
   private final TableStats inputStats;
+  private List<FileChunk> localChunks;
 
   // TODO - to be refactored
   private ShuffleType shuffleType = null;
@@ -210,6 +218,8 @@ public class Task {
       context.setOutputPath(outFilePath);
     }
 
+    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+
     context.setState(TaskAttemptState.TA_PENDING);
     LOG.info("==================================");
     LOG.info("* Subquery " + request.getId() + " is initialized");
@@ -296,6 +306,7 @@ public class Task {
   }
 
   public void fetch() {
+    localChunks.clear();
     for (Fetcher f : fetcherRunners) {
       taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f));
     }
@@ -614,6 +625,17 @@ public class Task {
       listTablets.add(tablet);
     }
 
+    // Special treatment for locally pseudo fetched chunks
+    synchronized (localChunks) {
+      for (FileChunk chunk : localChunks) {
+	if (name.equals(chunk.ebId)) {
+	  tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset, chunk.length);
+	  listTablets.add(tablet);
+	  LOG.info("One local chunk is added to listTablets");
+	}
+      }
+    }
+
     FileFragment[] tablets = new FileFragment[listTablets.size()];
     listTablets.toArray(tablets);
 
@@ -648,10 +670,14 @@ public class Task {
             LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
           }
           try {
-            File fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) {
-              break;
-            }
+	    FileChunk fetched = fetcher.get();
+	    if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null && fetched.getFile() != null) {
+	      if (fetched.fromRemote == false) {
+		localChunks.add(fetched);
+		LOG.info("Add a new FileChunk to local chunk list");
+	      }
+	      break;
+	    }
           } catch (Throwable e) {
             LOG.error("Fetch failed: " + fetcher.getURI(), e);
           }
@@ -715,19 +741,49 @@ public class Task {
       Path inputDir = lDirAllocator.
           getLocalPathToRead(
               getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
-      File storeDir;
 
       int i = 0;
-      File storeFile;
+      File storeDir;
+      File defaultStoreFile;
+      FileChunk storeChunk = null;      
       List<Fetcher> runnerList = Lists.newArrayList();
+
       for (FetchImpl f : fetches) {
+	storeDir = new File(inputDir.toString(), f.getName());	
+        if (!storeDir.exists()) {
+          storeDir.mkdirs();
+        }
+        
         for (URI uri : f.getURIs()) {
-          storeDir = new File(inputDir.toString(), f.getName());
-          if (!storeDir.exists()) {
-            storeDir.mkdirs();
-          }
-          storeFile = new File(storeDir, "in_" + i);
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory);
+          defaultStoreFile = new File(storeDir, "in_" + i);
+          boolean fetchLocal = false;
+	  InetAddress address = InetAddress.getByName(uri.getHost());
+
+	  if (NetUtils.isLocalAddress(address)) {
+	    boolean hasError = false;
+	    try {
+	      LOG.info("Try to get local file chunk at local host");
+	      storeChunk = getLocalStoredFileChunk(uri, systemConf);
+	    } catch (Throwable t) {
+	      hasError = true;
+	    }
+	    if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset > -1 && hasError == false) {
+	      fetchLocal = true;
+	      storeChunk.fromRemote = false;
+	    } else {
+	      storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+	      storeChunk.fromRemote = true;
+	    }
+	  } else {
+	    storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+	    storeChunk.fromRemote = true;
+	  }
+
+	  // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+	  // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+	  storeChunk.ebId = f.getName();
+          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, fetchLocal);
+          LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
           runnerList.add(fetcher);
           i++;
         }
@@ -739,6 +795,106 @@ public class Task {
     }
   }
 
+  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+    // Parse the URI
+    LOG.info("getLocalStoredFileChunk starts");
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> taskIdList = params.get("ta");
+    final List<String> subQueryIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
+
+    if (types == null || subQueryIds == null || qids == null || partIds == null) {
+      LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id");
+      return null;
+    }
+
+    if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+      LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id");
+      return null;
+    }
+
+    String queryId = qids.get(0);
+    String shuffleType = types.get(0);
+    String sid = subQueryIds.get(0);
+    String partId = partIds.get(0);
+
+    if (shuffleType.equals("r") && taskIdList == null) {
+      LOG.error("Invalid URI - For range shuffle, taskId is required");
+      return null;
+    }
+    List<String> taskIds = splitMaps(taskIdList);
+
+    FileChunk chunk = null;
+    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+	+ ", taskIds=" + taskIdList);
+
+    // The working directory of Tajo worker for each query, including subquery
+    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
+
+    // If the subquery requires a range shuffle
+    if (shuffleType.equals("r")) {
+      String ta = taskIds.get(0);
+      if (!lDirAllocator.ifExists(queryBaseDir + ta + "/output/", conf)) {
+	LOG.warn("Range shuffle - file not exist");
+	return null;
+      }
+      Path path = localFS.makeQualified(lDirAllocator.getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+      String startKey = params.get("start").get(0);
+      String endKey = params.get("end").get(0);
+      boolean last = params.get("final") != null;
+
+      try {
+	chunk = TajoPullServerService.getFileCunks(path, startKey, endKey, last);
+      } catch (Throwable t) {
+	LOG.error("getFileChunks() throws exception");
+	return null;
+      }
+
+      // If the subquery requires a hash shuffle or a scattered hash shuffle
+    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
+      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
+      if (!lDirAllocator.ifExists(partPath, conf)) {
+	LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+	return null;
+      }
+      Path path = localFS.makeQualified(lDirAllocator.getLocalPathToRead(partPath, conf));
+      File file = new File(path.toUri());
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+	LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+	return null;
+      }
+      chunk = new FileChunk(file, startPos, readLen);
+
+    } else {
+      LOG.error("Unknown shuffle type");
+      return null;
+    }
+
+    return chunk;
+  }
+
+  private List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<String>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
   protected class Reporter {
     private QueryMasterProtocolService.Interface masterStub;
     private Thread pingThread;
diff --git tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 95c06bb..d1fe63f 100644
--- tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -93,8 +94,12 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
-    assertNotNull(fetcher.get());
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.fromRemote = true;
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, false);
+    FileChunk chunk = fetcher.get();
+    assertNotNull(chunk);
+    assertNotNull(chunk.getFile());
 
     FileSystem fs = FileSystem.getLocal(new TajoConf());
     FileStatus inStatus = fs.getFileStatus(inputPath);
@@ -133,7 +138,9 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.fromRemote = true;
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, false);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -161,7 +168,9 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.fromRemote = true;
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, false);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -193,7 +202,9 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.fromRemote = true;
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, false);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -207,11 +218,12 @@ public class TestFetcher {
     String ta = "1_0";
     String partId = "1";
 
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
     String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.fromRemote = true;
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, false);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     pullServerService.stop();
diff --git tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 150ac85..0316a77 100644
--- tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -705,7 +705,7 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
-  public FileChunk getFileCunks(Path outDir,
+  public static FileChunk getFileCunks(Path outDir,
                                       String startKey,
                                       String endKey,
                                       boolean last) throws IOException {
diff --git tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
index a8b424e..063f8d7 100644
--- tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -21,10 +21,24 @@ package org.apache.tajo.pullserver.retriever;
 import java.io.File;
 import java.io.FileNotFoundException;
 
+import org.apache.tajo.ExecutionBlockId;
+
 public class FileChunk {
   private final File file;
   public final long startOffset;
-  public final long length;
+  public long length;
+
+  /**
+   * TRUE if this.file is created by getting data from a remote host
+   * (e.g., by HttpRequest). FALSE otherwise.
+   */
+
+  public boolean fromRemote;
+
+  /**
+   * ExecutionBlockId
+   */
+  public String ebId;
 
   public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
     this.file = file;
@@ -44,8 +58,12 @@ public class FileChunk {
     return this.length;
   }
 
+  public void setLength(long newLength) {
+    this.length = newLength;
+  }
+
   public String toString() {
-    return " (start=" + startOffset() + ", length=" + length + ") "
-        + file.getAbsolutePath();
+    return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote 
+	+ ", ebId=" + ebId + ") " + file.getAbsolutePath();
   }
 }
