Author: andy
Date: Mon Mar 26 09:22:16 2012
New Revision: 1305276
URL: http://svn.apache.org/viewvc?rev=1305276&view=rev
Log:
Extract SinkToQueue and put in atlas/lib
Added:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/lib/SinkToQueue.java
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
Added:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/lib/SinkToQueue.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/lib/SinkToQueue.java?rev=1305276&view=auto
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/lib/SinkToQueue.java
(added)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/lib/SinkToQueue.java
Mon Mar 26 09:22:16 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.atlas.lib;
+
+import java.util.concurrent.BlockingQueue ;
+import java.util.concurrent.CancellationException ;
+
+/** Send items to a blocking queue */
+public class SinkToQueue<T> implements Sink<T>
+{
+ private final BlockingQueue<T> queue ;
+
+ public SinkToQueue(BlockingQueue<T> queue) { this.queue = queue ; }
+
+ @Override
+ public void send(T item)
+ {
+ try
+ {
+ if (Thread.interrupted()) throw new InterruptedException();
+ // Hopefully we'll never get passed null... but just in case
+ if (null == item) return;
+ queue.put(item);
+ }
+ catch (InterruptedException e)
+ {
+ throw new CancellationException();
+ }
+ }
+
+ @Override
+ public void flush() {}
+
+ @Override
+ public void close() {}
+}
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java?rev=1305276&r1=1305275&r2=1305276&view=diff
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
(original)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
Mon Mar 26 09:22:16 2012
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit ;
import org.openjena.atlas.AtlasException ;
import org.openjena.atlas.lib.Closeable ;
import org.openjena.atlas.lib.Sink ;
+import org.openjena.atlas.lib.SinkToQueue ;
import org.openjena.riot.lang.LangRIOT ;
import org.openjena.riot.system.ParserProfile ;
@@ -71,49 +72,10 @@ public abstract class RiotParsePuller<T>
this.baseIRI = baseIRI;
this.queue = new ArrayBlockingQueue<T>(QUEUE_CAPACITY);
- Sink<T> sink = createSink();
+ Sink<T> sink = new SinkToQueue<T>(queue) ;
this.parser = createParser(sink);
}
- private Sink<T> createSink()
- {
- // Executes within the context of the thread
- Sink<T> sink = new Sink<T>()
- {
- @Override
- public void send(T item)
- {
- try
- {
- if (Thread.interrupted()) throw new InterruptedException();
-
- // Hopefully we'll never get passed null... but just in
case
- if (null == item) return;
-
- queue.put(item);
- }
- catch (InterruptedException e)
- {
- // Presumably throwing an exception is how the parsers
expect you to cancel?
- throw new CancellationException();
- }
- }
-
- @Override
- public void flush()
- {
- // do nothing
- }
-
- @Override
- public void close()
- {
- // do nothing
- }
- };
- return sink;
- }
-
@Override
public Lang getLang()
{