Author: jnioche
Date: Tue Sep 22 14:04:10 2015
New Revision: 1704634

URL: http://svn.apache.org/viewvc?rev=1704634&view=rev
Log:
NUTCH-2102 WARC Exporter
Added: nutch/trunk/src/java/org/apache/nutch/tools/warc/ nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java Modified: nutch/trunk/CHANGES.txt nutch/trunk/ivy/ivy.xml nutch/trunk/src/bin/nutch Modified: nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1704634&r1=1704633&r2=1704634&view=diff ============================================================================== --- nutch/trunk/CHANGES.txt (original) +++ nutch/trunk/CHANGES.txt Tue Sep 22 14:04:10 2015 @@ -2,6 +2,8 @@ Nutch Change Log Nutch Current Development 1.11-SNAPSHOT +* NUTCH-2102 WARC Exporter (jnioche) + * NUTCH-2106 Runtime to contain Selenium and dependencies only once (snagel) * NUTCH-2104 Add documentation to the protocol-selenium plugin Readme file Modified: nutch/trunk/ivy/ivy.xml URL: http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1704634&r1=1704633&r2=1704634&view=diff ============================================================================== --- nutch/trunk/ivy/ivy.xml (original) +++ nutch/trunk/ivy/ivy.xml Tue Sep 22 14:04:10 2015 @@ -70,6 +70,8 @@ <dependency org="com.google.guava" name="guava" rev="16.0.1" /> <dependency org="com.github.crawler-commons" name="crawler-commons" rev="0.6" /> + + <dependency org="com.martinkl.warc" name="warc-hadoop" rev="0.1.0" /> <dependency org="org.apache.cxf" name="cxf" rev="3.0.4" conf="*->default"/> <dependency org="org.apache.cxf" name="cxf-rt-frontend-jaxws" rev="3.0.4" conf="*->default"/> Modified: nutch/trunk/src/bin/nutch URL: http://svn.apache.org/viewvc/nutch/trunk/src/bin/nutch?rev=1704634&r1=1704633&r2=1704634&view=diff ============================================================================== --- nutch/trunk/src/bin/nutch (original) +++ nutch/trunk/src/bin/nutch Tue Sep 22 14:04:10 2015 @@ -87,6 +87,7 @@ if [ $# = 0 ]; then echo " plugin load a plugin and run one of its classes main()" echo " junit runs the given JUnit test" echo " startserver runs the Nutch Server on localhost:8081" + echo " warc exports crawled data from segments in the WARC format" echo " or" echo " CLASSNAME run the class named CLASSNAME" echo "Most commands print help when invoked w/o parameters." @@ -278,6 +279,8 @@ elif [ "$COMMAND" = "junit" ] ; then CLASS=org.junit.runner.JUnitCore elif [ "$COMMAND" = "startserver" ] ; then CLASS=org.apache.nutch.service.NutchServer +elif [ "$COMMAND" = "warc" ] ; then + CLASS=org.apache.nutch.tools.warc.WARCExporter else CLASS=$COMMAND fi Added: nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java?rev=1704634&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java (added) +++ nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java Tue Sep 22 14:04:10 2015 @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools.warc; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.URI; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.UUID; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.martinkl.warc.WARCRecord; +import com.martinkl.warc.WARCWritable; +import com.martinkl.warc.mapred.WARCOutputFormat; + +/** + * MapReduce job to export Nutch segments as WARC files. The file format is + * documented in the [ISO + * Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf). + * Generates elements of type 'response' if the configuration 'store.http.headers' + * was set to true during fetching and the HTTP headers were stored + * verbatim; generates elements of type 'resource' otherwise.
+ **/ + +public class WARCExporter extends Configured implements Tool { + + public static Logger LOG = LoggerFactory.getLogger(WARCExporter.class); + + private static final String CRLF = "\r\n"; + private static final byte[] CRLF_BYTES = { 13, 10 }; + + public WARCExporter() { + super(null); + } + + public WARCExporter(Configuration conf) { + super(conf); + } + + public static class WARCReducer + implements Mapper<Text, Writable, Text, NutchWritable>, + Reducer<Text, NutchWritable, NullWritable, WARCWritable> { + + SimpleDateFormat warcdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", + Locale.ENGLISH); + + @Override + public void configure(JobConf job) { + } + + @Override + public void close() throws IOException { + } + + public void map(Text key, Writable value, + OutputCollector<Text, NutchWritable> output, Reporter reporter) + throws IOException { + output.collect(key, new NutchWritable(value)); + } + + @Override + public void reduce(Text key, Iterator<NutchWritable> values, + OutputCollector<NullWritable, WARCWritable> output, Reporter reporter) + throws IOException { + + Content content = null; + CrawlDatum cd = null; + + // aggregate the values found + while (values.hasNext()) { + final Writable value = values.next().get(); // unwrap + if (value instanceof Content) { + content = (Content) value; + continue; + } + if (value instanceof CrawlDatum) { + cd = (CrawlDatum) value; + continue; + } + } + + // check that we have everything we need + if (content == null) { + LOG.info("Missing content for {}", key); + reporter.getCounter("WARCExporter", "missing content").increment(1); + return; + } + + if (cd == null) { + LOG.info("Missing fetch datum for {}", key); + reporter.getCounter("WARCExporter", "missing metadata").increment(1); + return; + } + + // were the headers stored as is? 
If so, we can write a 'response' record + String headersVerbatim = content.getMetadata().get("_response.headers_"); + byte[] httpheaders = new byte[0]; + if (StringUtils.isNotBlank(headersVerbatim)) { + // check that it ends with an empty line + if (!headersVerbatim.endsWith(CRLF + CRLF)) { + headersVerbatim += CRLF + CRLF; + } + httpheaders = headersVerbatim.getBytes(); + } + + StringBuilder buffer = new StringBuilder(); + buffer.append(WARCRecord.WARC_VERSION); + buffer.append(CRLF); + + buffer.append("WARC-Record-ID").append(": ").append("<urn:uuid:") + .append(UUID.randomUUID().toString()).append(">").append(CRLF); + + int contentLength = 0; + if (content != null) { + contentLength = content.getContent().length; + } + + // add the length of the http header + contentLength += httpheaders.length; + + buffer.append("Content-Length").append(": ") + .append(Integer.toString(contentLength)).append(CRLF); + + Date fetchedDate = new Date(cd.getFetchTime()); + buffer.append("WARC-Date").append(": ").append(warcdf.format(fetchedDate)) + .append(CRLF); + + // check if http headers have been stored verbatim + // if not, generate a 'resource' record instead + String WARCTypeValue = "resource"; + + if (StringUtils.isNotBlank(headersVerbatim)) { + WARCTypeValue = "response"; + } + + buffer.append("WARC-Type").append(": ").append(WARCTypeValue) + .append(CRLF); + + // "WARC-IP-Address" if present + String IP = content.getMetadata().get("_ip_"); + if (StringUtils.isNotBlank(IP)) { + buffer.append("WARC-IP-Address").append(": ").append(IP).append(CRLF); + } + + // detect if truncated only for fetch success + String status = CrawlDatum.getStatusName(cd.getStatus()); + if (status.equalsIgnoreCase("STATUS_FETCH_SUCCESS") + && ParseSegment.isTruncated(content)) { + buffer.append("WARC-Truncated").append(": ").append("unspecified") + .append(CRLF); + } + + // must be a valid URI + try { + String normalised = key.toString().replaceAll(" ", "%20"); + URI uri = URI.create(normalised); + buffer.append("WARC-Target-URI").append(": ") + .append(uri.toASCIIString()).append(CRLF); + } catch (Exception e) { + LOG.error("Invalid URI {} ", key); + reporter.getCounter("WARCExporter", "invalid URI").increment(1); + return; + } + + // provide a ContentType if type response + if (WARCTypeValue.equals("response")) { + buffer.append("Content-Type: application/http; msgtype=response") + .append(CRLF); + } + + // finished writing the WARC headers, now let's serialize it + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + // store the headers + bos.write(buffer.toString().getBytes("UTF-8")); + bos.write(CRLF_BYTES); + // the http headers + bos.write(httpheaders); + + // the binary content itself + if (content.getContent() != null) { + bos.write(content.getContent()); + } + bos.write(CRLF_BYTES); + bos.write(CRLF_BYTES); + + try { + DataInput in = new DataInputStream( + new ByteArrayInputStream(bos.toByteArray())); + WARCRecord record = new WARCRecord(in); + output.collect(NullWritable.get(), new WARCWritable(record)); + reporter.getCounter("WARCExporter", "records generated").increment(1); + } catch (IOException exception) { + LOG.error("Exception when generating WARC record for {} : {}", key, + exception.getMessage(), exception); + reporter.getCounter("WARCExporter", "exception").increment(1); + } + + } + } + + public int generateWARC(String output, List<Path> segments) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("WARCExporter: starting at {}",
sdf.format(start)); + + final JobConf job = new NutchJob(getConf()); + job.setJobName("warc-exporter " + output); + + for (final Path segment : segments) { + LOG.info("warc-exporter: adding segment: " + segment); + FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); + FileInputFormat.addInputPath(job, + new Path(segment, CrawlDatum.FETCH_DIR_NAME)); + } + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(WARCReducer.class); + job.setReducerClass(WARCReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(NutchWritable.class); + + FileOutputFormat.setOutputPath(job, new Path(output)); + // using the old api + job.setOutputFormat(WARCOutputFormat.class); + + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(WARCWritable.class); + + try { + RunningJob rj = JobClient.runJob(job); + LOG.info(rj.getCounters().toString()); + long end = System.currentTimeMillis(); + LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end), + TimingUtil.elapsedTime(start, end)); + } catch (Exception e) { + LOG.error("Exception caught", e); + return -1; + } + + return 0; + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println( + "Usage: WARCExporter <output> (<segment> ... | -dir <segments>)"); + return -1; + } + + final List<Path> segments = new ArrayList<Path>(); + + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-dir")) { + Path dir = new Path(args[++i]); + FileSystem fs = dir.getFileSystem(getConf()); + FileStatus[] fstats = fs.listStatus(dir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path[] files = HadoopFSUtil.getPaths(fstats); + for (Path p : files) { + segments.add(p); + } + } else { + segments.add(new Path(args[i])); + } + } + + return generateWARC(args[0], segments); + } + + public static void main(String[] args) throws Exception { + final int res = ToolRunner.run(NutchConfiguration.create(), + new WARCExporter(), args); + System.exit(res); + } +} Added: nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java?rev=1704634&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java (added) +++ nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java Tue Sep 22 14:04:10 2015 @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Tools to import / export between Nutch segments and + * <a href="http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf"> + * WARC archives</a>. + */ +package org.apache.nutch.tools.warc;
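Example usage (an illustrative sketch; the output directory and segment paths below are assumptions, not taken from the commit): the new 'warc' command in bin/nutch maps to org.apache.nutch.tools.warc.WARCExporter, whose usage string is 'WARCExporter <output> (<segment> ... | -dir <segments>)'. Assuming a deployed Nutch runtime and an existing crawl:

  bin/nutch warc warc-output crawl/segments/20150922140410

or, to export every segment under a directory:

  bin/nutch warc warc-output -dir crawl/segments

As described in the class Javadoc, 'response' records (including the verbatim HTTP headers) are only produced when 'store.http.headers' was set to true at fetch time; otherwise the exporter writes 'resource' records containing just the payload.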