Sorry, I didn't see the log link. On Tue, Sep 9, 2008 at 12:01 PM, Shengkai Zhu <[EMAIL PROTECTED]> wrote:
> > Do you have some more detailed information? Logs are helpful. > > > On Mon, Sep 8, 2008 at 3:26 AM, Erik Holstad <[EMAIL PROTECTED]> wrote: > >> Hi! >> I'm trying to run an MR job, but it keeps on failing and I can't understand >> why. >> Sometimes it shows output at 66% and sometimes 98% or so. >> I had a couple of exceptions before that I didn't catch that made the job >> fail. >> >> >> The log file from the task can be found at: >> http://pastebin.com/m4414d369 >> >> >> and the code looks like: >> //Java >> import java.io.*; >> import java.util.*; >> import java.net.*; >> >> //Hadoop >> import org.apache.hadoop.conf.Configuration; >> import org.apache.hadoop.fs.Path; >> import org.apache.hadoop.hbase.io.ImmutableBytesWritable; >> import org.apache.hadoop.io.*; >> import org.apache.hadoop.mapred.*; >> import org.apache.hadoop.util.*; >> >> //HBase >> import org.apache.hadoop.hbase.*; >> import org.apache.hadoop.hbase.mapred.*; >> import org.apache.hadoop.hbase.io.*; >> import org.apache.hadoop.hbase.client.*; >> // org.apache.hadoop.hbase.client.HTable >> >> //Extra >> import org.apache.commons.cli.ParseException; >> >> import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; >> import org.apache.commons.httpclient.*; >> import org.apache.commons.httpclient.methods.*; >> import org.apache.commons.httpclient.params.HttpMethodParams; >> >> >> public class SerpentMR1 extends TableMap implements Mapper, Tool { >> >> //Setting DebugLevel >> private static final int DL = 0; >> >> //Setting up the variables for the MR job >> private static final String NAME = "SerpentMR1"; >> private static final String INPUTTABLE = "sources"; >> private final String[] COLS = {"content:feedurl", "content:ttl", >> "content:updated"}; >> >> >> private Configuration conf; >> >> public JobConf createSubmittableJob(String[] args) throws IOException{ >> JobConf c = new JobConf(getConf(), SerpentMR1.class); >> String jar = "/home/hbase/SerpentMR/" +NAME+".jar"; >> 
c.setJar(jar); >> c.setJobName(NAME); >> >> int mapTasks = 4; >> int reduceTasks = 20; >> >> c.setNumMapTasks(mapTasks); >> c.setNumReduceTasks(reduceTasks); >> >> String inputCols = ""; >> for (int i=0; i<COLS.length; i++){inputCols += COLS[i] + " "; } >> >> TableMap.initJob(INPUTTABLE, inputCols, this.getClass(), >> Text.class, >> BytesWritable.class, c); >> //Classes between: >> >> c.setOutputFormat(TextOutputFormat.class); >> Path path = new Path("users"); //inserting into a temp table >> FileOutputFormat.setOutputPath(c, path); >> >> c.setReducerClass(MyReducer.class); >> return c; >> } >> >> public void map(ImmutableBytesWritable key, RowResult res, >> OutputCollector output, Reporter reporter) >> throws IOException { >> Cell cellLast = res.get(COLS[2].getBytes());//lastupdate >> >> long oldTime = cellLast.getTimestamp(); >> >> Cell cell_ttl = res.get(COLS[1].getBytes());//ttl >> long ttl = StreamyUtil.BytesToLong(cell_ttl.getValue() ); >> byte[] url = null; >> >> long currTime = time.GetTimeInMillis(); >> >> if(currTime - oldTime > ttl){ >> url = res.get(COLS[0].getBytes()).getValue();//url >> output.collect(new Text(Base64.encode_strip(res.getRow())), new >> BytesWritable(url) );/ >> } >> } >> >> >> >> public static class MyReducer implements Reducer{ >> //org.apache.hadoop.mapred.Reducer{ >> >> >> private int timeout = 1000; //Sets the connection timeout time ms; >> >> public void reduce(Object key, Iterator values, OutputCollector >> output, Reporter rep) >> throws IOException { >> HttpClient client = new HttpClient();//new >> MultiThreadedHttpConnectionManager()); >> client.getHttpConnectionManager(). 
>> getParams().setConnectionTimeout(timeout); >> >> GetMethod method = null; >> >> int stat = 0; >> String content = ""; >> byte[] colFam = "select".getBytes(); >> byte[] column = "lastupdate".getBytes(); >> byte[] currTime = null; >> >> HBaseRef hbref = new HBaseRef(); >> JerlType sendjerl = null; //new JerlType(); >> ArrayList jd = new ArrayList(); >> >> InputStream is = null; >> >> while(values.hasNext()){ >> BytesWritable bw = (BytesWritable)values.next(); >> >> String address = new String(bw.get()); >> try{ >> System.out.println(address); >> >> method = new GetMethod(address); >> method.setFollowRedirects(true); >> >> } catch (Exception e){ >> System.err.println("Invalid Address"); >> e.printStackTrace(); >> } >> >> if (method != null){ >> try { >> // Execute the method. >> stat = client.executeMethod(method); >> >> if(stat == 200){ >> content = ""; >> is = >> (InputStream)(method.getResponseBodyAsStream()); >> >> //Write to HBase new stamp select:lastupdate >> currTime = >> StreamyUtil.LongToBytes(time.GetTimeInMillis() ); >> jd.add(new JerlData(INPUTTABLE.getBytes(), >> ((Text)key).getBytes(), colFam, column, >> currTime)); >> >> if (is != >> null){output.collect(((Text)key).getBytes(), is);} >> } >> else if(stat == 302){ >> System.err.println("302 not complete in >> reader!! 
>> New url = "); >> } >> else { >> System.err.println("Method failed: " + >> method.getStatusLine()); >> currTime = >> StreamyUtil.LongToBytes(Long.MAX_VALUE); >> jd.add(new JerlData(INPUTTABLE.getBytes(), >> ((Text)key).getBytes(), colFam, column, >> currTime)); >> //Set select:lastupdate to Long.MAX_VALUE() >> } >> >> } catch (HttpException e) { >> System.err.println("Fatal protocol violation: " + >> e.getMessage()); >> e.printStackTrace(); >> } catch (IOException e) { >> System.err.println("Fatal transport error: " + >> e.getMessage()); >> e.printStackTrace(); >> } catch (IllegalStateException e){ >> // System.err.println("IllegalStateException: " + >> e.getMessage()); >> System.err.println("Unsupported protocol error: " + >> e.getMessage()); >> e.printStackTrace(); >> } catch (Exception e){ >> System.err.println("Other Exception: " + >> e.getMessage()); >> e.printStackTrace(); >> } finally { >> // Release the connection. >> method.releaseConnection(); >> } >> } >> } >> HBase.AddAttributes(new JerlType((JerlData[])jd.toArray(new >> JerlData[0])), hbref); >> } >> >> public void configure(JobConf conf){} >> >> public void close(){} >> } >> >> static int printUsage() { >> System.out.println(NAME + "<input> <table_name>"); >> return -1; >> } >> >> public int run(@SuppressWarnings("unused") String[] args) throws >> Exception { >> JobClient.runJob(createSubmittableJob(args)); >> return 0; >> } >> >> public Configuration getConf() { >> return this.conf; >> } >> >> public void setConf(final Configuration c) { >> this.conf = c; >> } >> >> public static void main(String[] args) throws Exception { >> int errCode = ToolRunner.run(new Configuration(), new SerpentMR1(), >> args); >> System.exit(errCode); >> } >> >> } >> >> >> Regards Erik >> > > > > -- > > 朱盛凯 > > Jash Zhu > > 复旦大学软件学院 > > Software School, Fudan University > -- 朱盛凯 Jash Zhu 复旦大学软件学院 Software School, Fudan University