Do you have any more detailed information? Logs would be helpful. On Mon, Sep 8, 2008 at 3:26 AM, Erik Holstad <[EMAIL PROTECTED]> wrote:
> Hi! > I'm trying to run an MR job, but it keeps on failing and I can't understand > why. > Sometimes it shows output at 66% and sometimes at 98% or so. > Earlier I had a couple of exceptions that I didn't catch, which made the job > fail. > > > The log file from the task can be found at: > http://pastebin.com/m4414d369 > > > and the code looks like: > //Java > import java.io.*; > import java.util.*; > import java.net.*; > > //Hadoop > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.hbase.io.ImmutableBytesWritable; > import org.apache.hadoop.io.*; > import org.apache.hadoop.mapred.*; > import org.apache.hadoop.util.*; > > //HBase > import org.apache.hadoop.hbase.*; > import org.apache.hadoop.hbase.mapred.*; > import org.apache.hadoop.hbase.io.*; > import org.apache.hadoop.hbase.client.*; > // org.apache.hadoop.hbase.client.HTable > > //Extra > import org.apache.commons.cli.ParseException; > > import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; > import org.apache.commons.httpclient.*; > import org.apache.commons.httpclient.methods.*; > import org.apache.commons.httpclient.params.HttpMethodParams; > > > public class SerpentMR1 extends TableMap implements Mapper, Tool { > > //Setting DebugLevel > private static final int DL = 0; > > //Setting up the variables for the MR job > private static final String NAME = "SerpentMR1"; > private static final String INPUTTABLE = "sources"; > private final String[] COLS = {"content:feedurl", "content:ttl", > "content:updated"}; > > > private Configuration conf; > > public JobConf createSubmittableJob(String[] args) throws IOException{ > JobConf c = new JobConf(getConf(), SerpentMR1.class); > String jar = "/home/hbase/SerpentMR/" +NAME+".jar"; > c.setJar(jar); > c.setJobName(NAME); > > int mapTasks = 4; > int reduceTasks = 20; > > c.setNumMapTasks(mapTasks); > c.setNumReduceTasks(reduceTasks); > > String inputCols = ""; > for (int i=0; i<COLS.length; 
i++){inputCols += COLS[i] + " "; } > > TableMap.initJob(INPUTTABLE, inputCols, this.getClass(), Text.class, > BytesWritable.class, c); > //Classes between: > > c.setOutputFormat(TextOutputFormat.class); > Path path = new Path("users"); //inserting into a temp table > FileOutputFormat.setOutputPath(c, path); > > c.setReducerClass(MyReducer.class); > return c; > } > > public void map(ImmutableBytesWritable key, RowResult res, > OutputCollector output, Reporter reporter) > throws IOException { > Cell cellLast = res.get(COLS[2].getBytes());//lastupdate > > long oldTime = cellLast.getTimestamp(); > > Cell cell_ttl = res.get(COLS[1].getBytes());//ttl > long ttl = StreamyUtil.BytesToLong(cell_ttl.getValue() ); > byte[] url = null; > > long currTime = time.GetTimeInMillis(); > > if(currTime - oldTime > ttl){ > url = res.get(COLS[0].getBytes()).getValue();//url > output.collect(new Text(Base64.encode_strip(res.getRow())), new > BytesWritable(url) );/ > } > } > > > > public static class MyReducer implements Reducer{ > //org.apache.hadoop.mapred.Reducer{ > > > private int timeout = 1000; //Sets the connection timeout time ms; > > public void reduce(Object key, Iterator values, OutputCollector > output, Reporter rep) > throws IOException { > HttpClient client = new HttpClient();//new > MultiThreadedHttpConnectionManager()); > client.getHttpConnectionManager(). 
> getParams().setConnectionTimeout(timeout); > > GetMethod method = null; > > int stat = 0; > String content = ""; > byte[] colFam = "select".getBytes(); > byte[] column = "lastupdate".getBytes(); > byte[] currTime = null; > > HBaseRef hbref = new HBaseRef(); > JerlType sendjerl = null; //new JerlType(); > ArrayList jd = new ArrayList(); > > InputStream is = null; > > while(values.hasNext()){ > BytesWritable bw = (BytesWritable)values.next(); > > String address = new String(bw.get()); > try{ > System.out.println(address); > > method = new GetMethod(address); > method.setFollowRedirects(true); > > } catch (Exception e){ > System.err.println("Invalid Address"); > e.printStackTrace(); > } > > if (method != null){ > try { > // Execute the method. > stat = client.executeMethod(method); > > if(stat == 200){ > content = ""; > is = > (InputStream)(method.getResponseBodyAsStream()); > > //Write to HBase new stamp select:lastupdate > currTime = > StreamyUtil.LongToBytes(time.GetTimeInMillis() ); > jd.add(new JerlData(INPUTTABLE.getBytes(), > ((Text)key).getBytes(), colFam, column, > currTime)); > > if (is != > null){output.collect(((Text)key).getBytes(), is);} > } > else if(stat == 302){ > System.err.println("302 not complete in reader!! 
> New url = "); > } > else { > System.err.println("Method failed: " + > method.getStatusLine()); > currTime = > StreamyUtil.LongToBytes(Long.MAX_VALUE); > jd.add(new JerlData(INPUTTABLE.getBytes(), > ((Text)key).getBytes(), colFam, column, > currTime)); > //Set select:lastupdate to Long.MAX_VALUE() > } > > } catch (HttpException e) { > System.err.println("Fatal protocol violation: " + > e.getMessage()); > e.printStackTrace(); > } catch (IOException e) { > System.err.println("Fatal transport error: " + > e.getMessage()); > e.printStackTrace(); > } catch (IllegalStateException e){ > // System.err.println("IllegalStateException: " + > e.getMessage()); > System.err.println("Unsupported protocol error: " + > e.getMessage()); > e.printStackTrace(); > } catch (Exception e){ > System.err.println("Other Exception: " + > e.getMessage()); > e.printStackTrace(); > } finally { > // Release the connection. > method.releaseConnection(); > } > } > } > HBase.AddAttributes(new JerlType((JerlData[])jd.toArray(new > JerlData[0])), hbref); > } > > public void configure(JobConf conf){} > > public void close(){} > } > > static int printUsage() { > System.out.println(NAME + "<input> <table_name>"); > return -1; > } > > public int run(@SuppressWarnings("unused") String[] args) throws > Exception { > JobClient.runJob(createSubmittableJob(args)); > return 0; > } > > public Configuration getConf() { > return this.conf; > } > > public void setConf(final Configuration c) { > this.conf = c; > } > > public static void main(String[] args) throws Exception { > int errCode = ToolRunner.run(new Configuration(), new SerpentMR1(), > args); > System.exit(errCode); > } > > } > > > Regards Erik > -- 朱盛凯 Jash Zhu 复旦大学软件学院 Software School, Fudan University