Repository: tajo Updated Branches: refs/heads/master d952b61ae -> d99bd085e
TAJO-717: Improve file splitting for large number of splits. (jinho) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d99bd085 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d99bd085 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d99bd085 Branch: refs/heads/master Commit: d99bd085e4f02bd11a9133c6bef8942b9dc27723 Parents: d952b61 Author: jinossy <[email protected]> Authored: Thu Apr 10 14:54:01 2014 +0900 Committer: jinossy <[email protected]> Committed: Thu Apr 10 14:54:01 2014 +0900 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tajo/master/DefaultTaskScheduler.java | 33 ++-- .../tajo/master/querymaster/Repartitioner.java | 6 +- tajo-storage/pom.xml | 28 +++ .../tajo/storage/AbstractStorageManager.java | 196 ++++++++++++------- .../tajo/storage/fragment/FileFragment.java | 15 +- .../org/apache/tajo/storage/rcfile/RCFile.java | 9 +- .../apache/tajo/storage/TestStorageManager.java | 111 ++++++++++- 8 files changed, 299 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a83dd25..fe2349f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -143,6 +143,8 @@ Release 0.8.0 - unreleased IMPROVEMENTS + TAJO-717: Improve file splitting for large number of splits. (jinho) + TAJO-356: Improve TajoClient to directly get query results in the first request. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 61fa84e..409a1b1 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -44,8 +44,6 @@ import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.URI; import java.util.*; import java.util.Map.Entry; @@ -582,8 +580,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>()); private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>()); private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>(); - private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping = - new HashMap<String, LinkedList<QueryUnitAttemptId>>(); + private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping = + new HashMap<String, HashSet<QueryUnitAttemptId>>(); private void addLeafTask(QueryUnitAttemptScheduleEvent event) { QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt(); @@ -604,13 +602,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { LOG.debug("Added attempt req to host " + host); } - LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack()); + HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack()); if (list == null) { - list = new LinkedList<QueryUnitAttemptId>(); + list = new HashSet<QueryUnitAttemptId>(); leafTasksRackMapping.put(hostVolumeMapping.getRack(), list); } - if(!list.contains(queryUnitAttempt.getId())) list.add(queryUnitAttempt.getId()); + list.add(queryUnitAttempt.getId()); if (LOG.isDebugEnabled()) { LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack()); @@ -687,14 +685,19 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { //find task in rack if (attemptId == null) { - LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack); - while (list != null && list.size() > 0) { - QueryUnitAttemptId tId = list.removeFirst(); - - if (leafTasks.contains(tId)) { - leafTasks.remove(tId); - attemptId = tId; - break; + HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack); + if (list != null) { + synchronized (list) { + Iterator<QueryUnitAttemptId> iterator = list.iterator(); + while (iterator.hasNext()) { + QueryUnitAttemptId tId = iterator.next(); + iterator.remove(); + if (leafTasks.contains(tId)) { + leafTasks.remove(tId); + attemptId = tId; + break; + } + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index a72c222..6704230 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -237,10 +237,8 @@ public class Repartitioner { TableDesc table) throws IOException { List<FileFragment> fragments = Lists.newArrayList(); PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; - for (Path path : partitionsScan.getInputPaths()) { - fragments.addAll(sm.getSplits( - scan.getCanonicalName(), table.getMeta(), table.getSchema(), path)); - } + fragments.addAll(sm.getSplits( + scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths())); partitionsScan.setInputPaths(null); return fragments; } http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index b9a162a..5850ed4 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -231,6 +231,34 @@ </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java index a7ed981..6615208 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java @@ -90,6 +90,11 @@ public abstract class AbstractStorageManager { throws IOException { FileSystem fs = path.getFileSystem(conf); FileStatus status = fs.getFileStatus(path); + return getFileScanner(meta, schema, path, status); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) + throws IOException { FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); return getScanner(meta, schema, fragment); } @@ -337,9 +342,8 @@ public abstract class AbstractStorageManager { * @return array of FileStatus objects * @throws IOException if zero items. */ - protected List<FileStatus> listStatus(Path path) throws IOException { + protected List<FileStatus> listStatus(Path... dirs) throws IOException { List<FileStatus> result = new ArrayList<FileStatus>(); - Path[] dirs = new Path[]{path}; if (dirs.length == 0) { throw new IOException("No input paths specified in job"); } @@ -392,18 +396,15 @@ public abstract class AbstractStorageManager { * so that Mappers process entire files. * * - * @param filename the file name to check + * @param path the file name to check + * @param status get the file length * @return is this file isSplittable? */ - protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException { - Scanner scanner = getFileScanner(meta, schema, filename); - return scanner.isSplittable(); - } - - - protected long computeSplitSize(long blockSize, long minSize, - long maxSize) { - return Math.max(minSize, Math.min(maxSize, blockSize)); + protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { + Scanner scanner = getFileScanner(meta, schema, path, status); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; } private static final double SPLIT_SLOP = 1.1; // 10% slop @@ -428,22 +429,22 @@ public abstract class AbstractStorageManager { * A factory that makes the split for this class. It can be overridden * by sub-classes to make sub-types */ - protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) { + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { return new FileFragment(fragmentId, file, start, length); } - protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length, + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, String[] hosts) { return new FileFragment(fragmentId, file, start, length, hosts); } - protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation, - int[] diskIds) throws IOException { - return new FileFragment(fragmentId, file, blockLocation, diskIds); + protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) + throws IOException { + return new FileFragment(fragmentId, file, blockLocation); } // for Non Splittable. eg, compressed gzip TextFile - protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length, + protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, BlockLocation[] blkLocations) throws IOException { Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>(); @@ -535,79 +536,128 @@ public abstract class AbstractStorageManager { * * @throws IOException */ - public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException { + public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs) + throws IOException { // generate splits' - List<FileFragment> splits = new ArrayList<FileFragment>(); - FileSystem fs = inputPath.getFileSystem(conf); - List<FileStatus> files; - if (fs.isFile(inputPath)) { - files = Lists.newArrayList(fs.getFileStatus(inputPath)); - } else { - files = listStatus(inputPath); - } - for (FileStatus file : files) { - Path path = file.getPath(); - long length = file.getLen(); - if (length > 0) { - BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); - boolean splittable = isSplittable(meta, schema, path); - if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { - // supported disk volume - BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs) - .getFileBlockStorageLocations(Arrays.asList(blkLocations)); - if (splittable) { - for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { - splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation - .getVolumeIds()))); - } - } else { // Non splittable - long blockSize = blockStorageLocations[0].getLength(); - if (blockSize >= length) { - for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { - splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation - .getVolumeIds()))); + List<FileFragment> splits = Lists.newArrayList(); + List<FileFragment> volumeSplits = Lists.newArrayList(); + List<BlockLocation> blockLocations = Lists.newArrayList(); + + for (Path p : inputs) { + FileSystem fs = p.getFileSystem(conf); + ArrayList<FileStatus> files = Lists.newArrayList(); + if (fs.isFile(p)) { + files.addAll(Lists.newArrayList(fs.getFileStatus(p))); + } else { + files.addAll(listStatus(p)); + } + + int previousSplitSize = splits.size(); + for (FileStatus file : files) { + Path path = file.getPath(); + long length = file.getLen(); + if (length > 0) { + // Get locations of blocks of file + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + boolean splittable = isSplittable(meta, schema, path, file); + if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { + + if (splittable) { + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + blockLocations.addAll(Arrays.asList(blkLocations)); + + } else { // Non splittable + long blockSize = blkLocations[0].getLength(); + if (blockSize >= length) { + blockLocations.addAll(Arrays.asList(blkLocations)); + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + } else { + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); } - } else { - splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations)); } - } - } else { - if (splittable) { + } else { + if (splittable) { - long minSize = Math.max(getMinSplitSize(), 1); + long minSize = Math.max(getMinSplitSize(), 1); - long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one - long splitSize = Math.max(minSize, blockSize); - long bytesRemaining = length; + long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one + long splitSize = Math.max(minSize, blockSize); + long bytesRemaining = length; - // for s3 - while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); - bytesRemaining -= splitSize; - } - if (bytesRemaining > 0) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); + // for s3 + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts())); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts())); + } + } else { // Non splittable + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); } - } else { // Non splittable - splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations)); } + } else { + //for zero length files + splits.add(makeSplit(tableName, path, 0, length)); } - } else { - //for zero length files - splits.add(makeSplit(tableName, meta, path, 0, length)); + } + if(LOG.isDebugEnabled()){ + LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); } } + // Combine original fileFragments with new VolumeId information + setVolumeMeta(volumeSplits, blockLocations); + splits.addAll(volumeSplits); LOG.info("Total # of splits: " + splits.size()); return splits; } + private void setVolumeMeta(List<FileFragment> splits, final List<BlockLocation> blockLocations) + throws IOException { + + int locationSize = blockLocations.size(); + int splitSize = splits.size(); + if (locationSize == 0 || splitSize == 0) return; + + if (locationSize != splitSize) { + // splits and locations don't match up + LOG.warn("Number of block locations not equal to number of splits: " + + "#locations=" + locationSize + + " #splits=" + splitSize); + return; + } + + DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf); + int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); + int blockLocationIdx = 0; + + Iterator<FileFragment> iter = splits.iterator(); + while (locationSize > blockLocationIdx) { + + int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); + List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); + //BlockStorageLocation containing additional volume location information for each replica of each block. + BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); + + for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { + iter.next().setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); + blockLocationIdx++; + } + } + LOG.info("# of splits with volumeId " + splitSize); + } + private static class InvalidInputException extends IOException { List<IOException> errors; public InvalidInputException(List<IOException> errors) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index ea8bf9f..6fe6841 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -50,16 +50,17 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab init(builder.build()); } - public FileFragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds) + public FileFragment(String tableName, Path uri, BlockLocation blockLocation) throws IOException { - this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), - blockLocation.getHosts(), diskIds); + this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null); } + public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) { + this.set(tableName, uri, start, length, hosts, diskIds); + } // Non splittable public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) { - this.set(tableName, uri, start, length, null, null); - this.hosts = hosts; + this.set(tableName, uri, start, length, hosts, null); } public FileFragment(String fragmentId, Path path, long start, long length) { @@ -115,6 +116,10 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab return diskIds; } + public void setDiskIds(int[] diskIds){ + this.diskIds = diskIds; + } + public String getTableName() { return this.tableName; } http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index bbb9df1..1beea99 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -1134,14 +1134,14 @@ public class RCFile { private CompressionCodec codec = null; private Metadata metadata = null; - private final byte[] sync = new byte[SYNC_HASH_SIZE]; - private final byte[] syncCheck = new byte[SYNC_HASH_SIZE]; + private byte[] sync; + private byte[] syncCheck; private boolean syncSeen; private long lastSeenSyncPos = 0; private long headerEnd; private long start, end; - private long startOffset, endOffset; + private final long startOffset, endOffset; private int[] targetColumnIndexes; private int currentKeyLength; @@ -1188,6 +1188,9 @@ public class RCFile { @Override public void init() throws IOException { + sync = new byte[SYNC_HASH_SIZE]; + syncCheck = new byte[SYNC_HASH_SIZE]; + more = startOffset < endOffset; rowId = new LongWritable(); readBytes = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java index 083670a..be8b6de 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java @@ -18,8 +18,11 @@ package org.apache.tajo.storage; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -28,14 +31,18 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.CommonTestingUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; +import java.util.List; +import java.util.UUID; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class TestStorageManager { private TajoConf conf; @@ -43,6 +50,7 @@ public class TestStorageManager { AbstractStorageManager sm = null; private Path testDir; private FileSystem fs; + @Before public void setUp() throws Exception { conf = new TajoConf(); @@ -90,4 +98,105 @@ public class TestStorageManager { } assertEquals(4,i); } + + @Test + public void testGetSplit() throws Exception { + final Configuration conf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build(); + + int testCount = 100; + Path tablePath = new Path("/testGetSplit"); + try { + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create test partitions + List<Path> partitions = Lists.newArrayList(); + for (int i =0; i < testCount; i++){ + Path tmpFile = new Path(tablePath, String.valueOf(i)); + DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl); + partitions.add(tmpFile); + } + + assertTrue(fs.exists(tablePath)); + AbstractStorageManager sm = StorageManagerFactory.getStorageManager(new TajoConf(conf), tablePath); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age",Type.INT4); + schema.addColumn("name",Type.TEXT); + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); + + List<FileFragment> splits = Lists.newArrayList(); + // Get FileFragments in partition batch + splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()]))); + assertEquals(testCount, splits.size()); + // -1 is unknown volumeId + assertEquals(-1, splits.get(0).getDiskIds()[0]); + + splits.clear(); + splits.addAll(sm.getSplits("data", meta, schema, + partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2]))); + assertEquals(testCount / 2, splits.size()); + assertEquals(1, splits.get(0).getHosts().length); + assertEquals(-1, splits.get(0).getDiskIds()[0]); + fs.close(); + } finally { + cluster.shutdown(); + + File dir = new File(testDataPath); + dir.delete(); + } + } + + @Test + public void testGetSplitWithBlockStorageLocationsBatching() throws Exception { + final Configuration conf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(2).build(); + + int testCount = 100; + Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching"); + try { + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create test files + for (int i = 0; i < testCount; i++) { + Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat"); + DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl); + } + assertTrue(fs.exists(tablePath)); + AbstractStorageManager sm = StorageManagerFactory.getStorageManager(new TajoConf(conf), tablePath); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT4); + schema.addColumn("name", Type.TEXT); + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); + + List<FileFragment> splits = Lists.newArrayList(); + splits.addAll(sm.getSplits("data", meta, schema, tablePath)); + + assertEquals(testCount, splits.size()); + assertEquals(2, splits.get(0).getHosts().length); + assertEquals(2, splits.get(0).getDiskIds().length); + assertNotEquals(-1, splits.get(0).getDiskIds()[0]); + fs.close(); + } finally { + cluster.shutdown(); + + File dir = new File(testDataPath); + dir.delete(); + } + } }
