[ https://issues.apache.org/jira/browse/TEZ-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150456#comment-15150456 ]
Oleksiy Sayankin commented on TEZ-3074: --------------------------------------- Hi, Siddharth! Here are the results of running Tez jobs with logged FileInputFormat. But first let me show where I have added logs in FileInputFormat.java In method InputSplit[] getSplits (JobConf job, int numSplits) {code:title=org.apache.hadoop.mapred.FileInputFormat.java|borderStyle=solid} public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { Stopwatch sw = new Stopwatch().start(); FileStatus[] files = listStatus(job); // Save the number of input files for metrics/loadgen job.setLong(NUM_INPUT_FILES, files.length); long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); LOG.info("###@@@ totalSize = " + totalSize + ", goalSize = " + goalSize + ", numSplits = " + numSplits + ", minSize = " + minSize); // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); LOG.info("###@@@ file.getPath() = " + path + ", file.getLen() = " + length + ", file.getModificationTime() = " + file.getModificationTime()); if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } LOG.info("###@@@ blkLocations.length = " + blkLocations.length); if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); long 
bytesRemaining = length; LOG.debug("###@@@ blockSize = " + blockSize + ", splitSize = " + splitSize + ", bytesRemaining = " + bytesRemaining); while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { LOG.info("###@@@ AAA"); String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { LOG.info("###@@@ BBB"); String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { LOG.info("###@@@ CCC"); String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits.toArray(new FileSplit[splits.size()]); } {code} in method getBlockIndex(BlockLocation[] blkLocations, long offset) {code:title=org.apache.hadoop.mapred.FileInputFormat.java|borderStyle=solid} protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { for (int i = 0 ; i < blkLocations.length; i++) { // is the offset inside this block? if ((blkLocations[i].getOffset() <= offset) && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ return i; } } LOG.info("###@@@ blkLocations.length = " + blkLocations.length); LOG.info("###@@@ offset = " + offset); BlockLocation last = blkLocations[blkLocations.length -1]; long fileLength = last.getOffset() + last.getLength() -1; throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." 
+ fileLength + ")"); } {code} In method String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, long offset, long splitSize, NetworkTopology clusterMap) {code:title=org.apache.hadoop.mapred.FileInputFormat.java|borderStyle=solid} private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, long offset, long splitSize, NetworkTopology clusterMap) throws IOException { LOG.info("###@@@ blkLocations.length = " + blkLocations.length); LOG.info("###@@@ offset = " + offset); int startIndex = getBlockIndex(blkLocations, offset); {code} This is the result: {noformat} 2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ file.getPath() = hdfs:///user/hive/warehouse/ptest1/z=66/tempfile533, file.getLen() = 65536, file.getModificationTime() = 1455235539287 2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ blkLocations.length = 0 2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ BBB 2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ blkLocations.length = 0 2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ offset = 0 2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ blkLocations.length = 0 2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ offset = 0 {noformat} According to the log, tempfile533 is the cause of the issue in my case. 
{noformat} [root@host1 z=66]# ls -l|grep tempfile533 -rw-rw-r-- 1 hdfs hdfs 1048576 Feb 12 09:05 tempfile533 {noformat} It seems the file was still being generated, or had just been generated, at that moment. > Multithreading issue java.lang.ArrayIndexOutOfBoundsException: -1 while > working with Tez > ---------------------------------------------------------------------------------------- > > Key: TEZ-3074 > URL: https://issues.apache.org/jira/browse/TEZ-3074 > Project: Apache Tez > Issue Type: Bug > Affects Versions: 0.5.3 > Reporter: Oleksiy Sayankin > Fix For: 0.5.3 > > Attachments: tempsource.data > > > *STEP 1. Install and configure Tez on yarn* > *STEP 2. Configure hive for tez* > *STEP 3. Create test tables in Hive and fill it with data* > Enable dynamic partitioning in Hive. Add to {{hive-site.xml}} and restart > Hive. > {code:xml} > <!-- DYNAMIC PARTITION --> > <property> > <name>hive.exec.dynamic.partition</name> > <value>true</value> > </property> > <property> > <name>hive.exec.dynamic.partition.mode</name> > <value>nonstrict</value> > </property> > <property> > <name>hive.exec.max.dynamic.partitions.pernode</name> > <value>2000</value> > </property> > <property> > <name>hive.exec.max.dynamic.partitions</name> > <value>2000</value> > </property> > {code} > Execute in command line > {code} > hadoop fs -put tempsource.data / > {code} > Execute in command line. Use attached file {{tempsource.data}} > {code} > hive> CREATE TABLE test3 (x INT, y STRING) ROW FORMAT DELIMITED FIELDS > TERMINATED BY ','; > hive> CREATE TABLE ptest1 (x INT, y STRING) PARTITIONED BY (z STRING) ROW > FORMAT DELIMITED FIELDS TERMINATED BY ','; > hive> CREATE TABLE tempsource (x INT, y STRING, z STRING) ROW FORMAT > DELIMITED FIELDS TERMINATED BY ','; > hive> LOAD DATA INPATH '/tempsource.data' OVERWRITE INTO TABLE tempsource; > hive> INSERT OVERWRITE TABLE ptest1 PARTITION (z) SELECT x,y,z FROM > tempsource; > {code} > *STEP 4. Mount NFS on cluster* > *STEP 5. 
Run teragen test application* > Use separate console > {code} > /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.5.1.jar > teragen -Dmapred.map.tasks=7 -Dmapreduce.map.disk=0 > -Dmapreduce.map.cpu.vcores=0 1000000000 /user/hdfs/input > {code} > *STEP 6. Create many test files* > Use separate console > {code} > cd /hdfs/cluster/user/hive/warehouse/ptest1/z=66 > for i in `seq 1 10000`; do dd if=/dev/urandom of=tempfile$i bs=1M count=1; > done > {code} > *STEP 7. Run the following query repeatedly in other console* > Use separate console > {code} > hive> insert overwrite table test3 select x,y from ( select x,y,z from > (select x,y,z from ptest1 where x > 5 and x < 1000 union all select x,y,z > from ptest1 where x > 5 and x < 1000) a)b; > {code} > After some time of working it gives an exception. > {noformat} > Status: Failed > Vertex failed, vertexName=Map 3, vertexId=vertex_1443452487059_0426_1_01, > diagnostics=[Vertex vertex_1443452487059_0426_1_01 [Map 3] killed/failed due > to:ROOT_INPUT_INIT_FAILURE, Vertex Input: ptest1 initializer failed, > vertex=vertex_1443452487059_0426_1_01 [Map 3], > java.lang.ArrayIndexOutOfBoundsException: -1 > at > org.apache.hadoop.mapred.FileInputFormat.getBlockIndex(FileInputFormat.java:395) > at > org.apache.hadoop.mapred.FileInputFormat.getSplitHostsAndCachedHosts(FileInputFormat.java:579) > at > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:359) > at > org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:300) > at > org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:402) > at > org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator.initialize(HiveSplitGenerator.java:132) > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:245) > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:239) > at 
java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1566) > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:239) > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:226) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > ] > Vertex killed, vertexName=Map 1, vertexId=vertex_1443452487059_0426_1_00, > diagnostics=[Vertex received Kill in INITED state., Vertex > vertex_1443452487059_0426_1_00 [Map 1] killed/failed due to:null] > DAG failed due to vertex failure. failedVertices:1 killedVertices:1 > FAILED: Execution Error, return code 2 from > org.apache.hadoop.hive.ql.exec.tez.TezTask > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)