Author: jnioche Date: Tue Jul 15 09:16:47 2014 New Revision: 1610628 URL: http://svn.apache.org/r1610628 Log: NUTCH-1502 Test for CrawlDatum state transitions (snagel)
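The tests added below share one pattern: a concrete test either instantiates ContinuousCrawlTestUtil directly with a pair of (fetch status, expected CrawlDb status), or subclasses it and overrides fetch() and/or check(); run(int maxErrors) then emulates the continuous crawl and reports whether all resulting state transitions were correct. A minimal sketch of that usage (the test class and its name are hypothetical and not part of this commit; it assumes the same test package and JUnit 4, as used by the tests below):

    package org.apache.nutch.crawl;

    import static org.junit.Assert.fail;

    import org.junit.Test;

    public class SampleCrawlDbStateTest {

      @Test
      public void testRedirPermBecomesDbRedirPerm() {
        // every emulated fetch returns fetch_redir_perm; after each updatedb
        // round the CrawlDatum kept in CrawlDb must have status db_redir_perm
        ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil(
            CrawlDatum.STATUS_FETCH_REDIR_PERM, CrawlDatum.STATUS_DB_REDIR_PERM);
        if (!crawlUtil.run(0)) { // maxErrors == 0: stop at the first wrong state
          fail("fetch_redir_perm did not result in db_redir_perm");
        }
      }
    }

Tests that need non-default behavior subclass the utility instead and override fetch() and/or check(), as TODOTestCrawlDbStates and TestCrawlDbStates do below.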
Added: nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java Modified: nutch/trunk/CHANGES.txt Modified: nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1610628&r1=1610627&r2=1610628&view=diff ============================================================================== --- nutch/trunk/CHANGES.txt (original) +++ nutch/trunk/CHANGES.txt Tue Jul 15 09:16:47 2014 @@ -2,6 +2,8 @@ Nutch Change Log Nutch Current Development +* NUTCH-1502 Test for CrawlDatum state transitions (snagel) + * NUTCH-1804 Move JUnit dependency to test scope (jnioche) * NUTCH-1811 bin/nutch junit to use junit 4 test runner (snagel) Added: nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java?rev=1610628&view=auto ============================================================================== --- nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java (added) +++ nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java Tue Jul 15 09:16:47 2014 @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.nutch.crawl.CrawlDbUpdateUtil; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.TimingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Emulate a continuous crawl for one URL. 
+ * + */ +public class ContinuousCrawlTestUtil extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousCrawlTestUtil.class); + + protected static Text dummyURL = new Text("http://nutch.apache.org/"); + + protected static Configuration defaultConfig = CrawlDBTestUtil + .createConfiguration(); + + protected long interval = FetchSchedule.SECONDS_PER_DAY*1000; // (default) launch crawler every day + protected long duration = 2*365L*FetchSchedule.SECONDS_PER_DAY*1000L; // run for two years + + protected Configuration configuration; + private FetchSchedule schedule; + + /** status a fetched datum should get */ + protected byte fetchStatus = CrawlDatum.STATUS_FETCH_SUCCESS; + /** expected status of the resulting Db datum */ + protected byte expectedDbStatus = CrawlDatum.STATUS_DB_FETCHED; + + /** for signature calculation */ + protected Signature signatureImpl; + protected Content content = new Content(); + + { + byte[] data = {'n', 'u', 't', 'c', 'h'}; + content.setContent(data); + } + + protected ContinuousCrawlTestUtil(Configuration conf) { + configuration = conf; + schedule = FetchScheduleFactory.getFetchSchedule(new JobConf(conf)); + signatureImpl = SignatureFactory.getSignature(conf); + } + + protected ContinuousCrawlTestUtil(Configuration conf, byte fetchStatus, + byte expectedDbStatus) { + this(conf); + this.fetchStatus = fetchStatus; + this.expectedDbStatus = expectedDbStatus; + } + + protected ContinuousCrawlTestUtil() { + this(defaultConfig); + } + + protected ContinuousCrawlTestUtil(byte fetchStatus, byte expectedDbStatus) { + this(defaultConfig, fetchStatus, expectedDbStatus); + } + + /** set the interval at which the crawl is relaunched (default: every day) */ + protected void setInterval(int seconds) { + interval = seconds*1000L; + } + + /** set the duration of the continuous crawl (default = 2 years) */ + protected void setDuration(int seconds) { + duration = seconds*1000L; + } + + /** + * default fetch action: set status and time + * + * @param datum + * CrawlDatum to fetch + * @param currentTime + * current time used to set the fetch time via + * {@link CrawlDatum#setFetchTime(long)} + * @return the modified CrawlDatum + */ + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + datum.setStatus(fetchStatus); + datum.setFetchTime(currentTime); + return datum; + } + + /** + * get signature for content using the configured signature implementation + */ + protected byte[] getSignature() { + return signatureImpl.calculate(content, null); + } + + /** + * change content to force a changed signature + */ + protected void changeContent() { + byte[] data = Arrays.copyOf(content.getContent(), content.getContent().length+1); + data[content.getContent().length] = '2'; // append one byte + content.setContent(data); + LOG.info("document content changed"); + } + + + /** + * default parse action: add signature if successfully fetched + * + * @param fetchDatum + * fetch datum + * @return list of all datums resulting from parse (status: signature, linked, parse_metadata) + */ + protected List<CrawlDatum> parse(CrawlDatum fetchDatum) { + List<CrawlDatum> parseDatums = new ArrayList<CrawlDatum>(0); + if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS) { + CrawlDatum signatureDatum = new CrawlDatum(CrawlDatum.STATUS_SIGNATURE, 0); + signatureDatum.setSignature(getSignature()); + parseDatums.add(signatureDatum); + } + return parseDatums; + } + + /** + * default implementation to check the result state + * + * @param datum + * the CrawlDatum to be checked + * @return true if the check succeeds + */ + protected boolean check(CrawlDatum datum) { + if (datum.getStatus() != expectedDbStatus) + return false; + return true; + } + + /** + * Run the continuous crawl. + * <p> + * A loop emulates a continuous crawl launched in regular intervals (see + * {@link #setInterval(int)}) over a longer period ({@link #setDuration(int)}). + * + * <ul> + * <li>every "round" emulates + * <ul> + * <li>a fetch (see {@link #fetch(CrawlDatum, long)})</li> + * <li>{@literal updatedb} which returns a {@link CrawlDatum}</li> + * </ul></li> + * <li>the returned CrawlDatum is used as input for the next round</li> + * <li>and is checked whether it is correct (see {@link #check(CrawlDatum)})</li> + * </ul> + * </p> + * + * @param maxErrors + * (if > 0) continue the crawl even if the checked CrawlDatum is not + * correct, but stop after this max. number of errors + * + * @return false if a check of CrawlDatum failed, true otherwise + */ + protected boolean run(int maxErrors) { + + long now = System.currentTimeMillis(); + + CrawlDbUpdateUtil<CrawlDbReducer> updateDb = new CrawlDbUpdateUtil<CrawlDbReducer>( + new CrawlDbReducer(), configuration); + + /* start with a db_unfetched */ + CrawlDatum dbDatum = new CrawlDatum(); + dbDatum.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); + schedule.initializeSchedule(dummyURL, dbDatum); // initialize fetchInterval + dbDatum.setFetchTime(now); + + LOG.info("Emulate a continuous crawl, launched every " + + (interval / (FetchSchedule.SECONDS_PER_DAY * 1000)) + " day (" + + (interval / 1000) + " seconds)"); + long maxTime = (now + duration); + long nextTime = now; + long lastFetchTime = -1; + boolean ok = true; // set to false when a check fails, but keep going + CrawlDatum fetchDatum = new CrawlDatum(); + /* Keep copies because CrawlDbReducer.reduce() + * and FetchSchedule.shouldFetch() may alter the references. + * Copies are used for verbose logging in case of an error. */ + CrawlDatum copyDbDatum = new CrawlDatum(); + CrawlDatum copyFetchDatum = new CrawlDatum(); + CrawlDatum afterShouldFetch = new CrawlDatum(); + int errorCount = 0; + while (nextTime < maxTime) { + LOG.info("check: " + new Date(nextTime)); + fetchDatum.set(dbDatum); + copyDbDatum.set(dbDatum); + if (schedule.shouldFetch(dummyURL, fetchDatum, nextTime)) { + LOG.info("... fetching now (" + new Date(nextTime) + ")"); + if (lastFetchTime > -1) { + LOG.info("(last fetch: " + new Date(lastFetchTime) + " = " + + TimingUtil.elapsedTime(lastFetchTime, nextTime) + " ago)"); + } + lastFetchTime = nextTime; + afterShouldFetch.set(fetchDatum); + fetchDatum = fetch(fetchDatum, nextTime); + copyFetchDatum.set(fetchDatum); + List<CrawlDatum> values = new ArrayList<CrawlDatum>(); + values.add(dbDatum); + values.add(fetchDatum); + values.addAll(parse(fetchDatum)); + List<CrawlDatum> res = updateDb.update(values); + assertNotNull("null returned", res); + assertFalse("no CrawlDatum", 0 == res.size()); + assertEquals("more than one CrawlDatum", 1, res.size()); + if (!check(res.get(0))) { + LOG.info("previously in CrawlDb: " + copyDbDatum); + LOG.info("after shouldFetch(): " + afterShouldFetch); + LOG.info("fetch: " + fetchDatum); + LOG.warn("wrong result in CrawlDb: " + res.get(0)); + if (++errorCount >= maxErrors) { + if (maxErrors > 0) { + LOG.error("Max. number of errors " + maxErrors + + " reached. 
Stopping."); + } + return false; + } else { + ok = false; // record failure but keep going + } + } + /* use the returned CrawlDatum for the next fetch */ + dbDatum = res.get(0); + } + nextTime += interval; + } + return ok; + } + +} Added: nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java?rev=1610628&view=auto ============================================================================== --- nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java (added) +++ nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java Tue Jul 15 09:16:47 2014 @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility to test transitions of {@link CrawlDatum} states during an update of + * {@link CrawlDb} (command {@literal updatedb}): call + * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)} with + * the old CrawlDatum (db status) and the new one (fetch status) + */ +public class CrawlDbUpdateUtil<T extends Reducer<Text, CrawlDatum, Text, CrawlDatum>> { + + private static final Logger LOG = LoggerFactory.getLogger(CrawlDbUpdateUtil.class); + + private T reducer; + + public static Text dummyURL = new Text("http://nutch.apache.org/"); + + protected CrawlDbUpdateUtil(T red, Configuration conf) { + reducer = red; + reducer.configure(new JobConf(conf)); + } + + /** {@link OutputCollector} to collect all values in a {@link List} */ + private class ListOutputCollector implements + OutputCollector<Text, CrawlDatum> { + + private List<CrawlDatum> values = new ArrayList<CrawlDatum>(); + + public void collect(Text key, CrawlDatum value) throws IOException { + values.add(value); + } + + /** collected values as list */ + public List<CrawlDatum> getValues() { + return values; + } + + } + + /** + * Dummy reporter which does nothing and does not return null for + * getCounter() + * + * @see {@link Reporter#NULL} + */ + private class DummyReporter implements Reporter { + + private Counters 
dummyCounters = new Counters(); + + public void progress() { + } + + public Counter getCounter(Enum<?> arg0) { + return dummyCounters.getGroup("dummy").getCounterForName("dummy"); + } + + public Counter getCounter(String arg0, String arg1) { + return dummyCounters.getGroup("dummy").getCounterForName("dummy"); + } + + public InputSplit getInputSplit() throws UnsupportedOperationException { + throw new UnsupportedOperationException("Dummy reporter without input"); + } + + public void incrCounter(Enum<?> arg0, long arg1) { + } + + public void incrCounter(String arg0, String arg1, long arg2) { + } + + public void setStatus(String arg0) { + } + + public float getProgress() { + return 1f; + } + + } + + /** + * run + * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)} + * and return the CrawlDatum(s) which would have been written into CrawlDb + * @param values list of input CrawlDatums + * @return list of resulting CrawlDatum(s) in CrawlDb + */ + public List<CrawlDatum> update(List<CrawlDatum> values) { + if (values == null || values.size() == 0) { + return new ArrayList<CrawlDatum>(0); + } + Collections.shuffle(values); // sorting of values should have no influence + ListOutputCollector output = new ListOutputCollector(); + try { + reducer.reduce(dummyURL, values.iterator(), output, new DummyReporter()); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + } + return output.getValues(); + } + + /** + * run + * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)} + * and return the CrawlDatum(s) which would have been written into CrawlDb + * @param dbDatum previous CrawlDatum in CrawlDb + * @param fetchDatum CrawlDatum resulting from fetching + * @return list of resulting CrawlDatum(s) in CrawlDb + */ + public List<CrawlDatum> update(CrawlDatum dbDatum, + CrawlDatum fetchDatum) { + List<CrawlDatum> values = new ArrayList<CrawlDatum>(); + if (dbDatum != null) + values.add(dbDatum); + if (fetchDatum != null) + values.add(fetchDatum); + return update(values); + } + + /** + * see {@link #update(List)} + */ + public List<CrawlDatum> update(CrawlDatum... values) { + return update(Arrays.asList(values)); + } + +} Added: nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java?rev=1610628&view=auto ============================================================================== --- nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java (added) +++ nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java Tue Jul 15 09:16:47 2014 @@ -0,0 +1,242 @@ +package org.apache.nutch.crawl; + +import static org.apache.nutch.crawl.CrawlDatum.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.util.TimingUtil; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TODOTestCrawlDbStates extends TestCrawlDbStates { + + private static final Logger LOG = LoggerFactory.getLogger(TODOTestCrawlDbStates.class); + + /** + * NUTCH-578: a fetch_retry should result in a db_gone if db.fetch.retry.max is reached. + * Retry counter has to be reset appropriately. 
+ */ + @Test + public void testCrawlDbReducerPageRetrySchedule() { + LOG.info("NUTCH-578: test long running continuous crawl with fetch_retry"); + ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestFetchRetry(); + // keep going long enough to "provoke" a retry counter overflow + if (!crawlUtil.run(150)) { + fail("fetch_retry did not result in a db_gone if retry counter > maxRetries (NUTCH-578)"); + } + } + + private class ContinuousCrawlTestFetchRetry extends ContinuousCrawlTestUtil { + + private int retryMax = 3; + private int totalRetries = 0; + + ContinuousCrawlTestFetchRetry() { + super(); + fetchStatus = STATUS_FETCH_RETRY; + retryMax = configuration.getInt("db.fetch.retry.max", retryMax); + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + datum.setStatus(fetchStatus); + datum.setFetchTime(currentTime); + totalRetries++; + return datum; + } + + @Override + protected boolean check(CrawlDatum result) { + if (result.getRetriesSinceFetch() > retryMax) { + LOG.warn("Retry counter > db.fetch.retry.max: " + result); + } else if (result.getRetriesSinceFetch() == Byte.MAX_VALUE) { + LOG.warn("Retry counter max. value reached (overflow imminent): " + + result); + } else if (result.getRetriesSinceFetch() < 0) { + LOG.error("Retry counter overflow: " + result); + return false; + } + // use the retry counter bound to this class (totalRetries) + // instead of result.getRetriesSinceFetch() because the retry counter + // in CrawlDatum could be reset (e.g., NUTCH-578_v5.patch) + if (totalRetries < retryMax) { + if (result.getStatus() == STATUS_DB_UNFETCHED) { + LOG.info("ok: " + result); + return true; + } + } else { + if (result.getStatus() == STATUS_DB_GONE) { + LOG.info("ok: " + result); + return true; + } + } + LOG.warn("wrong: " + result); + return false; + } + + } + + /** + * NUTCH-1564 AdaptiveFetchSchedule: sync_delta forces immediate re-fetch for + * documents not modified + * <p> + * Problem: documents not modified for a long time are fetched in every + * cycle because of an error in the SYNC_DELTA calculation of + * {@link AdaptiveFetchSchedule}. + * <br> + * The next fetch time should always be in the future, never in the past. + * </p> + */ + @Test + public void testAdaptiveFetchScheduleSyncDelta() { + LOG.info("NUTCH-1564 test SYNC_DELTA calculation of AdaptiveFetchSchedule"); + Configuration conf = CrawlDBTestUtil.createConfiguration(); + conf.setLong("db.fetch.interval.default", 172800); // 2 days + conf.setLong("db.fetch.schedule.adaptive.min_interval", 86400); // 1 day + conf.setLong("db.fetch.schedule.adaptive.max_interval", 604800); // 7 days + conf.setLong("db.fetch.interval.max", 604800); // 7 days + conf.set("db.fetch.schedule.class", + "org.apache.nutch.crawl.AdaptiveFetchSchedule"); + ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchScheduleNotModifiedFetchTime( + conf); + crawlUtil.setInterval(FetchSchedule.SECONDS_PER_DAY/3); + if (!crawlUtil.run(100)) { + fail("failed: sync_delta calculation with AdaptiveFetchSchedule"); + } + } + + private class CrawlTestFetchScheduleNotModifiedFetchTime extends + CrawlTestFetchNotModified { + + // time of current fetch + private long fetchTime; + + private long minInterval; + private long maxInterval; + + CrawlTestFetchScheduleNotModifiedFetchTime(Configuration conf) { + super(conf); + minInterval = conf.getLong("db.fetch.schedule.adaptive.min_interval", + 86400); // 1 day + maxInterval = conf.getLong("db.fetch.schedule.adaptive.max_interval", + 604800); // 7 days + if (conf.getLong("db.fetch.interval.max", 604800) < maxInterval) { + maxInterval = conf.getLong("db.fetch.interval.max", 604800); + } + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + // remember time of fetching + fetchTime = currentTime; + return super.fetch(datum, currentTime); + } + + @Override + protected boolean check(CrawlDatum result) { + if (result.getStatus() == STATUS_DB_NOTMODIFIED) { + // check only status notmodified here + long secondsUntilNextFetch = (result.getFetchTime() - fetchTime) / 1000L; + if (secondsUntilNextFetch < -1) { + // next fetch time is in the past (more than one second) + LOG.error("Next fetch time is in the past: " + result); + return false; + } + if (secondsUntilNextFetch < 60) { + // next fetch time is in less than one minute + // (critical: Nutch can hardly be so fast) + LOG.error("Less than one minute until next fetch: " + result); + } + // Next fetch time should be within min. and max. (tolerance: 60 sec.) + if (secondsUntilNextFetch+60 < minInterval + || secondsUntilNextFetch-60 > maxInterval) { + LOG.error("Interval until next fetch time (" + + TimingUtil.elapsedTime(fetchTime, result.getFetchTime()) + + ") is not within min. and max. interval: " + result); + // TODO: is this a failure? + } + } + return true; + } + + } + + /** + * Test whether signatures are reset for "content-less" states + * (gone, redirect, etc.): otherwise, if this state is temporary + * and the document appears again with the old content, it may + * get marked as not_modified in CrawlDb just after the redirect + * state. In this case we cannot expect content in segments. + * Cf. NUTCH-1422: reset signature for redirects. 
+ */ + // TODO: can only test if solution is done in CrawlDbReducer + @Test + public void testSignatureReset() { + LOG.info("NUTCH-1422 must reset signature for redirects and similar states"); + Configuration conf = CrawlDBTestUtil.createConfiguration(); + for (String sched : schedules) { + LOG.info("Testing reset signature with " + sched); + conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched); + ContinuousCrawlTestUtil crawlUtil = new CrawlTestSignatureReset(conf); + if (!crawlUtil.run(20)) { + fail("failed: signature not reset"); + } + } + } + + private class CrawlTestSignatureReset extends ContinuousCrawlTestUtil { + + byte[][] noContentStates = { + { STATUS_FETCH_GONE, STATUS_DB_GONE }, + { STATUS_FETCH_REDIR_TEMP, STATUS_DB_REDIR_TEMP }, + { STATUS_FETCH_REDIR_PERM, STATUS_DB_REDIR_PERM } }; + + int counter = 0; + byte fetchState; + + public CrawlTestSignatureReset(Configuration conf) { + super(conf); + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + datum = super.fetch(datum, currentTime); + counter++; + // flip-flopping between successful fetch and one of content-less states + if (counter%2 == 1) { + fetchState = STATUS_FETCH_SUCCESS; + } else { + fetchState = noContentStates[(counter%6)/2][0]; + } + LOG.info("Step " + counter + ": fetched with " + + getStatusName(fetchState)); + datum.setStatus(fetchState); + return datum; + } + + @Override + protected boolean check(CrawlDatum result) { + if (result.getStatus() == STATUS_DB_NOTMODIFIED + && !(fetchState == STATUS_FETCH_SUCCESS || fetchState == STATUS_FETCH_NOTMODIFIED)) { + LOG.error("Should never get into state " + + getStatusName(STATUS_DB_NOTMODIFIED) + " from " + + getStatusName(fetchState)); + return false; + } + if (result.getSignature() != null + && !(result.getStatus() == STATUS_DB_FETCHED || result.getStatus() == STATUS_DB_NOTMODIFIED)) { + LOG.error("Signature not reset in state " + + getStatusName(result.getStatus())); + // ok here: since it's not the problem itself (the db_notmodified), but + // the reason for it + } + return true; + } + + } + +} Added: nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java?rev=1610628&view=auto ============================================================================== --- nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java (added) +++ nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java Tue Jul 15 09:16:47 2014 @@ -0,0 +1,486 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.StringUtils; + +import org.apache.nutch.crawl.CrawlDatum; +import static org.apache.nutch.crawl.CrawlDatum.*; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; + +import static org.junit.Assert.*; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test transitions of {@link CrawlDatum} states during an update of + * {@link CrawlDb} (command {@literal updatedb}): + * <ul> + * <li>simulate updatedb with the old CrawlDatum (db status) and the new one + * (fetch status) and test whether the resulting CrawlDatum has the appropriate + * status.</li> + * <li>also check for further CrawlDatum fields (signature, etc.)</li> + * <li>and additional conditions: + * <ul> + * <li>retry counters</li> + * <li>signatures</li> + * <li>configuration properties</li> + * <li>(additional) CrawlDatums of status linked (stemming from inlinks)</li> + * </ul> + * </li> + * </ul> + */ +public class TestCrawlDbStates { + + private static final Logger LOG = LoggerFactory.getLogger(TestCrawlDbStates.class); + + protected static final byte[][] fetchDbStatusPairs = { + { -1, STATUS_DB_UNFETCHED }, + { STATUS_FETCH_SUCCESS, STATUS_DB_FETCHED }, + { STATUS_FETCH_GONE, STATUS_DB_GONE }, + { STATUS_FETCH_REDIR_TEMP, STATUS_DB_REDIR_TEMP }, + { STATUS_FETCH_REDIR_PERM, STATUS_DB_REDIR_PERM }, + { STATUS_FETCH_NOTMODIFIED, STATUS_DB_NOTMODIFIED }, + { STATUS_FETCH_RETRY, -1 }, // fetch_retry does not have a CrawlDb counterpart + { -1, STATUS_DB_DUPLICATE }, + }; + + /** tested {@link FetchSchedule} implementations */ + protected String[] schedules = {"DefaultFetchSchedule", "AdaptiveFetchSchedule"}; + + /** CrawlDatum as result of a link */ + protected final CrawlDatum linked = new CrawlDatum(STATUS_LINKED, + CrawlDBTestUtil.createConfiguration().getInt("db.fetch.interval.default", + 2592000), 0.1f); + + /** + * Test the matrix of state transitions: + * <ul> + * <li>for all available {@link FetchSchedule} implementations</li> + * <li>for every possible status in CrawlDb (including "not in CrawlDb")</li> + * <li>for every possible fetch status</li> + * <li>and zero to two (0-2) additional in-links</li> + * </ul> + * call {@literal updatedb} and check whether the resulting CrawlDb status is + * the expected one. 
+ */ + @Test + public void testCrawlDbStateTransitionMatrix() { + LOG.info("Test CrawlDatum state transitions"); + Configuration conf = CrawlDBTestUtil.createConfiguration(); + CrawlDbUpdateUtil<CrawlDbReducer> updateDb = new CrawlDbUpdateUtil<CrawlDbReducer>( + new CrawlDbReducer(), conf); + int retryMax = conf.getInt("db.fetch.retry.max", 3); + for (String sched : schedules) { + LOG.info("Testing state transitions with " + sched); + conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched); + FetchSchedule schedule = FetchScheduleFactory + .getFetchSchedule(new JobConf(conf)); + for (int i = 0; i < fetchDbStatusPairs.length; i++) { + byte fromDbStatus = fetchDbStatusPairs[i][1]; + for (int j = 0; j < fetchDbStatusPairs.length; j++) { + byte fetchStatus = fetchDbStatusPairs[j][0]; + CrawlDatum fromDb = null; + if (fromDbStatus == -1) { + // nothing yet in CrawlDb + // CrawlDatum added by FreeGenerator or via outlink + } else { + fromDb = new CrawlDatum(); + fromDb.setStatus(fromDbStatus); + // initialize fetchInterval: + schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, fromDb); + } + // expected db status + byte toDbStatus = fetchDbStatusPairs[j][1]; + if (fetchStatus == -1) { + if (fromDbStatus == -1) { + // nothing fetched yet: new document detected via outlink + toDbStatus = STATUS_DB_UNFETCHED; + } else { + // nothing fetched but new inlinks detected: status is unchanged + toDbStatus = fromDbStatus; + } + } else if (fetchStatus == STATUS_FETCH_RETRY) { + // a simple test of fetch_retry (without retries) + if (fromDb == null || fromDb.getRetriesSinceFetch() < retryMax) { + toDbStatus = STATUS_DB_UNFETCHED; + } else { + toDbStatus = STATUS_DB_GONE; + } + } + String fromDbStatusName = (fromDbStatus == -1 ? "<not in CrawlDb>" + : getStatusName(fromDbStatus)); + String fetchStatusName = (fetchStatus == -1 ? "<only inlinks>" : CrawlDatum + .getStatusName(fetchStatus)); + LOG.info(fromDbStatusName + " + " + fetchStatusName + " => " + + getStatusName(toDbStatus)); + List<CrawlDatum> values = new ArrayList<CrawlDatum>(); + for (int l = 0; l <= 2; l++) { // number of additional in-links + CrawlDatum fetch = null; + if (fetchStatus == -1) { + // nothing fetched, need at least one in-link + if (l == 0) continue; + } else { + fetch = new CrawlDatum(); + if (fromDb != null) { + fetch.set(fromDb); + } else { + // not yet in CrawlDb: added by FreeGenerator + schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, fetch); + } + fetch.setStatus(fetchStatus); + fetch.setFetchTime(System.currentTimeMillis()); + } + if (fromDb != null) + values.add(fromDb); + if (fetch != null) + values.add(fetch); + for (int n = 0; n < l; n++) { + values.add(linked); + } + List<CrawlDatum> res = updateDb.update(values); + if (res.size() != 1) { + fail("CrawlDb update didn't result in one single CrawlDatum per URL"); + continue; + } + byte status = res.get(0).getStatus(); + if (status != toDbStatus) { + fail("CrawlDb update for " + fromDbStatusName + " and " + + fetchStatusName + " and " + l + " inlinks results in " + + getStatusName(status) + " (expected: " + + getStatusName(toDbStatus) + ")"); + } + values.clear(); + } + } + } + } + } + + /** + * Test states after inject: inject must not modify the status of CrawlDatums + * already in CrawlDb. Newly injected elements have status "db_unfetched". + * Inject is simulated by calling {@link Injector.InjectReducer#reduce()}. 
+ */ + @Test + public void testCrawlDbStateTransitionInject() { + LOG.info("Test CrawlDatum states in Injector after inject"); + Configuration conf = CrawlDBTestUtil.createConfiguration(); + CrawlDbUpdateUtil<Injector.InjectReducer> inject = new CrawlDbUpdateUtil<Injector.InjectReducer>( + new Injector.InjectReducer(), conf); + ScoringFilters scfilters = new ScoringFilters(conf); + for (String sched : schedules) { + LOG.info("Testing inject with " + sched); + conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched); + FetchSchedule schedule = FetchScheduleFactory + .getFetchSchedule(new JobConf(conf)); + List<CrawlDatum> values = new ArrayList<CrawlDatum>(); + for (int i = 0; i < fetchDbStatusPairs.length; i++) { + byte fromDbStatus = fetchDbStatusPairs[i][1]; + byte toDbStatus = fromDbStatus; + if (fromDbStatus == -1) { + toDbStatus = STATUS_DB_UNFETCHED; + } else { + CrawlDatum fromDb = new CrawlDatum(); + fromDb.setStatus(fromDbStatus); + schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, fromDb); + values.add(fromDb); + } + LOG.info("inject " + + (fromDbStatus == -1 ? "<not in CrawlDb>" : CrawlDatum + .getStatusName(fromDbStatus)) + " + " + + getStatusName(STATUS_INJECTED) + " => " + + getStatusName(toDbStatus)); + CrawlDatum injected = new CrawlDatum(STATUS_INJECTED, + conf.getInt("db.fetch.interval.default", 2592000), 0.1f); + schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, injected); + try { + scfilters.injectedScore(CrawlDbUpdateUtil.dummyURL, injected); + } catch (ScoringFilterException e) { + LOG.error(StringUtils.stringifyException(e)); + } + values.add(injected); + List<CrawlDatum> res = inject.update(values); + if (res.size() != 1) { + fail("Inject didn't result in one single CrawlDatum per URL"); + continue; + } + byte status = res.get(0).getStatus(); + if (status != toDbStatus) { + fail("Inject for " + + (fromDbStatus == -1 ? 
"" : getStatusName(fromDbStatus) + " and ") + + getStatusName(STATUS_INJECTED) + + " results in " + getStatusName(status) + + " (expected: " + getStatusName(toDbStatus) + ")"); + } + values.clear(); + } + } + } + + /** + * Test status db_notmodified detected by + * <ul> + * <li>signature comparison</li> + * <li>or HTTP 304</li> + * </ul> + * In addition, test for all available {@link FetchSchedule} implementations + * whether + * <ul> + * <li>modified time is set</li> + * <li>re-fetch is triggered after a certain time to force the fetched content + * to be in a recent segment (old segments are deleted, see comments in + * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)} + * </li> + * </ul> + */ + @Test + public void testCrawlDbReducerNotModified() { + LOG.info("Test state notmodified"); + Configuration conf = CrawlDBTestUtil.createConfiguration(); + // test not modified detected by signature comparison + for (String sched : schedules) { + String desc = "test notmodified by signature comparison + " + sched; + LOG.info(desc); + conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched); + ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchNotModified(conf); + if (!crawlUtil.run(20)) { + fail("failed: " + desc); + } + } + // test not modified detected by HTTP 304 + for (String sched : schedules) { + String desc = "test notmodified by HTTP 304 + " + sched; + LOG.info(desc); + conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched); + ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchNotModifiedHttp304(conf); + if (!crawlUtil.run(20)) { + fail("failed: " + desc); + } + } + } + + protected class CrawlTestFetchNotModified extends ContinuousCrawlTestUtil { + + /** time of the current fetch */ + protected long currFetchTime; + /** time the last fetch took place */ + protected long lastFetchTime; + /** time the document was fetched first (at all or after it has been changed) */ + protected long firstFetchTime; + /** state in CrawlDb before the last fetch */ + protected byte previousDbState; + /** signature in CrawlDb of previous fetch */ + protected byte[] lastSignature; + + private long maxFetchInterval; + private FetchSchedule schedule; + + + CrawlTestFetchNotModified(Configuration conf) { + super(conf); + maxFetchInterval = conf.getLong("db.fetch.interval.max", 7776000); // default = 90 days + maxFetchInterval += (24*60*60); // but take one day more to avoid false alarms + maxFetchInterval *= 1000; // in milli-seconds + schedule = FetchScheduleFactory.getFetchSchedule(new JobConf(conf)); + } + + @Override + protected boolean check(CrawlDatum result) { + if (lastFetchTime > 0 && (currFetchTime - lastFetchTime) > maxFetchInterval) { + LOG.error("last effective fetch (HTTP 200, not HTTP 304), at " + + new Date(lastFetchTime) + + ", took place more than db.fetch.interval.max time, " + + "segment containing fetched content may have been deleted"); + return false; + } + switch (result.getStatus()) { + case STATUS_DB_NOTMODIFIED: + // db_notmodified is correct if the document has been fetched previously + // and it has not been changed since + if ((previousDbState == STATUS_DB_FETCHED || previousDbState == STATUS_DB_NOTMODIFIED)) { + if (lastSignature != null + && result.getSignature() != null + && SignatureComparator._compare(lastSignature, + result.getSignature()) != 0) { + LOG.error("document has changed (signature changed) but state is still " + + getStatusName(STATUS_DB_NOTMODIFIED)); + return false; + } + LOG.info("ok: " + result); + return 
checkModifiedTime(result, firstFetchTime); + } + LOG.warn("notmodified without previous fetch"); + break; + case STATUS_DB_FETCHED: + if (previousDbState == STATUS_DB_UNFETCHED) { + LOG.info("ok (first fetch): " + result); + return checkModifiedTime(result, firstFetchTime); + } else if (lastSignature != null + && result.getSignature() != null + && SignatureComparator._compare(lastSignature, + result.getSignature()) != 0) { + LOG.info("ok (content changed): " + result); + // expect modified time == now + return checkModifiedTime(result, currFetchTime); + } else { + LOG.warn("document has not changed, db_notmodified expected"); + } + break; + case STATUS_DB_UNFETCHED: + /** + * Status db_unfetched is possible with {@link AdaptiveFetchSchedule} + * because {@link CrawlDbReducer#reduce} calls + * {@link FetchSchedule#forceRefetch} to force a re-fetch if fetch + * interval grows too large. + */ + if (schedule.getClass() == AdaptiveFetchSchedule.class) { + LOG.info("state set to unfetched by AdaptiveFetchSchedule"); + if (result.getSignature() != null) { + LOG.warn("must reset signature: " + result); + return false; + } + LOG.info("ok: " + result); + firstFetchTime = 0; + return true; + } + } + LOG.warn("wrong result: " + result); + return false; + } + + + // test modified time + private boolean checkModifiedTime(CrawlDatum result, long modifiedTime) { + if (result.getModifiedTime() == 0) { + LOG.error("modified time not set (TODO: not set by DefaultFetchSchedule)"); + // TODO: return false (but DefaultFetchSchedule does not set modified + // time, see NUTCH-933) + return true; + } else if (modifiedTime == result.getModifiedTime()) { + return true; + } + LOG.error("wrong modified time: " + new Date(result.getModifiedTime()) + + " (expected " + new Date(modifiedTime) + ")"); + return false; + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + lastFetchTime = currFetchTime; + currFetchTime = currentTime; + previousDbState = datum.getStatus(); + lastSignature = datum.getSignature(); + datum = super.fetch(datum, currentTime); + if (firstFetchTime == 0) { + firstFetchTime = currFetchTime; + } else if ((currFetchTime - firstFetchTime) > (duration/2)) { + // simulate a modification after "one year" + changeContent(); + firstFetchTime = currFetchTime; + } + return datum; + } + } + + protected class CrawlTestFetchNotModifiedHttp304 extends CrawlTestFetchNotModified { + + CrawlTestFetchNotModifiedHttp304(Configuration conf) { + super(conf); + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + lastFetchTime = currFetchTime; + currFetchTime = currentTime; + previousDbState = datum.getStatus(); + lastSignature = datum.getSignature(); + int httpCode; + /* document is "really" fetched (no HTTP 304) + * - if last-modified time or signature are unset + * (page has not been fetched before or fetch is forced) + * - for test purposes, we simulate a modification after "one year" + */ + if (datum.getModifiedTime() == 0 && datum.getSignature() == null + || (currFetchTime - firstFetchTime) > (duration/2)) { + firstFetchTime = currFetchTime; + httpCode = 200; + datum.setStatus(STATUS_FETCH_SUCCESS); + // modify content to change signature + changeContent(); + } else { + httpCode = 304; + datum.setStatus(STATUS_FETCH_NOTMODIFIED); + } + LOG.info("fetched with HTTP " + httpCode + " => " + + getStatusName(datum.getStatus())); + datum.setFetchTime(currentTime); + return datum; + } + } + + /** + * NUTCH-1245: a fetch_gone should always result in a db_gone. 
+ * <p> + * This holds even in a long-running continuous crawl where a gone page is + * re-fetched several times over time. + * </p> + */ + @Test + public void testCrawlDbReducerPageGoneSchedule1() { + LOG.info("NUTCH-1245: test long running continuous crawl"); + ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil( + STATUS_FETCH_GONE, STATUS_DB_GONE); + if (!crawlUtil.run(20)) { + fail("fetch_gone did not result in a db_gone (NUTCH-1245)"); + } + } + + /** + * NUTCH-1245: a fetch_gone should always result in a db_gone. + * <p> + * Simulate a misconfiguration which sets db.fetch.interval.default to a value + * > (db.fetch.interval.max * 1.5). + * </p> + */ + @Test + public void testCrawlDbReducerPageGoneSchedule2() { + LOG.info("NUTCH-1245 (misconfiguration): test with db.fetch.interval.default > (1.5 * db.fetch.interval.max)"); + Configuration conf = CrawlDBTestUtil.createConfiguration(); + int fetchIntervalMax = conf.getInt("db.fetch.interval.max", 0); + conf.setInt("db.fetch.interval.default", + 3 + (int) (fetchIntervalMax * 1.5)); + ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil(conf, + STATUS_FETCH_GONE, STATUS_DB_GONE); + if (!crawlUtil.run(0)) { + fail("fetch_gone did not result in a db_gone (NUTCH-1245)"); + } + } + +}
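For a single state transition, CrawlDbUpdateUtil can also be driven directly, without the continuous-crawl loop, by handing the old CrawlDb datum and the new fetch datum to update() and inspecting the result. A minimal sketch (the class name SampleSingleTransitionTest is hypothetical and not part of this commit; same test package and JUnit 4 assumed):

    package org.apache.nutch.crawl;

    import static org.apache.nutch.crawl.CrawlDatum.*;
    import static org.junit.Assert.assertEquals;

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.junit.Test;

    public class SampleSingleTransitionTest {

      @Test
      public void testFetchGoneBecomesDbGone() {
        Configuration conf = CrawlDBTestUtil.createConfiguration();
        CrawlDbUpdateUtil<CrawlDbReducer> updateDb = new CrawlDbUpdateUtil<CrawlDbReducer>(
            new CrawlDbReducer(), conf);
        FetchSchedule schedule = FetchScheduleFactory.getFetchSchedule(new JobConf(conf));
        // old entry in CrawlDb: still unfetched, with initialized fetch interval
        CrawlDatum dbDatum = new CrawlDatum();
        dbDatum.setStatus(STATUS_DB_UNFETCHED);
        schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, dbDatum);
        // new fetch result: the page is gone
        CrawlDatum fetchDatum = new CrawlDatum();
        fetchDatum.set(dbDatum);
        fetchDatum.setStatus(STATUS_FETCH_GONE);
        fetchDatum.setFetchTime(System.currentTimeMillis());
        // emulate one updatedb round and check the resulting CrawlDb state
        List<CrawlDatum> res = updateDb.update(dbDatum, fetchDatum);
        assertEquals("one CrawlDatum per URL expected", 1, res.size());
        assertEquals("expected db_gone", STATUS_DB_GONE, res.get(0).getStatus());
      }
    }

This mirrors what testCrawlDbStateTransitionMatrix asserts for the db_unfetched + fetch_gone pair, reduced to a single cell of the matrix.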