[ https://issues.apache.org/jira/browse/NUTCH-2543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415731#comment-16415731 ]
ASF GitHub Bot commented on NUTCH-2543: --------------------------------------- sebastian-nagel closed pull request #303: fix for NUTCH-2543 contributed by Jurian Broertjes URL: https://github.com/apache/nutch/pull/303 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java index 9be246a58..ee1b4ba97 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java @@ -70,12 +70,13 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.util.AbstractChecker; import org.apache.nutch.util.JexlUtil; import org.apache.nutch.util.NutchConfiguration; import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.SegmentReaderUtil; import org.apache.nutch.util.StringUtil; import org.apache.nutch.util.TimingUtil; -import org.apache.nutch.util.SegmentReaderUtil; import org.apache.commons.jexl2.Expression; /** @@ -84,13 +85,15 @@ * @author Andrzej Bialecki * */ -public class CrawlDbReader extends Configured implements Closeable, Tool { +public class CrawlDbReader extends AbstractChecker implements Closeable { private static final Logger LOG = LoggerFactory .getLogger(MethodHandles.lookup().lookupClass()); private MapFile.Reader[] readers = null; + protected String crawlDb; + private void openReaders(String crawlDb, Configuration config) throws IOException { if (readers != null) @@ -110,6 +113,7 @@ private void closeReaders() { } } + readers = null; } public static class CrawlDatumCsvOutputFormat extends @@ -593,15 +597,25 @@ public CrawlDatum get(String crawlDb, String url, Configuration config) return res; } - public void readUrl(String crawlDb, String url, Configuration config) + protected int process(String line, StringBuilder output) throws Exception { + Job job = NutchJob.getInstance(getConf()); + Configuration config = job.getConfiguration(); + // Close readers, so we know we're not working on stale data + closeReaders(); + readUrl(this.crawlDb, line, config, output); + return 0; + } + + public void readUrl(String crawlDb, String url, Configuration config, StringBuilder output) throws IOException { CrawlDatum res = get(crawlDb, url, config); - System.out.println("URL: " + url); + output.append("URL: " + url + "\n"); if (res != null) { - System.out.println(res); + output.append(res); } else { - System.out.println("not found"); + output.append("not found"); } + output.append("\n"); } public void processDumpJob(String crawlDb, String output, @@ -792,7 +806,8 @@ public void processTopNJob(String crawlDb, long topN, float min, } - public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException { + + public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, Exception { @SuppressWarnings("resource") CrawlDbReader dbr = new CrawlDbReader(); @@ -827,8 +842,11 @@ public int run(String[] args) throws IOException, InterruptedException, ClassNot } String param = null; String crawlDb = args[0]; + this.crawlDb = crawlDb; + int numConsumed = 0; Job job = NutchJob.getInstance(getConf()); Configuration config = job.getConfiguration(); + for (int i = 1; i < args.length; i++) { if (args[i].equals("-stats")) { boolean toSort = false; @@ -874,7 +892,9 @@ public int run(String[] args) throws IOException, InterruptedException, ClassNot dbr.processDumpJob(crawlDb, param, config, format, regex, status, retry, expr, sample); } else if (args[i].equals("-url")) { param = args[++i]; - dbr.readUrl(crawlDb, param, config); + StringBuilder output = new StringBuilder(); + dbr.readUrl(crawlDb, param, config, output); + System.out.print(output); } else if (args[i].equals("-topN")) { param = args[++i]; long topN = Long.parseLong(param); @@ -884,11 +904,18 @@ public int run(String[] args) throws IOException, InterruptedException, ClassNot min = Float.parseFloat(args[++i]); } dbr.processTopNJob(crawlDb, topN, min, param, config); + } else if ((numConsumed = super.parseArgs(args, i)) > 0) { + i += numConsumed - 1; } else { System.err.println("\nError: wrong argument " + args[i]); return -1; } } + + if (numConsumed > 0) { + // Start listening + return super.run(); + } return 0; } diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java index 717973a16..f5daf4ddf 100644 --- a/src/java/org/apache/nutch/crawl/LinkDbReader.java +++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java @@ -43,6 +43,7 @@ import org.apache.hadoop.util.*; import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.util.AbstractChecker; import org.apache.nutch.util.NutchConfiguration; import org.apache.nutch.util.NutchJob; import org.apache.nutch.util.TimingUtil; @@ -52,7 +53,7 @@ import java.io.Closeable; /** . */ -public class LinkDbReader extends Configured implements Tool, Closeable { +public class LinkDbReader extends AbstractChecker implements Closeable { private static final Logger LOG = LoggerFactory .getLogger(MethodHandles.lookup().lookupClass()); @@ -171,6 +172,21 @@ public void processDumpJob(String linkdb, String output, String regex) + TimingUtil.elapsedTime(start, end)); } + protected int process(String line, StringBuilder output) throws Exception { + + Inlinks links = getInlinks(new Text(line)); + if (links == null) { + output.append(" - no link information."); + } else { + Iterator<Inlink> it = links.iterator(); + while (it.hasNext()) { + output.append(it.next().toString()); + } + } + output.append("\n"); + return 0; + } + public static void main(String[] args) throws Exception { int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbReader(), args); @@ -189,35 +205,40 @@ public int run(String[] args) throws Exception { .println("\t-url <url>\tprint information about <url> to System.out"); return -1; } + + int numConsumed = 0; + try { - if (args[1].equals("-dump")) { - String regex = null; - for (int i = 2; i < args.length; i++) { - if (args[i].equals("-regex")) { - regex = args[++i]; + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-dump")) { + String regex = null; + for (int j = i+1; j < args.length; j++) { + if (args[i].equals("-regex")) { + regex = args[++j]; + } } - } - processDumpJob(args[0], args[2], regex); - return 0; - } else if (args[1].equals("-url")) { - init(new Path(args[0])); - Inlinks links = getInlinks(new Text(args[2])); - if (links == null) { - System.out.println(" - no link information."); + processDumpJob(args[0], args[i+1], regex); + return 0; + } else if (args[i].equals("-url")) { + init(new Path(args[0])); + return processSingle(args[++i]); + } else if ((numConsumed = super.parseArgs(args, i)) > 0) { + init(new Path(args[0])); + i += numConsumed - 1; } else { - Iterator<Inlink> it = links.iterator(); - while (it.hasNext()) { - System.out.println(it.next().toString()); - } + System.err.println("Error: wrong argument " + args[1]); + return -1; } - return 0; - } else { - System.err.println("Error: wrong argument " + args[1]); - return -1; } } catch (Exception e) { LOG.error("LinkDbReader: " + StringUtils.stringifyException(e)); return -1; } + + if (numConsumed > 0) { + // Start listening + return super.run(); + } + return 0; } } diff --git a/src/java/org/apache/nutch/util/AbstractChecker.java b/src/java/org/apache/nutch/util/AbstractChecker.java index 4ff52df92..84877d7e6 100644 --- a/src/java/org/apache/nutch/util/AbstractChecker.java +++ b/src/java/org/apache/nutch/util/AbstractChecker.java @@ -19,7 +19,8 @@ import java.io.BufferedReader; import java.io.InputStreamReader; -import java.io.PrintWriter; +import java.io.OutputStream; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.net.ServerSocket; import java.net.Socket; @@ -104,7 +105,7 @@ protected void processTCP(int tcpPort) throws Exception { server.bind(new InetSocketAddress(tcpPort)); LOG.info(server.toString()); } catch (Exception e) { - LOG.error("Could not listen on port " + tcpPort); + LOG.error("Could not listen on port " + tcpPort, e); System.exit(-1); } @@ -115,7 +116,7 @@ protected void processTCP(int tcpPort) throws Exception { Thread thread = new Thread(worker); thread.start(); } catch (Exception e) { - LOG.error("Accept failed: " + tcpPort); + LOG.error("Accept failed: " + tcpPort, e); System.exit(-1); } } @@ -130,44 +131,53 @@ protected void processTCP(int tcpPort) throws Exception { } public void run() { + // Setup streams + BufferedReader in = null; + OutputStream out = null; + try { + in = new BufferedReader(new InputStreamReader(client.getInputStream())); + out = client.getOutputStream(); + } catch (IOException e) { + LOG.error("Failed initializing streams: ", e); + return; + } + + // Listen for work if (keepClientCnxOpen) { try { - while (readWrite()) {} // keep connection open until it closes + while (readWrite(in, out)) {} // keep connection open until it closes } catch(Exception e) { LOG.error("Read/Write failed: ", e); } } else { try { - readWrite(); + readWrite(in, out); } catch(Exception e) { LOG.error("Read/Write failed: ", e); } - - try { // close ourselves - client.close(); - } catch (Exception e){ - LOG.error(e.toString()); - } + } + + try { // close ourselves + client.close(); + } catch (Exception e){ + LOG.error(e.toString()); } } - protected boolean readWrite() throws Exception { - String line; - BufferedReader in = null; - PrintWriter out = null; - - in = new BufferedReader(new InputStreamReader(client.getInputStream())); + protected boolean readWrite(BufferedReader in, OutputStream out) throws Exception { + String line = in.readLine(); - line = in.readLine(); if (line == null) { // End of stream return false; } if (line.trim().length() > 1) { + // The actual work StringBuilder output = new StringBuilder(); process(line, output); - client.getOutputStream().write(output.toString().getBytes(StandardCharsets.UTF_8)); + output.append("\n"); + out.write(output.toString().getBytes(StandardCharsets.UTF_8)); } return true; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > readdb & readlinkdb to implement AbstractChecker > ------------------------------------------------ > > Key: NUTCH-2543 > URL: https://issues.apache.org/jira/browse/NUTCH-2543 > Project: Nutch > Issue Type: Improvement > Components: crawldb, linkdb > Reporter: Jurian Broertjes > Priority: Minor > Labels: patch > > Implement AbstractChecker in LinkDbReader & CrawlDbReader classes, so we can > expose them via TCP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)