[
https://issues.apache.org/jira/browse/HAMA-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14659817#comment-14659817
]
Edward J. Yoon edited comment on HAMA-970 at 8/6/15 10:44 AM:
--------------------------------------------------------------
Hello JongYoon,
This issue isn't simple; it's somewhat tricky, and obviously a *core* contribution. I'm
also thinking about what the best approach is. :-)
First of all, the number of splits equals the number of HDFS blocks. For
instance, a 1GB file will be split into 8 blocks if the HDFS default block size is
128MB.
Now, let's assume that the Hama cluster has 5 task slots. Currently,
BSPJobClient will throw the "exceed max capacity" exception if the user sets the 1GB
file as an input. To address this issue fundamentally, you should assign one or
more splits to each task. Please look into HADOOP-2560 "Processing multiple
input splits per mapper task" and HAMA-964.
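To make the arithmetic concrete, the split count is just the file size divided by the block size, rounded up. This is a minimal sketch; the class and method names are hypothetical, not Hama or Hadoop API:

```java
// Hypothetical helper illustrating how the split count follows from block size.
public class SplitMath {
  // Ceiling division: number of HDFS blocks (and hence splits) for a file.
  public static long splitCount(long fileSizeBytes, long blockSizeBytes) {
    return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
  }

  public static void main(String[] args) {
    long oneGb = 1024L * 1024 * 1024;
    long block = 128L * 1024 * 1024;
    System.out.println(splitCount(oneGb, block)); // 8 blocks for a 1GB file
  }
}
```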
{code}
// BSPJobClient.java
if (maxTasks < splits.length) {
  throw new IOException(
      "Job failed! The number of splits has exceeded the number of max tasks."
          + " The number of splits: " + splits.length
          + ", The number of max tasks: " + maxTasks);
}
{code}
Once HAMA-964 is addressed, I think we can clean up this code. :-)
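The multiple-splits-per-task idea from HADOOP-2560 could be sketched as a simple round-robin grouping: instead of rejecting the job when splits.length exceeds maxTasks, each task receives a list of split indices. This is a hypothetical illustration, not existing Hama code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assign N splits to M tasks round-robin instead of
// throwing the "exceed max capacity" exception.
public class SplitGrouper {
  public static List<List<Integer>> group(int numSplits, int maxTasks) {
    List<List<Integer>> assignment = new ArrayList<>();
    for (int t = 0; t < maxTasks; t++) {
      assignment.add(new ArrayList<Integer>());
    }
    for (int s = 0; s < numSplits; s++) {
      // Split s goes to task (s % maxTasks), so no task gets more than
      // ceil(numSplits / maxTasks) splits.
      assignment.get(s % maxTasks).add(s);
    }
    return assignment;
  }
}
```

With 8 splits and 5 task slots, tasks 0 through 2 each process two splits and the rest one, so the job no longer fails.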
There's also the opposite case. Let's assume that the Hama cluster has 20
task slots and the user wants to use the MAX number of tasks for 1GB of input data.
The framework first creates 8 tasks for the 8 splits, and then creates 12 tasks
without an assigned split. The user can then re-distribute the input data among
the 20 tasks within the BSP program.
{code}
// JobInProgress.java
this.tasks = new TaskInProgress[numBSPTasks];
// creates 8 tasks, one per split
for (int i = 0; i < splits.length; i++) {
  tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
      splits[i], this.conf, this, i);
}
// creates 12 tasks without an assigned split
for (int i = splits.length; i < numBSPTasks; i++) {
  tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
      null, this.conf, this, i);
}
{code}
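Inside the BSP program, the re-distribution step can be as simple as choosing a destination peer for each record round-robin, so the 8 tasks that hold splits spread their records evenly across all 20 peers. This sketch shows only the target-peer computation; the surrounding send/sync calls against the Hama peer API are omitted, and the class name is hypothetical:

```java
// Hypothetical sketch of the record-to-peer mapping used when tasks that own
// splits re-distribute their records across all peers (including the ones
// created without an assigned split).
public class Redistributor {
  public static int targetPeer(long recordIndex, int numPeers) {
    // Round-robin: consecutive records go to consecutive peers.
    return (int) (recordIndex % numPeers);
  }
}
```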
For this case, another option is to use an Input Partitioner. The user can split
the input data into as many files as desired, which means the user can force-set
the number of BSP tasks.
The job below has two input files, but if you set bsp.setNumBspTask(4), the input
partitioning job will split the 2 text files into 4 files, and then the main BSP
program will run with 4 tasks.
{code}
public void testPartitioner() throws Exception {
  Configuration conf = new Configuration();
  conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
  conf.setBoolean("bsp.input.runtime.partitioning", true);
  conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);

  BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
  bsp.setJobName("Test partitioning with input");
  bsp.setBspClass(PartionedBSP.class);
  bsp.setNumBspTask(4);
  bsp.setInputFormat(TextInputFormat.class);
  bsp.setOutputFormat(NullOutputFormat.class);
  FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
  bsp.setPartitioner(HashPartitioner.class);
  assertTrue(bsp.waitForCompletion(true));

  FileSystem fs = FileSystem.get(conf);
  fs.delete(OUTPUT_PATH, true);
}
{code}
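The HashPartitioner approach boils down to mapping each key to one of the requested number of partitions by its hash. This is a generic sketch of that idea, not the Hama class itself:

```java
// Generic hash-partitioning sketch: key -> partition index in [0, numPartitions).
public class SimpleHashPartitioner {
  public static int getPartition(Object key, int numPartitions) {
    // Mask off the sign bit so keys with negative hashCodes still map to a
    // valid, non-negative partition index.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}
```

The same key always lands in the same partition, so records are grouped deterministically regardless of which input file they came from.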
So, if you want to address this issue, my suggestions are as follows:
1) Please check first whether CombineFileInputFormat [1] can be used when the
number of splits is bigger than the Hama cluster's max capacity.
2) CombineFileInputFormat only supports the Text file format. To support binary
file formats, we need to fix HAMA-964.
3) Add documentation that explains how to handle this problem.
1.
http://svn.apache.org/repos/asf/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
> Exception can occur if the size of splits is bigger than numBSPTasks
> --------------------------------------------------------------------
>
> Key: HAMA-970
> URL: https://issues.apache.org/jira/browse/HAMA-970
> Project: Hama
> Issue Type: Bug
> Components: bsp core
> Affects Versions: 0.7.0
> Reporter: JongYoon Lim
> Priority: Trivial
> Attachments: HAMA-970.patch
>
>
> In JobInProgress, it's possible to get an Exception in initTasks().
> {code:java}
> this.tasks = new TaskInProgress[numBSPTasks];
> for (int i = 0; i < splits.length; i++) {
>   tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
>       splits[i], this.conf, this, i);
> }
> {code}
> I'm not sure that *numBSPTasks* is always bigger than *splits.length*.
> So, I think it's better to use the bigger of the two values to size the *tasks* array.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)