Repository: tajo Updated Branches: refs/heads/master 9e026a9a2 -> 644b7cd99
TAJO-983: Worker should directly read Intermediate data stored in localhost rather than fetching. (Mai Hai Thanh via hyunsik) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/644b7cd9 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/644b7cd9 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/644b7cd9 Branch: refs/heads/master Commit: 644b7cd991ea402115c6dc1d198e7f1d7f41771b Parents: 9e026a9 Author: Hyunsik Choi <[email protected]> Authored: Wed Sep 24 01:08:43 2014 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Wed Sep 24 01:08:43 2014 -0700 ---------------------------------------------------------------------- CHANGES | 3 + .../engine/planner/UniformRangePartition.java | 3 +- .../planner/physical/ExternalSortExec.java | 92 +++++++--- .../java/org/apache/tajo/worker/Fetcher.java | 37 ++-- .../main/java/org/apache/tajo/worker/Task.java | 182 ++++++++++++++++++- .../org/apache/tajo/worker/TestFetcher.java | 25 ++- .../java/org/apache/tajo/storage/RawFile.java | 71 +++++--- .../tajo/pullserver/PullServerAuxService.java | 6 +- .../tajo/pullserver/TajoPullServerService.java | 8 +- .../tajo/pullserver/retriever/FileChunk.java | 38 +++- 10 files changed, 371 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 7cbd524..6a64bc9 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,9 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-983: Worker should directly read Intermediate data stored in localhost + rather than fetching. (Mai Hai Thanh via hyunsik) + TAJO-910: Simple query (non-forwarded query) should be supported against partition tables. (Hyoungjun Kim) http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java index db12285..551a9d0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java @@ -20,7 +20,6 @@ package org.apache.tajo.engine.planner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.primitives.UnsignedLong; import com.sun.tools.javac.util.Convert; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; @@ -571,7 +570,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { } else { if (isPureAscii[i]) { - lastBigInt = UnsignedLong.valueOf(new BigInteger(last.get(i).asByteArray())).bigIntegerValue(); + lastBigInt = new BigInteger(last.get(i).asByteArray()); if (sortSpecs[i].isAscending()) { end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray())); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 700e34d..0094590 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.LocalDirAllocator; @@ -45,6 +46,7 @@ import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; +import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.*; @@ -66,6 +68,8 @@ import static org.apache.tajo.storage.RawFile.RawFileScanner; public class ExternalSortExec extends SortExec { /** Class logger */ private static final Log LOG = LogFactory.getLog(ExternalSortExec.class); + /** The prefix of fragment name for intermediate */ + private static final String INTERMEDIATE_FILE_PREFIX = "@interFile_"; private SortNode plan; private final TableMeta meta; @@ -86,9 +90,9 @@ public class ExternalSortExec extends SortExec { /** local file system */ private final RawLocalFileSystem localFS; /** final output files which are used for cleaning */ - private List<Path> finalOutputFiles = null; + private List<FileFragment> finalOutputFiles = null; /** for directly merging sorted inputs */ - private List<Path> mergedInputPaths = null; + private List<FileFragment> mergedInputFragments = null; /////////////////////////////////////////////////// // transient variables @@ -129,10 +133,10 @@ public class ExternalSortExec extends SortExec { final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException { this(context, sm, plan); - mergedInputPaths = TUtil.newList(); + mergedInputFragments = TUtil.newList(); for (CatalogProtos.FragmentProto proto : fragments) { FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto); - mergedInputPaths.add(fragment.getPath()); + mergedInputFragments.add(fragment); } } @@ -269,9 +273,9 @@ public class ExternalSortExec extends SortExec { if (!sorted) { // if not sorted, first sort all data // if input files are given, it starts merging directly. - if (mergedInputPaths != null) { + if (mergedInputFragments != null) { try { - this.result = externalMergeAndSort(mergedInputPaths); + this.result = externalMergeAndSort(mergedInputFragments); } catch (Exception e) { throw new PhysicalPlanningException(e); } @@ -287,7 +291,14 @@ public class ExternalSortExec extends SortExec { } else { // if input data exceeds main-memory at least once try { - this.result = externalMergeAndSort(chunks); + List<FileFragment> fragments = TUtil.newList(); + for (Path chunk : chunks) { + FileFragment frag = new FileFragment("", chunk, 0, + new File(localFS.makeQualified(chunk).toUri()).length()); + fragments.add(frag); + } + + this.result = externalMergeAndSort(fragments); } catch (Exception e) { throw new PhysicalPlanningException(e); } @@ -328,11 +339,11 @@ public class ExternalSortExec extends SortExec { return computedFanout; } - private Scanner externalMergeAndSort(List<Path> chunks) + private Scanner externalMergeAndSort(List<FileFragment> chunks) throws IOException, ExecutionException, InterruptedException { int level = 0; - final List<Path> inputFiles = TUtil.newList(chunks); - final List<Path> outputFiles = TUtil.newList(); + final List<FileFragment> inputFiles = TUtil.newList(chunks); + final List<FileFragment> outputFiles = TUtil.newList(); int remainRun = inputFiles.size(); int chunksSize = chunks.size(); @@ -368,7 +379,7 @@ public class ExternalSortExec extends SortExec { info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns + ") and output files (" + outputFileNum + ") <= " + defaultFanout); - List<Path> switched = TUtil.newList(); + List<FileFragment> switched = TUtil.newList(); // switch the remain inputs to the next outputs for (int j = startIdx; j < inputFiles.size(); j++) { switched.add(inputFiles.get(j)); @@ -383,7 +394,7 @@ public class ExternalSortExec extends SortExec { // wait for all sort runners int finishedMerger = 0; int index = 0; - for (Future<Path> future : futures) { + for (Future<FileFragment> future : futures) { outputFiles.add(future.get()); // Getting the number of merged files finishedMerger += numberOfMergingFiles.get(index++); @@ -391,11 +402,32 @@ public class ExternalSortExec extends SortExec { progress = ((float)finishedMerger/(float)chunksSize) * 0.5f; } - // delete merged intermediate files - for (Path path : inputFiles) { - localFS.delete(path, true); + /* + * delete merged intermediate files + * + * There may be 4 different types of file fragments in the list inputFiles + * + A: a fragment created from fetched data from a remote host. By default, this fragment represents + * a whole physical file (i.e., startOffset == 0 and length == length of physical file) + * + B1: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment + * represents the whole physical file (i.e., startOffset == 0 AND length == length of physical file) + * + B2: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment + * represents only a part of the physical file (i.e., startOffset > 0 OR length != length of physical file) + * + C: a fragment created from merging some fragments of the above types. When this fragment is created, + * its startOffset is set to 0 and its length is set to the length of the physical file, automatically + * + * Fragments of types A, B1, and B2 are inputs of ExternalSortExec. Among them, only B2-type fragments will + * possibly be used by another task in the future. Thus, ideally, all fragments of types A, B1, and C can be + * deleted at this point. However, for the ease of future code maintenance, we delete only type-C fragments here + */ + int numDeletedFiles = 0; + for (FileFragment frag : inputFiles) { + if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX) == true) { + localFS.delete(frag.getPath(), true); + numDeletedFiles++; + LOG.info("Delete merged intermediate file: " + frag); + } } - info(LOG, inputFiles.size() + " merged intermediate files deleted"); + info(LOG, numDeletedFiles + " merged intermediate files deleted"); // switch input files to output files, and then clear outputFiles inputFiles.clear(); @@ -418,15 +450,15 @@ public class ExternalSortExec extends SortExec { /** * Merge Thread */ - private class KWayMergerCaller implements Callable<Path> { + private class KWayMergerCaller implements Callable<FileFragment> { final int level; final int nextRunId; - final List<Path> inputFiles; + final List<FileFragment> inputFiles; final int startIdx; final int mergeFanout; final boolean updateInputStats; - public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles, + public KWayMergerCaller(final int level, final int nextRunId, final List<FileFragment> inputFiles, final int startIdx, final int mergeFanout, final boolean updateInputStats) { this.level = level; this.nextRunId = nextRunId; @@ -437,7 +469,7 @@ public class ExternalSortExec extends SortExec { } @Override - public Path call() throws Exception { + public FileFragment call() throws Exception { final Path outputPath = getChunkPathForWrite(level + 1, nextRunId); info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName()); long mergeStartTime = System.currentTimeMillis(); @@ -455,7 +487,9 @@ public class ExternalSortExec extends SortExec { info(LOG, outputPath.getName() + " is written to a disk. (" + FileUtil.humanReadableByteCount(output.getOffset(), false) + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)"); - return outputPath; + File f = new File(localFS.makeQualified(outputPath).toUri()); + FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length()); + return frag; } } @@ -469,7 +503,7 @@ public class ExternalSortExec extends SortExec { /** * Create a merged file scanner or k-way merge scanner. */ - private Scanner createFinalMerger(List<Path> inputs) throws IOException { + private Scanner createFinalMerger(List<FileFragment> inputs) throws IOException { if (inputs.size() == 1) { this.result = getFileScanner(inputs.get(0)); } else { @@ -478,11 +512,11 @@ public class ExternalSortExec extends SortExec { return result; } - private Scanner getFileScanner(Path path) throws IOException { - return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, path); + private Scanner getFileScanner(FileFragment frag) throws IOException { + return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag); } - private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException { + private Scanner createKWayMerger(List<FileFragment> inputs, final int startChunkId, final int num) throws IOException { final Scanner [] sources = new Scanner[num]; for (int i = 0; i < num; i++) { sources[i] = getFileScanner(inputs.get(startChunkId + i)); @@ -741,8 +775,12 @@ public class ExternalSortExec extends SortExec { } if (finalOutputFiles != null) { - for (Path path : finalOutputFiles) { - localFS.delete(path, true); + for (FileFragment frag : finalOutputFiles) { + File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri()); + if (frag.getStartKey() == 0 && frag.getEndKey() == tmpFile.length()) { + localFS.delete(frag.getPath(), true); + LOG.info("Delete file: " + frag); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 4867fe4..742a025 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.pullserver.retriever.FileChunk; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.*; @@ -51,11 +52,12 @@ public class Fetcher { private final static Log LOG = LogFactory.getLog(Fetcher.class); private final URI uri; - private final File file; + private final FileChunk fileChunk; private final TajoConf conf; private final String host; private int port; + private final boolean useLocalFile; private long startTime; private long finishTime; @@ -66,9 +68,10 @@ public class Fetcher { private ClientBootstrap bootstrap; - public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory, Timer timer) { + public Fetcher(TajoConf conf, URI uri, FileChunk chunk, ClientSocketChannelFactory factory, Timer timer) { this.uri = uri; - this.file = file; + this.fileChunk = chunk; + this.useLocalFile = !chunk.fromRemote(); this.state = TajoProtos.FetcherState.FETCH_INIT; this.conf = conf; this.timer = timer; @@ -84,13 +87,15 @@ public class Fetcher { } } - bootstrap = new ClientBootstrap(factory); - bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec - bootstrap.setOption("receiveBufferSize", 1048576); // set 1M - bootstrap.setOption("tcpNoDelay", true); + if (!useLocalFile) { + bootstrap = new ClientBootstrap(factory); + bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec + bootstrap.setOption("receiveBufferSize", 1048576); // set 1M + bootstrap.setOption("tcpNoDelay", true); - ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file); - bootstrap.setPipelineFactory(pipelineFactory); + ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(fileChunk.getFile()); + bootstrap.setPipelineFactory(pipelineFactory); + } } public long getStartTime() { @@ -113,7 +118,16 @@ public class Fetcher { return messageReceiveCount; } - public File get() throws IOException { + public FileChunk get() throws IOException { + if (useLocalFile) { + LOG.info("Get pseudo fetch from local host"); + startTime = System.currentTimeMillis(); + finishTime = System.currentTimeMillis(); + state = TajoProtos.FetcherState.FETCH_FINISHED; + return fileChunk; + } + + LOG.info("Get real fetch from remote host"); this.startTime = System.currentTimeMillis(); this.state = TajoProtos.FetcherState.FETCH_FETCHING; ChannelFuture future = null; @@ -145,7 +159,8 @@ public class Fetcher { channelFuture.addListener(ChannelFutureListener.CLOSE); - return file; + fileChunk.setLength(fileChunk.getFile().length()); + return fileChunk; } finally { if(future != null){ // Close the channel to exit. http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 66e0f87..a7eaaf8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -21,6 +21,7 @@ package org.apache.tajo.worker; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,15 +49,21 @@ import org.apache.tajo.engine.query.QueryUnitRequest; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; +import org.apache.tajo.pullserver.TajoPullServerService; +import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.NetUtils; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.jboss.netty.util.Timer; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.URI; import java.text.NumberFormat; import java.util.*; @@ -91,6 +98,7 @@ public class Task { private long finishTime; private final TableStats inputStats; + private List<FileChunk> localChunks; // TODO - to be refactored private ShuffleType shuffleType = null; @@ -190,6 +198,8 @@ public class Task { context.setOutputPath(outFilePath); } + this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); + context.setState(TaskAttemptState.TA_PENDING); LOG.info("=================================="); LOG.info("* Subquery " + request.getId() + " is initialized"); @@ -586,6 +596,17 @@ public class Task { listTablets.add(tablet); } + // Special treatment for locally pseudo fetched chunks + synchronized (localChunks) { + for (FileChunk chunk : localChunks) { + if (name.equals(chunk.getEbId())) { + tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); + listTablets.add(tablet); + LOG.info("One local chunk is added to listTablets"); + } + } + } + FileFragment[] tablets = new FileFragment[listTablets.size()]; listTablets.toArray(tablets); @@ -620,8 +641,13 @@ public class Task { LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); } try { - File fetched = fetcher.get(); - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) { + FileChunk fetched = fetcher.get(); + if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null + && fetched.getFile() != null) { + if (fetched.fromRemote() == false) { + localChunks.add(fetched); + LOG.info("Add a new FileChunk to local chunk list"); + } break; } } catch (Throwable e) { @@ -677,19 +703,55 @@ public class Task { Timer timer = executionBlockContext.getRPCTimer(); Path inputDir = executionBlockContext.getLocalDirAllocator(). getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); - File storeDir; int i = 0; - File storeFile; + File storeDir; + File defaultStoreFile; + FileChunk storeChunk = null; List<Fetcher> runnerList = Lists.newArrayList(); + for (FetchImpl f : fetches) { + storeDir = new File(inputDir.toString(), f.getName()); + if (!storeDir.exists()) { + storeDir.mkdirs(); + } + for (URI uri : f.getURIs()) { - storeDir = new File(inputDir.toString(), f.getName()); - if (!storeDir.exists()) { - storeDir.mkdirs(); + defaultStoreFile = new File(storeDir, "in_" + i); + InetAddress address = InetAddress.getByName(uri.getHost()); + + if (NetUtils.isLocalAddress(address)) { + boolean hasError = false; + try { + LOG.info("Try to get local file chunk at local host"); + storeChunk = getLocalStoredFileChunk(uri, systemConf); + } catch (Throwable t) { + hasError = true; + } + + // When a range request is out of range, storeChunk will be NULL. This case is normal state. + // So, we should skip and don't need to create storeChunk. + if (storeChunk == null && !hasError) { + continue; + } + + if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 + && hasError == false) { + storeChunk.setFromRemote(false); + } else { + storeChunk = new FileChunk(defaultStoreFile, 0, -1); + storeChunk.setFromRemote(true); + } + } else { + storeChunk = new FileChunk(defaultStoreFile, 0, -1); + storeChunk.setFromRemote(true); } - storeFile = new File(storeDir, "in_" + i); - Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory, timer); + + // If we decide that intermediate data should be really fetched from a remote host, storeChunk + // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it + storeChunk.setEbId(f.getName()); + Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer); + LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); runnerList.add(fetcher); i++; } @@ -701,6 +763,108 @@ public class Task { } } + private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { + // Parse the URI + LOG.info("getLocalStoredFileChunk starts"); + final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters(); + final List<String> types = params.get("type"); + final List<String> qids = params.get("qid"); + final List<String> taskIdList = params.get("ta"); + final List<String> subQueryIds = params.get("sid"); + final List<String> partIds = params.get("p"); + final List<String> offsetList = params.get("offset"); + final List<String> lengthList = params.get("length"); + + if (types == null || subQueryIds == null || qids == null || partIds == null) { + LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id"); + return null; + } + + if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) { + LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id"); + return null; + } + + String queryId = qids.get(0); + String shuffleType = types.get(0); + String sid = subQueryIds.get(0); + String partId = partIds.get(0); + + if (shuffleType.equals("r") && taskIdList == null) { + LOG.error("Invalid URI - For range shuffle, taskId is required"); + return null; + } + List<String> taskIds = splitMaps(taskIdList); + + FileChunk chunk = null; + long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; + long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; + + LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId + + ", taskIds=" + taskIdList); + + // The working directory of Tajo worker for each query, including subquery + String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; + + // If the subquery requires a range shuffle + if (shuffleType.equals("r")) { + String ta = taskIds.get(0); + if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { + LOG.warn("Range shuffle - file not exist"); + return null; + } + Path path = executionBlockContext.getLocalFS().makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); + String startKey = params.get("start").get(0); + String endKey = params.get("end").get(0); + boolean last = params.get("final") != null; + + try { + chunk = TajoPullServerService.getFileCunks(path, startKey, endKey, last); + } catch (Throwable t) { + LOG.error("getFileChunks() throws exception"); + return null; + } + + // If the subquery requires a hash shuffle or a scattered hash shuffle + } else if (shuffleType.equals("h") || shuffleType.equals("s")) { + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); + String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; + if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { + LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); + return null; + } + Path path = executionBlockContext.getLocalFS().makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); + File file = new File(path.toUri()); + long startPos = (offset >= 0 && length >= 0) ? offset : 0; + long readLen = (offset >= 0 && length >= 0) ? length : file.length(); + + if (startPos >= file.length()) { + LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); + return null; + } + chunk = new FileChunk(file, startPos, readLen); + + } else { + LOG.error("Unknown shuffle type"); + return null; + } + + return chunk; + } + + private List<String> splitMaps(List<String> mapq) { + if (null == mapq) { + return null; + } + final List<String> ret = new ArrayList<String>(); + for (String s : mapq) { + Collections.addAll(ret, s.split(",")); + } + return ret; + } + public static Path getTaskAttemptDir(QueryUnitAttemptId quid) { Path workDir = StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()), http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java index b15d523..551610b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.TajoPullServerService; +import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.CommonTestingUtil; @@ -98,8 +99,12 @@ public class TestFetcher { stream.close(); URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); - final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer); - assertNotNull(fetcher.get()); + FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); + storeChunk.setFromRemote(true); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); + FileChunk chunk = fetcher.get(); + assertNotNull(chunk); + assertNotNull(chunk.getFile()); FileSystem fs = FileSystem.getLocal(new TajoConf()); FileStatus inStatus = fs.getFileStatus(inputPath); @@ -140,7 +145,9 @@ public class TestFetcher { stream.close(); URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); - final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer); + FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); + storeChunk.setFromRemote(true); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); fetcher.get(); @@ -168,7 +175,9 @@ public class TestFetcher { stream.close(); URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); - final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer); + FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); + storeChunk.setFromRemote(true); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); fetcher.get(); @@ -200,7 +209,9 @@ public class TestFetcher { stream.close(); URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); - final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer); + FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); + storeChunk.setFromRemote(true); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); fetcher.get(); @@ -218,7 +229,9 @@ public class TestFetcher { String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta); URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); - final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer); + FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); + storeChunk.setFromRemote(true); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); pullServerService.stop(); http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index 1f57675..edcf686 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage; import com.google.protobuf.Message; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,6 +33,7 @@ import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.BitArray; import java.io.File; @@ -47,36 +49,31 @@ public class RawFile { public static class RawFileScanner extends FileScanner implements SeekableScanner { private FileChannel channel; private DataType[] columnTypes; - private Path path; private ByteBuffer buffer; + private int bufferSize; private Tuple tuple; - private int headerSize = 0; + private int headerSize = 0; // Header size of a tuple private BitArray nullFlags; private static final int RECORD_SIZE = 4; private boolean eof = false; - private long fileSize; + private long fileLimit; // If this.fragment represents a complete file, this value is equal to the file's size + private long numBytesRead; private FileInputStream fis; private long recordCount; - public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { - super(conf, schema, meta, null); - this.path = path; - } - - @SuppressWarnings("unused") public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { - this(conf, schema, meta, fragment.getPath()); + super(conf, schema, meta, fragment); } public void init() throws IOException { File file; try { - if (path.toUri().getScheme() != null) { - file = new File(path.toUri()); + if (fragment.getPath().toUri().getScheme() != null) { + file = new File(fragment.getPath().toUri()); } else { - file = new File(path.toString()); + file = new File(fragment.getPath().toString()); } } catch (IllegalArgumentException iae) { throw new IOException(iae); @@ -84,16 +81,22 @@ public class RawFile { fis = new FileInputStream(file); channel = fis.getChannel(); - fileSize = channel.size(); + fileLimit = fragment.getStartKey() + fragment.getEndKey(); // fileLimit is less than or equal to fileSize if (tableStats != null) { - tableStats.setNumBytes(fileSize); + tableStats.setNumBytes(fragment.getEndKey()); } if (LOG.isDebugEnabled()) { - LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size()); + LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size() + + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit); } - buffer = ByteBuffer.allocateDirect(64 * 1024); + if (fragment.getEndKey() < 64 * StorageUnit.KB) { + bufferSize = fragment.getEndKey().intValue(); + } else { + bufferSize = 64 * StorageUnit.KB; + } + buffer = ByteBuffer.allocateDirect(bufferSize); columnTypes = new DataType[schema.size()]; for (int i = 0; i < schema.size(); i++) { @@ -101,14 +104,14 @@ public class RawFile { } tuple = new VTuple(columnTypes.length); + nullFlags = new BitArray(schema.size()); + headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize // initial read - channel.read(buffer); + channel.position(fragment.getStartKey()); + numBytesRead = channel.read(buffer); buffer.flip(); - nullFlags = new BitArray(schema.size()); - headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); - super.init(); } @@ -125,19 +128,31 @@ public class RawFile { } else { buffer.clear(); channel.position(offset); - channel.read(buffer); + int bytesRead = channel.read(buffer); + numBytesRead = bytesRead; buffer.flip(); eof = false; } } private boolean fillBuffer() throws IOException { + if (numBytesRead >= fragment.getEndKey()) { + eof = true; + return false; + } + int currentDataSize = buffer.remaining(); buffer.compact(); - if (channel.read(buffer) == -1) { + int bytesRead = channel.read(buffer); + if (bytesRead == -1) { eof = true; return false; } else { buffer.flip(); + long realRemaining = fragment.getEndKey() - numBytesRead; + numBytesRead += bytesRead; + if (realRemaining < bufferSize) { + buffer.limit(currentDataSize + (int) realRemaining); + } return true; } } @@ -356,7 +371,7 @@ public class RawFile { } } - if(!buffer.hasRemaining() && channel.position() == fileSize){ + if(!buffer.hasRemaining() && channel.position() == fileLimit){ eof = true; } return new VTuple(tuple); @@ -368,7 +383,7 @@ public class RawFile { buffer.clear(); // reload initial buffer channel.position(0); - channel.read(buffer); + numBytesRead = channel.read(buffer); buffer.flip(); eof = false; } @@ -376,7 +391,7 @@ public class RawFile { @Override public void close() throws IOException { if (tableStats != null) { - tableStats.setReadBytes(fileSize); + tableStats.setReadBytes(fragment.getEndKey()); tableStats.setNumRows(recordCount); } @@ -410,14 +425,14 @@ public class RawFile { } if(eof || channel == null) { - tableStats.setReadBytes(fileSize); + tableStats.setReadBytes(fragment.getEndKey()); return 1.0f; } if (filePos == 0) { return 0.0f; } else { - return Math.min(1.0f, ((float)filePos / (float)fileSize)); + return Math.min(1.0f, ((float)filePos / fragment.getEndKey().floatValue())); } } catch (IOException e) { LOG.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java index 5f9f9e8..1c63c8a 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java @@ -488,20 +488,20 @@ public class PullServerAuxService extends AuxiliaryService { ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { final FadvisedFileRegion partition = new FadvisedFileRegion(spill, - file.startOffset, file.length(), manageOsCache, readaheadLength, + file.startOffset(), file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(partition); writeFuture.addListener(new FileCloseListener(partition, null, 0, null)); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, - file.startOffset, file.length, sslFileBufferSize, + file.startOffset(), file.length(), sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(chunk); } metrics.shuffleConnections.incr(); - metrics.shuffleOutputBytes.incr(file.length); // optimistic + metrics.shuffleOutputBytes.incr(file.length()); // optimistic return writeFuture; } http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 3fa67ae..2fb7c29 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -644,14 +644,14 @@ public class TajoPullServerService extends AbstractService { spill = new RandomAccessFile(file.getFile(), "r"); if (ch.getPipeline().get(SslHandler.class) == null) { final FadvisedFileRegion filePart = new FadvisedFileRegion(spill, - file.startOffset, file.length(), manageOsCache, readaheadLength, + file.startOffset(), file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(filePart); writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this)); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, - file.startOffset, file.length, sslFileBufferSize, + file.startOffset(), file.length(), sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(chunk); @@ -667,7 +667,7 @@ public class TajoPullServerService extends AbstractService { return null; } metrics.shuffleConnections.incr(); - metrics.shuffleOutputBytes.incr(file.length); // optimistic + metrics.shuffleOutputBytes.incr(file.length()); // optimistic return writeFuture; } @@ -698,7 +698,7 @@ public class TajoPullServerService extends AbstractService { } } - public FileChunk getFileCunks(Path outDir, + public static FileChunk getFileCunks(Path outDir, String startKey, String endKey, boolean last) throws IOException { http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java index a8b424e..67cff21 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java @@ -23,8 +23,18 @@ import java.io.FileNotFoundException; public class FileChunk { private final File file; - public final long startOffset; - public final long length; + private final long startOffset; + private long length; + + /** + * TRUE if this.file is created by getting data from a remote host (e.g., by HttpRequest). FALSE otherwise. + */ + private boolean fromRemote; + + /** + * ExecutionBlockId + */ + private String ebId; public FileChunk(File file, long startOffset, long length) throws FileNotFoundException { this.file = file; @@ -44,8 +54,28 @@ public class FileChunk { return this.length; } + public void setLength(long newLength) { + this.length = newLength; + } + + public boolean fromRemote() { + return this.fromRemote; + } + + public void setFromRemote(boolean newVal) { + this.fromRemote = newVal; + } + + public String getEbId() { + return this.ebId; + } + + public void setEbId(String newVal) { + this.ebId = newVal; + } + public String toString() { - return " (start=" + startOffset() + ", length=" + length + ") " - + file.getAbsolutePath(); + return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") " + + file.getAbsolutePath(); } }
