Hi! I'm trying to run a MR job, but it keeps on failing and I can't understand why. Sometimes it shows output at 66% and sometimes 98% or so. I had a couple of exception before that I didn't catch that made the job to fail.
The log file from the task can be found at: http://pastebin.com/m4414d369 and the code looks like: //Java import java.io.*; import java.util.*; import java.net.*; //Hadoop import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; //HBase import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.mapred.*; import org.apache.hadoop.hbase.io.*; import org.apache.hadoop.hbase.client.*; // org.apache.hadoop.hbase.client.HTable //Extra import org.apache.commons.cli.ParseException; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.*; import org.apache.commons.httpclient.methods.*; import org.apache.commons.httpclient.params.HttpMethodParams; public class SerpentMR1 extends TableMap implements Mapper, Tool { //Setting DebugLevel private static final int DL = 0; //Setting up the variables for the MR job private static final String NAME = "SerpentMR1"; private static final String INPUTTABLE = "sources"; private final String[] COLS = {"content:feedurl", "content:ttl", "content:updated"}; private Configuration conf; public JobConf createSubmittableJob(String[] args) throws IOException{ JobConf c = new JobConf(getConf(), SerpentMR1.class); String jar = "/home/hbase/SerpentMR/" +NAME+".jar"; c.setJar(jar); c.setJobName(NAME); int mapTasks = 4; int reduceTasks = 20; c.setNumMapTasks(mapTasks); c.setNumReduceTasks(reduceTasks); String inputCols = ""; for (int i=0; i<COLS.length; i++){inputCols += COLS[i] + " "; } TableMap.initJob(INPUTTABLE, inputCols, this.getClass(), Text.class, BytesWritable.class, c); //Classes between: c.setOutputFormat(TextOutputFormat.class); Path path = new Path("users"); //inserting into a temp table FileOutputFormat.setOutputPath(c, path); c.setReducerClass(MyReducer.class); return c; } public void map(ImmutableBytesWritable key, RowResult res, OutputCollector output, Reporter reporter) throws IOException { Cell cellLast = res.get(COLS[2].getBytes());//lastupdate long oldTime = cellLast.getTimestamp(); Cell cell_ttl = res.get(COLS[1].getBytes());//ttl long ttl = StreamyUtil.BytesToLong(cell_ttl.getValue() ); byte[] url = null; long currTime = time.GetTimeInMillis(); if(currTime - oldTime > ttl){ url = res.get(COLS[0].getBytes()).getValue();//url output.collect(new Text(Base64.encode_strip(res.getRow())), new BytesWritable(url) );/ } } public static class MyReducer implements Reducer{ //org.apache.hadoop.mapred.Reducer{ private int timeout = 1000; //Sets the connection timeout time ms; public void reduce(Object key, Iterator values, OutputCollector output, Reporter rep) throws IOException { HttpClient client = new HttpClient();//new MultiThreadedHttpConnectionManager()); client.getHttpConnectionManager(). getParams().setConnectionTimeout(timeout); GetMethod method = null; int stat = 0; String content = ""; byte[] colFam = "select".getBytes(); byte[] column = "lastupdate".getBytes(); byte[] currTime = null; HBaseRef hbref = new HBaseRef(); JerlType sendjerl = null; //new JerlType(); ArrayList jd = new ArrayList(); InputStream is = null; while(values.hasNext()){ BytesWritable bw = (BytesWritable)values.next(); String address = new String(bw.get()); try{ System.out.println(address); method = new GetMethod(address); method.setFollowRedirects(true); } catch (Exception e){ System.err.println("Invalid Address"); e.printStackTrace(); } if (method != null){ try { // Execute the method. stat = client.executeMethod(method); if(stat == 200){ content = ""; is = (InputStream)(method.getResponseBodyAsStream()); //Write to HBase new stamp select:lastupdate currTime = StreamyUtil.LongToBytes(time.GetTimeInMillis() ); jd.add(new JerlData(INPUTTABLE.getBytes(), ((Text)key).getBytes(), colFam, column, currTime)); if (is != null){output.collect(((Text)key).getBytes(), is);} } else if(stat == 302){ System.err.println("302 not complete in reader!! New url = "); } else { System.err.println("Method failed: " + method.getStatusLine()); currTime = StreamyUtil.LongToBytes(Long.MAX_VALUE); jd.add(new JerlData(INPUTTABLE.getBytes(), ((Text)key).getBytes(), colFam, column, currTime)); //Set select:lastupdate to Long.MAX_VALUE() } } catch (HttpException e) { System.err.println("Fatal protocol violation: " + e.getMessage()); e.printStackTrace(); } catch (IOException e) { System.err.println("Fatal transport error: " + e.getMessage()); e.printStackTrace(); } catch (IllegalStateException e){ // System.err.println("IllegalStateException: " + e.getMessage()); System.err.println("Unsupported protocol error: " + e.getMessage()); e.printStackTrace(); } catch (Exception e){ System.err.println("Other Exception: " + e.getMessage()); e.printStackTrace(); } finally { // Release the connection. method.releaseConnection(); } } } HBase.AddAttributes(new JerlType((JerlData[])jd.toArray(new JerlData[0])), hbref); } public void configure(JobConf conf){} public void close(){} } static int printUsage() { System.out.println(NAME + "<input> <table_name>"); return -1; } public int run(@SuppressWarnings("unused") String[] args) throws Exception { JobClient.runJob(createSubmittableJob(args)); return 0; } public Configuration getConf() { return this.conf; } public void setConf(final Configuration c) { this.conf = c; } public static void main(String[] args) throws Exception { int errCode = ToolRunner.run(new Configuration(), new SerpentMR1(), args); System.exit(errCode); } } Regards Erik