I wrote a very small Hama program to test it on a Yarn cluster running
on my laptop to isolate the problem:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.YARNBSPJob;
import org.apache.hama.bsp.sync.SyncException;

public final class BSPTest extends BSP<LongWritable, Text, LongWritable, Text, Text> {

    // Each task reads the first record of its split and writes it back out.
    @Override
    public final void bsp(BSPPeer<LongWritable, Text, LongWritable, Text, Text> peer)
            throws IOException, InterruptedException, SyncException {
        LongWritable key = new LongWritable();
        Text value = new Text();
        peer.readNext(key, value);
        peer.write(key, value);
    }

    public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();
        conf.set("yarn.resourcemanager.address", "localhost:8032");

        YARNBSPJob job = new YARNBSPJob(conf);
        job.setMemoryUsedPerTaskInMb(500);
        job.setNumBspTask(4);
        job.setJobName("test");
        job.setBspClass(BSPTest.class);
        job.setJarByClass(BSPTest.class);

        // Input: a text file on HDFS, hash-partitioned across the tasks.
        job.setInputKeyClass(LongWritable.class);
        job.setInputValueClass(Text.class);
        job.setInputPath(new Path("in"));
        job.setInputFormat(TextInputFormat.class);
        job.setPartitioner(org.apache.hama.bsp.HashPartitioner.class);
        job.set("bsp.min.split.size", Long.toString(1000));

        // Output: plain text.
        job.setOutputPath(new Path("out"));
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.waitForCompletion(true);
    }
}
where "in" is a small text file stored on HDFS. The job does partition
the input into 4 files, but then it fails with the same error:
15/07/26 06:46:25 INFO ipc.Server: IPC Server handler 0 on 10000, call getTask(attempt_appattempt_1437858941768_0042_000001_0000_000004_4) from 127.0.0.1:54752: error: java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: 4
java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: 4
    at org.apache.hama.bsp.ApplicationMaster.getTask(ApplicationMaster.java:950)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hama.ipc.RPC$Server.call(RPC.java:615)
    at org.apache.hama.ipc.Server$Handler$1.run(Server.java:1211)
    at org.apache.hama.ipc.Server$Handler$1.run(Server.java:1207)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
I get the same error even when I remove the partitioning and use 1 task.
Leonidas
On 07/19/2015 06:55 PM, Edward J. Yoon wrote:
> It seems that the reason is that I have 1 input block (1 split) but I
> use 4 tasks.
Thanks for your report, it should be addressed.
> But the Application master shouldn't crash; it should have used 1 task
> instead.
Or, we can launch 1 task with the split and 3 tasks without a split. In
that case, you would have to distribute the input data yourself within
your BSP program. The graph package in 0.7.0 partitions vertices directly
into the empty tasks using barrier sync when the number of tasks is
greater than the number of blocks.
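
A minimal sketch of the two options discussed above (use only as many tasks as there are splits, or launch the extra tasks without a split and distribute the data inside the BSP program). It is plain Java that simulates the bookkeeping and does not call Hama. SplitAssignment and its methods are hypothetical names, not part of Hama, and round-robin is only one possible distribution policy; in an actual BSP program the redistribution would presumably go through the peer messaging calls followed by a barrier sync.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation; all names here are hypothetical and not part of Hama. */
final class SplitAssignment {

    /** Option 1: never run more tasks than there are input splits (but at least one). */
    static int clampTasks(int requestedTasks, int numSplits) {
        return Math.max(1, Math.min(requestedTasks, numSplits));
    }

    /**
     * Option 2: keep all requested tasks. Task i gets split i if it exists;
     * otherwise null ("no input for this task") is returned instead of
     * indexing past the end of the array.
     */
    static String splitForTask(String[] splits, int taskId) {
        return (taskId >= 0 && taskId < splits.length) ? splits[taskId] : null;
    }

    /** With option 2, the task that holds the records deals them out round-robin. */
    static List<List<String>> redistribute(List<String> records, int numTasks) {
        List<List<String>> perTask = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            perTask.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            perTask.get(i % numTasks).add(records.get(i));
        }
        return perTask;
    }
}
```

With 1 split and 4 requested tasks, clampTasks(4, 1) yields 1, while splitForTask returns null for tasks 1 to 3 so that they start with no input and receive their share from task 0.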
> 2) If I use the PartitioningRunner using:
> job.setPartitioner(org.apache.hama.bsp.HashPartitioner.class);
> job.setNumBspTask(4);
> job.set("bsp.min.split.size","102");
> it fails because it expects a Long key. Here is the log:
By default, PartitioningRunner reads and rewrites key/value pairs
based on "bsp.input.key/value.class". I guess your input is a text file,
so the key is automatically a Long, but you've set MRContainer as the
input key/value class. Can you provide more information about the job
configuration?
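
To illustrate the kind of check behind the "wrong key class" message in the log, here is a minimal plain-Java sketch of a writer bound to one key class that rejects keys of any other runtime class. TypedWriter is a hypothetical stand-in, not the Hadoop SequenceFile.Writer or any Hama class, and the message format is only modelled on the one in the log.

```java
import java.io.IOException;

/** Hypothetical stand-in for a record writer that is bound to one key class. */
final class TypedWriter {
    private final Class<?> keyClass;
    private int appended = 0;

    TypedWriter(Class<?> keyClass) {
        this.keyClass = keyClass;
    }

    /** Rejects any key whose runtime class differs from the configured key class. */
    void append(Object key, Object value) throws IOException {
        if (key.getClass() != keyClass) {
            throw new IOException("wrong key class: " + key.getClass().getName()
                    + " is not " + keyClass);
        }
        appended++;
    }

    /** Number of records accepted so far. */
    int size() {
        return appended;
    }
}
```

A writer created with one key class accepts only keys of exactly that class, so the key class the job is configured with and the class of the records actually being written have to agree.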
On Wed, Jul 15, 2015 at 4:12 PM, Leonidas Fegaras <[email protected]> wrote:
Hi,
I am extending MRQL to support Hama v0.7 on Yarn (see
https://issues.apache.org/jira/browse/MRQL-75 ).
Currently, MRQL on Hama works fine on Mesos but I have problems running it
on Yarn.
1) Without using the PartitioningRunner, the Yarn Application master
crashes.
It seems that the reason is that I have 1 input block (1 split) but I
use 4 tasks.
This may be caused by my input format.
But the Application master shouldn't crash; it should have used 1 task
instead.
The log is attached below.
2) If I use the PartitioningRunner using:
job.setPartitioner(org.apache.hama.bsp.HashPartitioner.class);
job.setNumBspTask(4);
job.set("bsp.min.split.size","102");
it fails because it expects a Long key. Here is the log:
15/07/15 09:31:40 INFO bsp.BSPJobClient: Running job: job_localrunner_0001
15/07/15 09:31:42 INFO bsp.LocalBSPRunner: Setting up a new barrier for 4 tasks!
15/07/15 09:31:42 ERROR bsp.LocalBSPRunner: Exception during BSP execution!
java.io.IOException: wrong key class: org.apache.mrql.MRContainer is not class org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1306)
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1298)
    at org.apache.hama.bsp.SequenceFileRecordWriter.write(SequenceFileRecordWriter.java:47)
    at org.apache.hama.bsp.SequenceFileRecordWriter.write(SequenceFileRecordWriter.java:31)
    at org.apache.hama.bsp.BSPPeerImpl$1.collect(BSPPeerImpl.java:335)
    at org.apache.hama.bsp.BSPPeerImpl.write(BSPPeerImpl.java:628)
    at org.apache.hama.bsp.PartitioningRunner.bsp(PartitioningRunner.java:156)
Thanks,
Leonidas Fegaras