Do you have some more detailed information? Logs are helpful.

On Mon, Sep 8, 2008 at 3:26 AM, Erik Holstad <[EMAIL PROTECTED]> wrote:

> Hi!
> I'm trying to run a MR job, but it keeps on failing and I can't understand
> why.
> Sometimes it shows output at 66% and sometimes 98% or so.
> I had a couple of exception before that I didn't catch that made the job to
> fail.
>
>
> The log file from the task can be found at:
> http://pastebin.com/m4414d369
>
>
> and the code looks like:
> //Java
> import java.io.*;
> import java.util.*;
> import java.net.*;
>
> //Hadoop
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapred.*;
> import org.apache.hadoop.util.*;
>
> //HBase
> import org.apache.hadoop.hbase.*;
> import org.apache.hadoop.hbase.mapred.*;
> import org.apache.hadoop.hbase.io.*;
> import org.apache.hadoop.hbase.client.*;
> // org.apache.hadoop.hbase.client.HTable
>
> //Extra
> import org.apache.commons.cli.ParseException;
>
> import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
> import org.apache.commons.httpclient.*;
> import org.apache.commons.httpclient.methods.*;
> import org.apache.commons.httpclient.params.HttpMethodParams;
>
>
> public class SerpentMR1 extends TableMap implements Mapper, Tool {
>
>    //Setting DebugLevel
>    private static final int DL = 0;
>
>    //Setting up the variables for the MR job
>    private static final String NAME = "SerpentMR1";
>    private static final String INPUTTABLE = "sources";
>    private final String[] COLS = {"content:feedurl", "content:ttl",
> "content:updated"};
>
>
>    private Configuration conf;
>
>    public JobConf createSubmittableJob(String[] args) throws IOException{
>        JobConf c = new JobConf(getConf(), SerpentMR1.class);
>        String jar = "/home/hbase/SerpentMR/" +NAME+".jar";
>        c.setJar(jar);
>        c.setJobName(NAME);
>
>        int mapTasks = 4;
>        int reduceTasks = 20;
>
>        c.setNumMapTasks(mapTasks);
>        c.setNumReduceTasks(reduceTasks);
>
>        String inputCols = "";
>        for (int i=0; i<COLS.length; i++){inputCols += COLS[i] + " "; }
>
>        TableMap.initJob(INPUTTABLE, inputCols, this.getClass(), Text.class,
> BytesWritable.class, c);
>        //Classes between:
>
>        c.setOutputFormat(TextOutputFormat.class);
>        Path path = new Path("users"); //inserting into a temp table
>        FileOutputFormat.setOutputPath(c, path);
>
>        c.setReducerClass(MyReducer.class);
>        return c;
>    }
>
>    public void map(ImmutableBytesWritable key, RowResult res,
> OutputCollector output, Reporter reporter)
>    throws IOException {
>        Cell cellLast    = res.get(COLS[2].getBytes());//lastupdate
>
>        long oldTime = cellLast.getTimestamp();
>
>        Cell cell_ttl    = res.get(COLS[1].getBytes());//ttl
>        long ttl = StreamyUtil.BytesToLong(cell_ttl.getValue() );
>        byte[] url = null;
>
>        long currTime = time.GetTimeInMillis();
>
>        if(currTime - oldTime > ttl){
>            url = res.get(COLS[0].getBytes()).getValue();//url
>            output.collect(new Text(Base64.encode_strip(res.getRow())), new
> BytesWritable(url) );/
>        }
>    }
>
>
>
>    public static class MyReducer implements Reducer{
> //org.apache.hadoop.mapred.Reducer{
>
>
>        private int timeout = 1000; //Sets the connection timeout time ms;
>
>        public void reduce(Object key, Iterator values, OutputCollector
> output, Reporter rep)
>        throws IOException {
>            HttpClient client = new HttpClient();//new
> MultiThreadedHttpConnectionManager());
>            client.getHttpConnectionManager().
>                getParams().setConnectionTimeout(timeout);
>
>            GetMethod method = null;
>
>            int stat = 0;
>            String content = "";
>            byte[] colFam = "select".getBytes();
>            byte[] column = "lastupdate".getBytes();
>            byte[] currTime = null;
>
>            HBaseRef hbref = new HBaseRef();
>            JerlType sendjerl = null; //new JerlType();
>            ArrayList jd = new ArrayList();
>
>            InputStream is = null;
>
>            while(values.hasNext()){
>                BytesWritable bw = (BytesWritable)values.next();
>
>                String address = new String(bw.get());
>                try{
>                    System.out.println(address);
>
>                    method = new GetMethod(address);
>                    method.setFollowRedirects(true);
>
>                } catch (Exception e){
>                    System.err.println("Invalid Address");
>                    e.printStackTrace();
>                }
>
>                if (method != null){
>                    try {
>                    // Execute the method.
>                        stat = client.executeMethod(method);
>
>                        if(stat == 200){
>                            content = "";
>                            is =
> (InputStream)(method.getResponseBodyAsStream());
>
>                            //Write to HBase new stamp select:lastupdate
>                            currTime =
> StreamyUtil.LongToBytes(time.GetTimeInMillis() );
>                            jd.add(new JerlData(INPUTTABLE.getBytes(),
> ((Text)key).getBytes(), colFam, column,
>                                                currTime));
>
>                            if (is !=
> null){output.collect(((Text)key).getBytes(), is);}
>                        }
>                        else if(stat == 302){
>                            System.err.println("302 not complete in reader!!
> New url = ");
>                        }
>                        else {
>                            System.err.println("Method failed: " +
> method.getStatusLine());
>                            currTime  =
> StreamyUtil.LongToBytes(Long.MAX_VALUE);
>                            jd.add(new JerlData(INPUTTABLE.getBytes(),
> ((Text)key).getBytes(), colFam, column,
>                                                currTime));
>                            //Set select:lastupdate to Long.MAX_VALUE()
>                        }
>
>                    } catch (HttpException e) {
>                        System.err.println("Fatal protocol violation: " +
> e.getMessage());
>                        e.printStackTrace();
>                    } catch (IOException e) {
>                        System.err.println("Fatal transport error: " +
> e.getMessage());
>                        e.printStackTrace();
>                    } catch (IllegalStateException e){
> //                         System.err.println("IllegalStateException: " +
> e.getMessage());
>                        System.err.println("Unsupported protocol error: " +
> e.getMessage());
>                        e.printStackTrace();
>                    } catch (Exception e){
>                        System.err.println("Other Exception: " +
> e.getMessage());
>                        e.printStackTrace();
>                    } finally {
>                    // Release the connection.
>                        method.releaseConnection();
>                    }
>                }
>            }
>            HBase.AddAttributes(new JerlType((JerlData[])jd.toArray(new
> JerlData[0])), hbref);
>        }
>
>        public void configure(JobConf conf){}
>
>        public void close(){}
>    }
>
>    static int printUsage() {
>        System.out.println(NAME + "<input> <table_name>");
>        return -1;
>    }
>
>    public int run(@SuppressWarnings("unused") String[] args) throws
> Exception {
>        JobClient.runJob(createSubmittableJob(args));
>        return 0;
>    }
>
>    public Configuration getConf() {
>        return this.conf;
>    }
>
>    public void setConf(final Configuration c) {
>        this.conf = c;
>    }
>
>    public static void main(String[] args) throws Exception {
>        int errCode = ToolRunner.run(new Configuration(), new SerpentMR1(),
> args);
>        System.exit(errCode);
>    }
>
> }
>
>
> Regards Erik
>



-- 

朱盛凯

Jash Zhu

复旦大学软件学院

Software School, Fudan University

Reply via email to