This is an automated email from the ASF dual-hosted git repository.
snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 0347527 NUTCH-2279 LinkRank fails when using Hadoop MR output
compression - read output directory of link counter job to determine output
file name (fail if there is none or more than one file) - determine output
codec and use it to read the output
new 087aea6 Merge pull request #478 from
sebastian-nagel/NUTCH-2279-linkrank-output-compression
0347527 is described below
commit 03475276204cb0a31f1f5f0b6a547d3c92c6a799
Author: Sebastian Nagel <[email protected]>
AuthorDate: Mon Sep 30 17:49:39 2019 +0200
NUTCH-2279 LinkRank fails when using Hadoop MR output compression
- read output directory of link counter job to determine output
file name (fail if there is none or more than one file)
- determine output codec and use it to read the output
---
.../apache/nutch/scoring/webgraph/LinkRank.java | 38 ++++++++++++++++++++--
1 file changed, 35 insertions(+), 3 deletions(-)
diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
index 6829927..b6bfa98 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -18,6 +18,7 @@ package org.apache.nutch.scoring.webgraph;
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -46,6 +48,8 @@ import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -128,14 +132,42 @@ public class LinkRank extends Configured implements Tool {
// read the first (and only) line from the file which should be the
// number of links in the web graph
-    LOG.info("Reading numlinks temp file");
-    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-r-00000"));
-    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+ FileStatus[] numLinksFiles = fs.listStatus(numLinksPath);
+ if (numLinksFiles.length == 0) {
+ throw new IOException("Failed to read numlinks temp file: "
+ + " no file found in " + numLinksPath);
+ } else if (numLinksFiles.length > 1) {
+ throw new IOException("Failed to read numlinks temp file: "
+ + " expected only one file but found " + numLinksFiles.length
+ + " files in folder " + numLinksPath);
+ }
+ Path numLinksFile = numLinksFiles[0].getPath();
+ LOG.info("Reading numlinks temp file {}", numLinksFile);
+ FSDataInputStream readLinks = fs.open(numLinksFile);
+ CompressionCodecFactory cf = new CompressionCodecFactory(conf);
+ CompressionCodec codec = cf.getCodec(numLinksFiles[0].getPath());
+ InputStream streamLinks;
+ if (codec == null) {
+ LOG.debug("No compression codec found for {}, trying uncompressed",
+ numLinksFile);
+ streamLinks = readLinks;
+ } else {
+ LOG.info("Compression codec of numlinks temp file: {}",
+ codec.getDefaultExtension());
+ readLinks.seek(0);
+ streamLinks = codec.createInputStream(readLinks);
+ }
+ BufferedReader buffer = new BufferedReader(
+ new InputStreamReader(streamLinks));
+
String numLinksLine = buffer.readLine();
readLinks.close();
// check if there are links to process, if none, webgraph might be empty
if (numLinksLine == null || numLinksLine.length() == 0) {
+      LOG.error(
+          "Failed to determine number of links because of empty line in input {}",
+          numLinksFile);
fs.delete(numLinksPath, true);
throw new IOException("No links to process, is the webgraph empty?");
}