This is the error I get when I try to inject URLs into an HBase database
(the equivalent of injecting URLs into the crawlDb in Nutch).
I use Doğacan
Güney<https://issues.apache.org/jira/secure/ViewProfile.jspa?name=dogacan>'s
code to implement the class "InjectorHbase". In the class
InjectorHbaseMapper, this is the map function:
/**
 * Flags a single seed URL as injected in the HBase table.
 *
 * The input line is normalized with the inject scope and passed through the
 * URL filters. A URL rejected by the filters (null result) is dropped
 * silently; a URL that makes any of these steps throw is logged and dropped.
 * Otherwise the reversed URL is used as the row key and the inject marker
 * column is committed for that row.
 *
 * @param key      not used
 * @param value    one line of the input, taken as a URL
 * @param output   not used; the row is written straight to the table
 * @param reporter not used
 * @throws IOException if no table is available or the commit fails
 */
public void map(LongWritable key, Text value,
    OutputCollector<Text, Text> output, Reporter reporter)
    throws IOException {
  System.out.println("Vao map");
  if (null == table) {
    System.out.println("Table == null");
    throw new IOException("Can not connect to hbase table");
  }

  // url is reassigned step by step so that the warning below reports the
  // form the URL had when the failure happened.
  String url = value.toString();
  String rowKey;
  try {
    url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
    url = filters.filter(url);
    if (null == url) {
      return;
    }
    rowKey = TableUtil.reverseUrl(url);
  } catch (Exception e) {
    LOG.warn("Skipping " + url + ":" + e);
    return;
  }

  BatchUpdate marker = new BatchUpdate(rowKey);
  marker.put(META_INJECT_KEY, TableUtil.YES_VAL);
  table.commit(marker);
}
When I run the program, I get the following error:
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
org.apache.nutch.crawl.InjectorHbase$InjectorHbaseMapper
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:720)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:744)
I don't know the reason, because the class InjectorHbaseMapper is there.
Also, I can't debug the map/reduce functions with println statements,
even though I configured
<property>
<name>mapred.job.tracker</name>
<value>local</value>
</property>
in hadoop-site.xml.
--
Nguyễn Thị Ngọc Hương
// ban dung patch hbase-integration_v1.patch
package org.apache.nutch.crawl;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.util.LogUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutchbase.util.hbase.ImmutableRowPart;
import org.apache.nutchbase.util.hbase.RowPart;
import org.apache.nutchbase.util.hbase.TableColumns;
import org.apache.nutchbase.util.hbase.TableMapReduce;
import org.apache.nutchbase.util.hbase.TableUtil;
import org.apache.hadoop.mapred.IsolationRunner;
/**
 * Injects seed URLs into an HBase table in two map/reduce passes.
 *
 * <p>Pass one ({@link InjectorHbaseMapper}) reads the files under the URL
 * directory, normalizes and filters each line as a URL, and writes a
 * temporary inject marker column on the row keyed by the reversed URL.
 *
 * <p>Pass two (this class's own {@code map} and {@code reduce}) visits the
 * rows that carry the marker, removes it, and, for rows that have no status
 * column yet, fills in the initial status, fetch time, fetch interval, score
 * and retry counter.
 */
