[ 
https://issues.apache.org/jira/browse/TEZ-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150456#comment-15150456
 ] 

Oleksiy Sayankin commented on TEZ-3074:
---------------------------------------

Hi, Siddharth!

Here are the results of running Tez jobs with logging added to FileInputFormat. But first, let me show where I have added the log statements in FileInputFormat.java.

In method InputSplit[] getSplits(JobConf job, int numSplits)

{code:title=org.apache.hadoop.mapred.FileInputFormat.java|borderStyle=solid}
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    Stopwatch sw = new Stopwatch().start();
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    LOG.info("###@@@ totalSize = " + totalSize + ", goalSize = " + goalSize + 
", numSplits = " + numSplits + ", minSize = " + minSize);
    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();

      LOG.info("###@@@ file.getPath() = " + path + ", file.getLen() = " + 
length + ", file.getModificationTime() = " + file.getModificationTime());

      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        LOG.info("###@@@ blkLocations.length = " + blkLocations.length);
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;

          LOG.debug("###@@@ blockSize = " + blockSize + ", splitSize = " + 
splitSize + ", bytesRemaining = " + bytesRemaining);

          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            LOG.info("###@@@ AAA");
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            LOG.info("###@@@ BBB");
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          LOG.info("###@@@ CCC");
          String[][] splitHosts =
              getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
{code}

In method int getBlockIndex(BlockLocation[] blkLocations, long offset)

{code:title=org.apache.hadoop.mapred.FileInputFormat.java|borderStyle=solid}
 protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }

    LOG.info("###@@@ blkLocations.length = " + blkLocations.length);
    LOG.info("###@@@ offset = " + offset);

    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset + 
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }
{code}


In method String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 
long offset, long splitSize, NetworkTopology clusterMap)

{code:title=org.apache.hadoop.mapred.FileInputFormat.java|borderStyle=solid}
 private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 
      long offset, long splitSize, NetworkTopology clusterMap)
  throws IOException {

    LOG.info("###@@@ blkLocations.length = " + blkLocations.length);
    LOG.info("###@@@ offset = " + offset);

    int startIndex = getBlockIndex(blkLocations, offset);
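    // ... rest of the method unchanged; only the two log lines above were added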
{code}

This is the result:

{noformat}
2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ file.getPath() = hdfs:///user/hive/warehouse/ptest1/z=66/tempfile533, file.getLen() = 65536, file.getModificationTime() = 1455235539287
2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ blkLocations.length = 0
2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ BBB
2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ blkLocations.length = 0
2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ offset = 0
2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ blkLocations.length = 0
2016-02-12 09:05:40,558 INFO [InputInitializer [Map 1] #0] mapred.FileInputFormat: ###@@@ offset = 0
{noformat}
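
This also explains the exception itself: with blkLocations.length = 0, getSplitHostsAndCachedHosts() hands an empty array to getBlockIndex(), whose loop never matches, so it falls through to blkLocations[blkLocations.length - 1], i.e. index -1 — exactly the reported java.lang.ArrayIndexOutOfBoundsException: -1. A minimal standalone sketch of that failing access (just an illustration, not the patched class):

{code:java}
import org.apache.hadoop.fs.BlockLocation;

public class EmptyBlockLocationsDemo {

  // Simplified copy of the tail of FileInputFormat.getBlockIndex(): with an
  // empty array the loop body never runs, so the access below uses index -1.
  static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      if (blkLocations[i].getOffset() <= offset
          && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1]; // index -1 here
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset
        + " is outside of file (0.." + fileLength + ")");
  }

  public static void main(String[] args) {
    // Throws ArrayIndexOutOfBoundsException (": -1" on JDK 8),
    // the same failure as in the stack trace below.
    getBlockIndex(new BlockLocation[0], 0L);
  }
}
{code}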


According to the log, tempfile533 is the cause of the issue in my case.

{noformat}
[root@host1 z=66]# ls -l|grep tempfile533 
-rw-rw-r-- 1 hdfs hdfs 1048576 Feb 12 09:05 tempfile533
{noformat}

It seems the file was still being written (or had only just finished being written) when getSplits() ran: at that moment file.getLen() returned only 65536 bytes and no block locations were reported, while the ls above, run shortly afterwards, already shows the full 1048576 bytes.
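
If that is the case, the block locations reported by the NameNode were simply lagging behind the file length when getSplits() listed the directory. A quick way to check this hypothesis is a standalone check like the following (hypothetical, assuming the default Hadoop configuration and an HDFS client on the classpath; not part of any patch), comparing FileStatus.getLen() with the bytes actually covered by getFileBlockLocations():

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical check: for a file that is still being written, the block
// locations reported by the NameNode can be empty or cover fewer bytes
// than FileStatus.getLen() claims, which matches the log above.
public class CheckBlockLocations {
  public static void main(String[] args) throws Exception {
    Path path = new Path("hdfs:///user/hive/warehouse/ptest1/z=66/tempfile533");
    FileSystem fs = path.getFileSystem(new Configuration());
    FileStatus status = fs.getFileStatus(path);
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());

    long covered = 0;
    for (BlockLocation b : blocks) {
      covered += b.getLength();
    }
    System.out.println("getLen() = " + status.getLen()
        + ", blocks reported = " + blocks.length
        + ", bytes covered by blocks = " + covered);
  }
}
{code}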


> Multithreading issue java.lang.ArrayIndexOutOfBoundsException: -1 while 
> working with Tez
> ----------------------------------------------------------------------------------------
>
>                 Key: TEZ-3074
>                 URL: https://issues.apache.org/jira/browse/TEZ-3074
>             Project: Apache Tez
>          Issue Type: Bug
>    Affects Versions: 0.5.3
>            Reporter: Oleksiy Sayankin
>             Fix For: 0.5.3
>
>         Attachments: tempsource.data
>
>
> *STEP 1. Install and configure Tez on yarn*
> *STEP 2. Configure hive for tez*
> *STEP 3. Create test tables in Hive and fill it with data*
> Enable dynamic partitioning in Hive. Add to {{hive-site.xml}} and restart 
> Hive.
> {code:xml}
> <!-- DYNAMIC PARTITION -->
> <property>
>   <name>hive.exec.dynamic.partition</name>
>   <value>true</value>
> </property>
> <property>
>   <name>hive.exec.dynamic.partition.mode</name>
>   <value>nonstrict</value>
> </property>
> <property>
>   <name>hive.exec.max.dynamic.partitions.pernode</name>
>   <value>2000</value>
> </property>
> <property>
>   <name>hive.exec.max.dynamic.partitions</name>
>   <value>2000</value>
> </property>
> {code}
> Execute in command line
> {code}
> hadoop fs -put tempsource.data /
> {code}
> Execute in command line. Use attached file {{tempsource.data}}
> {code}
> hive> CREATE TABLE test3 (x INT, y STRING) ROW FORMAT DELIMITED FIELDS 
> TERMINATED BY ',';
> hive> CREATE TABLE ptest1 (x INT, y STRING) PARTITIONED BY (z STRING) ROW 
> FORMAT DELIMITED FIELDS TERMINATED BY ',';
> hive> CREATE TABLE tempsource (x INT, y STRING, z STRING) ROW FORMAT 
> DELIMITED FIELDS TERMINATED BY ',';
> hive> LOAD DATA INPATH '/tempsource.data' OVERWRITE INTO TABLE tempsource;
> hive> INSERT OVERWRITE TABLE ptest1 PARTITION (z) SELECT x,y,z FROM 
> tempsource;
> {code}
> *STEP 4. Mount NFS on cluster*
> *STEP 5. Run teragen test application*
> Use separate console
> {code}
> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.5.1.jar \
>   teragen -Dmapred.map.tasks=7 -Dmapreduce.map.disk=0 \
>   -Dmapreduce.map.cpu.vcores=0 1000000000 /user/hdfs/input
> {code}
> *STEP 6. Create many test files*
> Use separate console
> {code}
> cd /hdfs/cluster/user/hive/warehouse/ptest1/z=66
> for i in `seq 1 10000`; do dd if=/dev/urandom of=tempfile$i bs=1M count=1;
> done
> {code}
> *STEP 7. Run the following query repeatedly in other console*
> Use separate console
> {code}
> hive> insert overwrite table test3 select x,y from ( select x,y,z from 
> (select x,y,z from ptest1 where x > 5 and x < 1000 union all select x,y,z 
> from ptest1 where x > 5 and x < 1000) a)b;
> {code}
> After running for some time, the query fails with an exception.
> {noformat}
> Status: Failed
> Vertex failed, vertexName=Map 3, vertexId=vertex_1443452487059_0426_1_01,
> diagnostics=[Vertex vertex_1443452487059_0426_1_01 [Map 3] killed/failed due
> to:ROOT_INPUT_INIT_FAILURE, Vertex Input: ptest1 initializer failed,
> vertex=vertex_1443452487059_0426_1_01 [Map 3],
> java.lang.ArrayIndexOutOfBoundsException: -1
>     at
> org.apache.hadoop.mapred.FileInputFormat.getBlockIndex(FileInputFormat.java:395)
>     at
> org.apache.hadoop.mapred.FileInputFormat.getSplitHostsAndCachedHosts(FileInputFormat.java:579)
>     at
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:359)
>     at
> org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:300)
>     at
> org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:402)
>     at
> org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator.initialize(HiveSplitGenerator.java:132)
>     at
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:245)
>     at
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:239)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1566)
>     at
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:239)
>     at
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:226)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> ]
> Vertex killed, vertexName=Map 1, vertexId=vertex_1443452487059_0426_1_00,
> diagnostics=[Vertex received Kill in INITED state., Vertex
> vertex_1443452487059_0426_1_00 [Map 1] killed/failed due to:null]
> DAG failed due to vertex failure. failedVertices:1 killedVertices:1
> FAILED: Execution Error, return code 2 from
> org.apache.hadoop.hive.ql.exec.tez.TezTask
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
