[ 
https://issues.apache.org/jira/browse/HAMA-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14659817#comment-14659817
 ] 

Edward J. Yoon edited comment on HAMA-970 at 8/6/15 10:44 AM:
--------------------------------------------------------------

Hello JongYoon, 

This issue isn't simple, somewhat tricky, and obviously *core* contribution. I 
also thinking about what is best way. :-)

First of all, the number of splits equals to the number of HDFS blocks. For 
instance, 1GB file will be splitted into 8 blocks if HDFS default block size is 
128MB.

Now, let's assume that the Hama cluster has 5 task slots. Currently 
BSPJobClient will throw the "exceed max capacity" exception if user set the 1GB 
file as a input. To address this issue fundamentally, you should assign one 
more splits to each task. Please look into HADOOP-2560 "Processing multiple 
input splits per mapper task" and HAMA-964.

{code}
BSPJobClient.java

      if (maxTasks < splits.length) {
        throw new IOException(
            "Job failed! The number of splits has exceeded the number of max 
tasks. The number of splits: "
                + splits.length + ", The number of max tasks: " + maxTasks);
      }
{code}

Once HAMA-964 is addressed, I think we can clean up these codes. :-)

Also, there's also opposite case. Let's assume that the Hama cluster has 20 
task slots and user want to use MAX tasks for 1GB input data. The framework 
creates 8 tasks for assigning 8 splits first, and then creates 12 tasks without 
assigned split. Then, user can re-distribute input data among 20 tasks within 
BSP program.

{code}
JobInProgress.java

      // creates 8 tasks
      this.tasks = new TaskInProgress[numBSPTasks];
      for (int i = 0; i < splits.length; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            splits[i], this.conf, this, i);
      }

      // creates 12 tasks without assigned split 
      for (int i = splits.length; i < numBSPTasks; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            null, this.conf, this, i);
      }
{code}

For this case, another option is use of Input Partitioner. User can split input 
data into multiple files as number user want. It means that user can force-set 
the number of bap tasks.

{code}
Below job has two input files, but if you set bsp.setNumBspTask(4), input 
partitioning job split 2 text files into 4 files, and then main BSP program 
will be run with 4 tasks.

  public void testPartitioner() throws Exception {

    Configuration conf = new Configuration();
    conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
    conf.setBoolean("bsp.input.runtime.partitioning", true);
    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
    bsp.setJobName("Test partitioning with input");
    bsp.setBspClass(PartionedBSP.class);
    bsp.setNumBspTask(4);
    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
    bsp.setInputFormat(TextInputFormat.class);
    bsp.setOutputFormat(NullOutputFormat.class);
    FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
    bsp.setPartitioner(HashPartitioner.class);
    assertTrue(bsp.waitForCompletion(true));

    FileSystem fs = FileSystem.get(conf);
    fs.delete(OUTPUT_PATH, true);
  }
{code}

So, if you want to address this issue, my suggestion is like below:

1) Please check first whether CombineFileInputFormat can be used when the size 
of splits is bigger than Hama cluster max capacity.
2) CombineFileInputFormat is only for Text file format. To support binary file 
format, we need to fix HAMA-964.
3) Documentation that explains how handle this problem.

1. 
http://svn.apache.org/repos/asf/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java



was (Author: udanax):
Hello JongYoon, 

This issue isn't simple, somewhat tricky, and obviously *core* contribution. I 
also thinking about what is best way. :-)

First of all, the number of splits equals to the number of HDFS blocks. For 
instance, 1GB file will be splitted into 8 blocks if HDFS default block size is 
128MB.

Now, let's assume that the Hama cluster has 5 task slots. Currently 
BSPJobClient will throw the "exceed max capacity" exception if user set the 1GB 
file as a input. To address this issue fundamentally, you should assign one 
more splits to each task. Please look into HADOOP-2560 "Processing multiple 
input splits per mapper task" and HAMA-964.

{code}
BSPJobClient.java

      if (maxTasks < splits.length) {
        throw new IOException(
            "Job failed! The number of splits has exceeded the number of max 
tasks. The number of splits: "
                + splits.length + ", The number of max tasks: " + maxTasks);
      }
{code}

Once HAMA-964 is addressed, I think we can clean up these codes. :-)

Also, there's also opposite case. Let's assume that the Hama cluster has 20 
task slots and user want to use MAX tasks for 1GB input data. The framework 
creates 8 tasks for assigning 8 splits first, and then creates 12 tasks without 
assigned split. Then, user can re-distribute input data among 20 tasks within 
BSP program.

{code}
JobInProgress.java

      // creates 8 tasks
      this.tasks = new TaskInProgress[numBSPTasks];
      for (int i = 0; i < splits.length; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            splits[i], this.conf, this, i);
      }

      // creates 12 tasks without assigned split 
      for (int i = splits.length; i < numBSPTasks; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            null, this.conf, this, i);
      }
{code}

For this case, another option is use of Input Partitioner. User can split input 
data into multiple files as number user want. It means that user can force-set 
the number of bap tasks.

{code}
  public void testPartitioner() throws Exception {

    Configuration conf = new Configuration();
    conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
    conf.setBoolean("bsp.input.runtime.partitioning", true);
    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
    bsp.setJobName("Test partitioning with input");
    bsp.setBspClass(PartionedBSP.class);
    bsp.setNumBspTask(2);
    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
    bsp.setInputFormat(TextInputFormat.class);
    bsp.setOutputFormat(NullOutputFormat.class);
    FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
    bsp.setPartitioner(HashPartitioner.class);
    assertTrue(bsp.waitForCompletion(true));

    FileSystem fs = FileSystem.get(conf);
    fs.delete(OUTPUT_PATH, true);
  }
{code}

So, if you want to address this issue, my suggestion is like below:

1) Please check first whether CombineFileInputFormat can be used when the size 
of splits is bigger than Hama cluster max capacity.
2) CombineFileInputFormat is only for Text file format. To support binary file 
format, we need to fix HAMA-964.
3) Documentation that explains how handle this problem.

1. 
http://svn.apache.org/repos/asf/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java


> Exception can occur if the size of splits is bigger than numBSPTasks
> --------------------------------------------------------------------
>
>                 Key: HAMA-970
>                 URL: https://issues.apache.org/jira/browse/HAMA-970
>             Project: Hama
>          Issue Type: Bug
>          Components: bsp core
>    Affects Versions: 0.7.0
>            Reporter: JongYoon Lim
>            Priority: Trivial
>         Attachments: HAMA-970.patch
>
>
> In JonInProgress, it's possble to get Exception in initTasks(). 
> {code:java}
> this.tasks = new TaskInProgress[numBSPTasks];
> for (int i = 0; i < splits.length; i++) {
>   tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(), 
> splits[i], this.conf, this, i);
> }
> {code}
> I'm not sure that *numBSPTask* is always bigger than *splits.length*. 
> So, I think it's better to use bigger value to assign the *tasks* array. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to