public class InjectorHbase extends
    TableMapReduce<ImmutableBytesWritable, BooleanWritable> implements Tool {

  public static final Log LOG = LogFactory.getLog(InjectorHbase.class);

  /** Name of the temporary metadata entry that marks a freshly injected row. */
  private static final String INJECT_KEY_STR = "__tmp_inject_key__";

  /** Full column name of the inject marker (metadata prefix + key). */
  private static final String META_INJECT_KEY_STR = TableColumns.METADATA_STR
      + INJECT_KEY_STR;

  private static final byte[] META_INJECT_KEY = Bytes
      .toBytes(META_INJECT_KEY_STR);

  /** Columns requested from the table for the second pass. */
  private static final Set<String> COLUMNS = new HashSet<String>();

  static {
    COLUMNS.add(META_INJECT_KEY_STR);
    COLUMNS.add(TableColumns.STATUS_STR);
  }

  /** Fetch interval given to new rows; read from "db.fetch.interval.default". */
  private int interval;

  /** Score given to new rows; read from "db.score.injected". */
  private float scoreInjected;

  /** Fetch time given to new rows; read from "injector.current.time". */
  private long curTime;

  private ImmutableRowPart row = new ImmutableRowPart();

  /**
   * First-pass mapper: marks every accepted URL of the input as injected.
   *
   * <p>This class is deliberately {@code static}. The job only records the
   * mapper's class, and the framework has to create the instance itself; a
   * non-static inner class cannot be created that way because it has no
   * no-argument constructor (each instance needs an enclosing
   * {@code InjectorHbase}). The mapper only uses static members of the outer
   * class, so it does not need an enclosing instance.
   */
  public static class InjectorHbaseMapper implements
      Mapper<LongWritable, Text, Text, Text> {

    private URLNormalizers urlNormalizers;
    private URLFilters filters;
    private HTable table;
    private HBaseConfiguration hbaseConf;

    /** Failure seen while opening the table in configure(), if any. */
    private IOException connectError;

    /**
     * Normalizes and filters one input line and, if it survives, commits the
     * inject marker for the row keyed by the reversed URL.
     *
     * @param key      not used
     * @param value    one line of the input, taken as a URL
     * @param output   not used; rows are written straight to the table
     * @param reporter not used
     * @throws IOException if the table could not be opened in
     *                     {@link #configure(JobConf)} (the original failure is
     *                     attached as the cause) or if the commit fails
     */
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      System.out.println("Vao map");
      if (table == null) {
        System.out.println("Table == null");
        IOException failure = new IOException("Can not connect to hbase table");
        if (connectError != null) {
          // Keep the real reason visible in the task's stack trace.
          failure.initCause(connectError);
        }
        throw failure;
      }
      String url = value.toString();
      String reversedUrl;
      try {
        url = urlNormalizers
            .normalize(url, URLNormalizers.SCOPE_INJECT);
        url = filters.filter(url);
        if (url == null) {
          // Rejected by the URL filters.
          return;
        }
        reversedUrl = TableUtil.reverseUrl(url);
      } catch (Exception e) {
        // Best effort: one bad line must not fail the whole job.
        LOG.warn("Skipping " + url + ":" + e);
        return;
      }
      BatchUpdate bu = new BatchUpdate(reversedUrl);
      bu.put(META_INJECT_KEY, TableUtil.YES_VAL);
      table.commit(bu);
    }

    /**
     * Builds the URL normalizers and filters from the job and opens the table
     * named by the "input.table" job property. A failure to open the table
     * is logged and remembered; it is reported by the first call to
     * {@code map}.
     *
     * @param job the job configuration
     */
    public void configure(JobConf job) {
      System.out
          .println("Vao configrue cua job---------------------------");
      urlNormalizers = new URLNormalizers(job,
          URLNormalizers.SCOPE_INJECT);
      filters = new URLFilters(job);
      hbaseConf = new HBaseConfiguration();
      try {
        table = new HTable(hbaseConf, job.get("input.table"));
      } catch (IOException e) {
        connectError = e;
        e.printStackTrace(LogUtil.getFatalStream(LOG));
      }
      if (table == null) {
        System.out.println("Table==null");
      }
    }

    /** Nothing to release. */
    public void close() throws IOException {
    }
  }

  /**
   * Second-pass map: for every row that carries the inject marker, emits the
   * row key together with a flag telling whether the row already has a status
   * column. Rows without the marker are skipped.
   *
   * @param key       the row key
   * @param rowResult the row as read from the table
   * @param output    receives (row key, has-status flag)
   * @param reporter  not used
   * @throws IOException if collecting the output fails
   */
  @Override
  public void map(ImmutableBytesWritable key, RowResult rowResult,
      OutputCollector<ImmutableBytesWritable, BooleanWritable> output,
      Reporter reporter) throws IOException {
    System.out.println("Vao map");
    row = new ImmutableRowPart(rowResult);
    if (!row.hasMeta(INJECT_KEY_STR)) {
      return;
    }
    output.collect(key, new BooleanWritable(row
        .hasColumn(TableColumns.STATUS)));
  }

  /**
   * Reads the values used to initialize new rows from the job.
   *
   * @param job the job configuration
   */
  public void configure(JobConf job) {
    System.out.println("Vao configure -----------------------------------");
    interval = job.getInt("db.fetch.interval.default", 2592000);
    scoreInjected = job.getFloat("db.score.injected", 1.0f);
    curTime = job.getLong("injector.current.time", System
        .currentTimeMillis());
  }

  /**
   * Second-pass reduce: removes the inject marker from the row and, when the
   * first flag for the key says the row had no status column, sets the
   * initial crawl fields.
   *
   * @param key      the row key
   * @param values   has-status flags emitted by {@code map}; only the first
   *                 one is read
   * @param output   receives the batch update for the row
   * @param reporter not used
   * @throws IOException if collecting the output fails
   */
  @Override
  public void reduce(ImmutableBytesWritable key,
      Iterator<BooleanWritable> values,
      OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
      Reporter reporter) throws IOException {
    System.out.println("Vao reduce");
    boolean isOld = values.next().get();
    // Named differently from the 'row' field, which has another type.
    RowPart updatedRow = new RowPart();
    updatedRow.deleteMeta(INJECT_KEY_STR);
    if (!isOld) {
      updatedRow.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
      updatedRow.setFetchTime(curTime);
      updatedRow.setFetchInterval(interval);
      updatedRow.setScore(scoreInjected);
      updatedRow.setRetriesSinceFetch(0);
    }
    output.collect(key, updatedRow.makeBatchUpdate(key.get()));
  }

  /**
   * Runs both inject passes one after the other.
   *
   * @param table  name of the table to inject into
   * @param urlDir directory holding the seed URL files
   * @throws IOException if either job fails
   */
  public void inject(String table, Path urlDir) throws IOException {
    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
        + "/inject-temp-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    LOG.info("InjectorHbase: starting");
    LOG.info("InjectorHbase: urlDir: " + urlDir);

    // Pass one: mark the rows of all accepted URLs.
    JobConf job = new NutchJob(getConf());
    job.setJobName("inject-hbase-p1 " + urlDir);
    FileInputFormat.addInputPath(job, urlDir);
    FileOutputFormat.setOutputPath(job, tempDir);
    job.setMapperClass(InjectorHbaseMapper.class);
    job.setOutputFormat(NullOutputFormat.class);
    job.setLong("injector.current.time", System.currentTimeMillis());
    job.set("input.table", table);
    // NOTE(review): if tasks still report ClassNotFoundException for the
    // mapper, the jar/classpath used for the job presumably does not contain
    // this class - confirm how NutchJob selects the job jar.
    JobClient.runJob(job);

    // Pass two: initialize the marked rows.
    job = new NutchJob(getConf());
    job.setJobName("inject-hbase-p2 " + urlDir);
    TableMapReduce.initJob(table, TableUtil.getColumns(COLUMNS),
        InjectorHbase.class, ImmutableBytesWritable.class,
        BooleanWritable.class, job);
    JobClient.runJob(job);
    LOG.info("InjectorHbase: done");
  }

  /**
   * Command-line entry point used by {@link ToolRunner}.
   *
   * @param args table name followed by the URL directory
   * @return 0 on success, -1 on wrong usage or failure
   */
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: InjectorHbase <webtable> <url_dir>");
      return -1;
    }
    try {
      inject(args[0], new Path(args[1]));
      return 0;
    } catch (Exception e) {
      LOG.fatal("InjectorHbase: " + StringUtils.stringifyException(e));
      return -1;
    }
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(NutchConfiguration.create(),
        new InjectorHbase(), args);
    System.exit(res);
  }
}