Author: davsclaus
Date: Thu May 15 21:52:20 2008
New Revision: 656933
URL: http://svn.apache.org/viewvc?rev=656933&view=rev
Log:
CAMEL-504 - Refactored atom and added unit tests
Added:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomConsumerSupport.java
(with props)
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
(with props)
Modified:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomComponent.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
Modified:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomComponent.java?rev=656933&r1=656932&r2=656933&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomComponent.java
(original)
+++
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomComponent.java
Thu May 15 21:52:20 2008
@@ -23,6 +23,8 @@
/**
* An <a href="http://activemq.apache.org/camel/atom.html">Atom Component</a>.
+ * <p/>
+ * Camel uses Apache Abdera as the Atom implementation.
*
* @version $Revision$
*/
Added:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomConsumerSupport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomConsumerSupport.java?rev=656933&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomConsumerSupport.java
(added)
+++
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomConsumerSupport.java
Thu May 15 21:52:20 2008
@@ -0,0 +1,19 @@
+package org.apache.camel.component.atom;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+
+/**
+ * Base class for consuming Atom feeds.
+ */
+public abstract class AtomConsumerSupport extends
ScheduledPollConsumer<Exchange> {
+ public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
+ protected final AtomEndpoint endpoint;
+
+ public AtomConsumerSupport(AtomEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ }
+
+}
Propchange:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomConsumerSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomConsumerSupport.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java?rev=656933&r1=656932&r2=656933&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
Thu May 15 21:52:20 2008
@@ -20,8 +20,9 @@
import org.apache.abdera.model.Entry;
import org.apache.abdera.model.Feed;
+import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultPollingEndpoint;
import org.apache.camel.util.ObjectHelper;
@@ -69,13 +70,18 @@
throw new UnsupportedOperationException("AtomProducer is not
implemented");
}
- @Override
- public PollingConsumer<Exchange> createPollingConsumer() throws Exception {
+ public Consumer<Exchange> createConsumer(Processor processor) throws
Exception {
+ AtomConsumerSupport answer;
if (isSplitEntries()) {
- return new AtomEntryPollingConsumer(this, filter, lastUpdate);
+ answer = new AtomEntryPollingConsumer(this, processor, filter,
lastUpdate);
} else {
- return new AtomPollingConsumer(this);
+ answer = new AtomPollingConsumer(this, processor);
}
+ // ScheduledPollConsumer default delay is 500 millis and that is too
often for polling a feed,
+ // so we override with a new default value. End user can override this
value by providing a consumer.delay parameter
+ answer.setDelay(AtomConsumerSupport.DEFAULT_CONSUMER_DELAY);
+ configureConsumer(answer);
+ return answer;
}
/**
Modified:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java?rev=656933&r1=656932&r2=656933&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
Thu May 15 21:52:20 2008
@@ -25,66 +25,46 @@
import org.apache.abdera.model.Feed;
import org.apache.abdera.parser.ParseException;
import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.Processor;
/**
* Consumer to poll atom feeds and return each entry from the feed step by
step.
*
* @version $Revision$
*/
-public class AtomEntryPollingConsumer extends PollingConsumerSupport<Exchange>
{
- private final AtomEndpoint endpoint;
+public class AtomEntryPollingConsumer extends AtomPollingConsumer {
private Document<Feed> document;
private int entryIndex;
private EntryFilter entryFilter;
private List<Entry> list;
- public AtomEntryPollingConsumer(AtomEndpoint endpoint, boolean filter,
Date lastUpdate) {
- super(endpoint);
- this.endpoint = endpoint;
+ public AtomEntryPollingConsumer(AtomEndpoint endpoint, Processor
processor, boolean filter,
+ Date lastUpdate) {
+ super(endpoint, processor);
if (filter) {
entryFilter = new UpdatedDateFilter(lastUpdate);
}
}
- public Exchange receiveNoWait() {
- try {
- getDocument();
- Feed feed = document.getRoot();
-
- while (hasNextEntry()) {
- Entry entry = list.get(entryIndex--);
-
- boolean valid = true;
- if (entryFilter != null) {
- valid = entryFilter.isValidEntry(endpoint, document,
entry);
- }
- if (valid) {
- return endpoint.createExchange(feed, entry);
- }
+ public void poll() throws Exception {
+ getDocument();
+ Feed feed = document.getRoot();
+
+ while (hasNextEntry()) {
+ Entry entry = list.get(entryIndex--);
+
+ boolean valid = true;
+ if (entryFilter != null) {
+ valid = entryFilter.isValidEntry(endpoint, document, entry);
+ }
+ if (valid) {
+ Exchange exchange = endpoint.createExchange(feed, entry);
+ getProcessor().process(exchange);
}
-
- // reset document to be able to poll again
- document = null;
- return null;
- } catch (Exception e) {
- throw new RuntimeCamelException(e);
}
- }
-
- public Exchange receive() {
- return receiveNoWait();
- }
-
- public Exchange receive(long timeout) {
- return receiveNoWait();
- }
-
- protected void doStart() throws Exception {
- }
- protected void doStop() throws Exception {
+ // reset document to be able to poll again
+ document = null;
}
private Document<Feed> getDocument() throws IOException, ParseException {
Modified:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java?rev=656933&r1=656932&r2=656933&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
Thu May 15 21:52:20 2008
@@ -19,44 +19,24 @@
import org.apache.abdera.model.Document;
import org.apache.abdera.model.Feed;
import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.Processor;
/**
* Consumer to poll atom feeds and return the full feed.
*
* @version $Revision$
*/
-public class AtomPollingConsumer extends PollingConsumerSupport<Exchange> {
- private final AtomEndpoint endpoint;
+public class AtomPollingConsumer extends AtomConsumerSupport {
- public AtomPollingConsumer(AtomEndpoint endpoint) {
- super(endpoint);
- this.endpoint = endpoint;
+ public AtomPollingConsumer(AtomEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
}
- public Exchange receiveNoWait() {
- try {
- Document<Feed> document =
AtomUtils.parseDocument(endpoint.getAtomUri());
- Feed feed = document.getRoot();
- return endpoint.createExchange(feed);
- } catch (Exception e) {
- throw new RuntimeCamelException(e);
- }
+ protected void poll() throws Exception {
+ Document<Feed> document =
AtomUtils.parseDocument(endpoint.getAtomUri());
+ Feed feed = document.getRoot();
+ Exchange exchange = endpoint.createExchange(feed);
+ getProcessor().process(exchange);
}
- public Exchange receive() {
- return receiveNoWait();
- }
-
- public Exchange receive(long timeout) {
- return receiveNoWait();
- }
-
- protected void doStart() throws Exception {
- }
-
- protected void doStop() throws Exception {
- }
-
}
Added:
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java?rev=656933&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
(added)
+++
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
Thu May 15 21:52:20 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atom;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Unit test for fast polling using a low delay
+ */
+public class AtomPollingLowDelayTest extends ContextTestSupport {
+
+ public void testLowDelay() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(7);
+ mock.setResultWaitTime(1000L);
+ mock.assertIsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+
from("atom:file:src/test/data/feed.atom?splitEntries=true&consumer.delay=100&consumer.initialDelay=0").to("mock:result");
+ }
+ };
+ }
+}
Propchange:
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java?rev=656933&r1=656932&r2=656933&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
(original)
+++
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
Thu May 15 21:52:20 2008
@@ -36,10 +36,10 @@
Document<Feed> doc =
AtomUtils.parseDocument("file:src/test/data/feed.atom");
assertNotNull(doc);
+ // timestamp from the feed to use as base
// 2007-11-13T13:35:25.014Z
Calendar cal =
GregorianCalendar.getInstance(TimeZone.getTimeZone("GMT+1:00"));
cal.set(2007, Calendar.NOVEMBER, 13, 14, 35, 0);
- System.out.println(cal.getTime());
EntryFilter filter = new UpdatedDateFilter(cal.getTime());
List<Entry> entries = doc.getRoot().getEntries();