Hi!
I'm trying to run a MR job, but it keeps on failing and I can't understand
why.
Sometimes it shows output at 66% and sometimes 98% or so.
Earlier I had a couple of exceptions that I didn't catch, and they made the
job fail.


The log file from the task can be found at:
http://pastebin.com/m4414d369


and the code looks like:
//Java
import java.io.*;
import java.util.*;
import java.net.*;

//Hadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

//HBase
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.mapred.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.client.*;
// org.apache.hadoop.hbase.client.HTable

//Extra
import org.apache.commons.cli.ParseException;

import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.*;
import org.apache.commons.httpclient.params.HttpMethodParams;


public class SerpentMR1 extends TableMap implements Mapper, Tool {

    //Setting DebugLevel
    private static final int DL = 0;

    //Setting up the variables for the MR job
    private static final String NAME = "SerpentMR1";
    private static final String INPUTTABLE = "sources";
    private final String[] COLS = {"content:feedurl", "content:ttl",
"content:updated"};


    private Configuration conf;

    public JobConf createSubmittableJob(String[] args) throws IOException{
        JobConf c = new JobConf(getConf(), SerpentMR1.class);
        String jar = "/home/hbase/SerpentMR/" +NAME+".jar";
        c.setJar(jar);
        c.setJobName(NAME);

        int mapTasks = 4;
        int reduceTasks = 20;

        c.setNumMapTasks(mapTasks);
        c.setNumReduceTasks(reduceTasks);

        String inputCols = "";
        for (int i=0; i<COLS.length; i++){inputCols += COLS[i] + " "; }

        TableMap.initJob(INPUTTABLE, inputCols, this.getClass(), Text.class,
BytesWritable.class, c);
        //Classes between:

        c.setOutputFormat(TextOutputFormat.class);
        Path path = new Path("users"); //inserting into a temp table
        FileOutputFormat.setOutputPath(c, path);

        c.setReducerClass(MyReducer.class);
        return c;
    }

    public void map(ImmutableBytesWritable key, RowResult res,
OutputCollector output, Reporter reporter)
    throws IOException {
        Cell cellLast    = res.get(COLS[2].getBytes());//lastupdate

        long oldTime = cellLast.getTimestamp();

        Cell cell_ttl    = res.get(COLS[1].getBytes());//ttl
        long ttl = StreamyUtil.BytesToLong(cell_ttl.getValue() );
        byte[] url = null;

        long currTime = time.GetTimeInMillis();

        if(currTime - oldTime > ttl){
            url = res.get(COLS[0].getBytes()).getValue();//url
            output.collect(new Text(Base64.encode_strip(res.getRow())), new
BytesWritable(url) );/
        }
    }



    public static class MyReducer implements Reducer{
//org.apache.hadoop.mapred.Reducer{


        private int timeout = 1000; //Sets the connection timeout time ms;

        public void reduce(Object key, Iterator values, OutputCollector
output, Reporter rep)
        throws IOException {
            HttpClient client = new HttpClient();//new
MultiThreadedHttpConnectionManager());
            client.getHttpConnectionManager().
                getParams().setConnectionTimeout(timeout);

            GetMethod method = null;

            int stat = 0;
            String content = "";
            byte[] colFam = "select".getBytes();
            byte[] column = "lastupdate".getBytes();
            byte[] currTime = null;

            HBaseRef hbref = new HBaseRef();
            JerlType sendjerl = null; //new JerlType();
            ArrayList jd = new ArrayList();

            InputStream is = null;

            while(values.hasNext()){
                BytesWritable bw = (BytesWritable)values.next();

                String address = new String(bw.get());
                try{
                    System.out.println(address);

                    method = new GetMethod(address);
                    method.setFollowRedirects(true);

                } catch (Exception e){
                    System.err.println("Invalid Address");
                    e.printStackTrace();
                }

                if (method != null){
                    try {
                    // Execute the method.
                        stat = client.executeMethod(method);

                        if(stat == 200){
                            content = "";
                            is =
(InputStream)(method.getResponseBodyAsStream());

                            //Write to HBase new stamp select:lastupdate
                            currTime =
StreamyUtil.LongToBytes(time.GetTimeInMillis() );
                            jd.add(new JerlData(INPUTTABLE.getBytes(),
((Text)key).getBytes(), colFam, column,
                                                currTime));

                            if (is !=
null){output.collect(((Text)key).getBytes(), is);}
                        }
                        else if(stat == 302){
                            System.err.println("302 not complete in reader!!
New url = ");
                        }
                        else {
                            System.err.println("Method failed: " +
method.getStatusLine());
                            currTime  =
StreamyUtil.LongToBytes(Long.MAX_VALUE);
                            jd.add(new JerlData(INPUTTABLE.getBytes(),
((Text)key).getBytes(), colFam, column,
                                                currTime));
                            //Set select:lastupdate to Long.MAX_VALUE()
                        }

                    } catch (HttpException e) {
                        System.err.println("Fatal protocol violation: " +
e.getMessage());
                        e.printStackTrace();
                    } catch (IOException e) {
                        System.err.println("Fatal transport error: " +
e.getMessage());
                        e.printStackTrace();
                    } catch (IllegalStateException e){
//                         System.err.println("IllegalStateException: " +
e.getMessage());
                        System.err.println("Unsupported protocol error: " +
e.getMessage());
                        e.printStackTrace();
                    } catch (Exception e){
                        System.err.println("Other Exception: " +
e.getMessage());
                        e.printStackTrace();
                    } finally {
                    // Release the connection.
                        method.releaseConnection();
                    }
                }
            }
            HBase.AddAttributes(new JerlType((JerlData[])jd.toArray(new
JerlData[0])), hbref);
        }

        public void configure(JobConf conf){}

        public void close(){}
    }

    static int printUsage() {
        System.out.println(NAME + "<input> <table_name>");
        return -1;
    }

    public int run(@SuppressWarnings("unused") String[] args) throws
Exception {
        JobClient.runJob(createSubmittableJob(args));
        return 0;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public void setConf(final Configuration c) {
        this.conf = c;
    }

    public static void main(String[] args) throws Exception {
        int errCode = ToolRunner.run(new Configuration(), new SerpentMR1(),
args);
        System.exit(errCode);
    }

}


Regards Erik

Reply via email to