Xikui Wang has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1430
Change subject: WIP - Modularize feed adaptors ...................................................................... WIP - Modularize feed adaptors Modularize the feed adaptors and move 'license-unfriendly' adaptors to separate package. Change-Id: Ic4f95255f5493a813ee1f875b63a62e74bc47602 --- M asterixdb/asterix-external-data/pom.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java A asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory A asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/DatasourceFactoryProviderTest.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java A asterixdb/asterix-villain/pom.xml R asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPullRecordReader.java R asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPushRecordReader.java R asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterRecordReaderFactory.java R asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/util/TwitterUtil.java A asterixdb/asterix-villain/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory M asterixdb/pom.xml 17 files changed, 215 insertions(+), 62 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/30/1430/1 diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 72c7997..3796a0d 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -106,6 +106,7 @@ <include>**/NOTICE</include> <include>**/LICENSE</include> <include>**/DEPENDENCIES</include> + <include>**/services/**</include> </includes> </configuration> <executions> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java index c6adbc4..627bd65 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java @@ -32,4 +32,7 @@ public default DataSourceType getDataSourceType() { return DataSourceType.RECORDS; } + + public String[] getRecordReaderNames(); + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index 2b899d9..65d89cb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -74,6 +74,7 @@ private InputSplit[] inputSplits; private String nodeName; private Format format; + private String[] recordReaderNames = { "hdfs" }; @Override public void configure(Map<String, String> configuration) throws AsterixException { @@ -226,4 +227,9 @@ public boolean isIndexingOp() { return ((files != null) && indexingOp); } + + @Override + public String[] getRecordReaderNames() { + return recordReaderNames; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java index 2ded3fb..6f7db89 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java @@ -39,6 +39,7 @@ private static final long serialVersionUID = 1L; private final List<String> urls = new ArrayList<String>(); private transient AlgebricksAbsolutePartitionConstraint clusterLocations; + private static String[] recordReaderNames = { "rss_feed" }; @Override public DataSourceType getDataSourceType() { @@ -89,4 +90,9 @@ return SyndEntryImpl.class; } + @Override + public String[] getRecordReaderNames() { + return recordReaderNames; + } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java index 4649559..4a9fadb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java @@ -23,8 +23,12 @@ import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; +import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory; +import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory; +import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory; import org.apache.asterix.external.provider.StreamRecordReaderProvider; import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -33,13 +37,13 @@ public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> { private static final long serialVersionUID = 1L; - protected final IInputStreamFactory streamFactory; + protected IInputStreamFactory streamFactory; protected Map<String, String> configuration; protected Format format; - - public StreamRecordReaderFactory(IInputStreamFactory inputStreamFactory) { - this.streamFactory = inputStreamFactory; - } + protected String recordReaderName; + private String[] recordReaderNames = { ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, + ExternalDataConstants.ALIAS_SOCKET_ADAPTER, ExternalDataConstants.SOCKET, + ExternalDataConstants.STREAM_SOCKET_CLIENT }; @Override public DataSourceType getDataSourceType() { @@ -60,6 +64,19 @@ @Override public void configure(Map<String, String> configuration) throws HyracksDataException, AlgebricksException { this.configuration = configuration; + recordReaderName = configuration.get(ExternalDataConstants.KEY_READER); + switch (recordReaderName) { + case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER: + this.streamFactory = new LocalFSInputStreamFactory(); + break; + case ExternalDataConstants.ALIAS_SOCKET_ADAPTER: + case ExternalDataConstants.SOCKET: + this.streamFactory = new SocketServerInputStreamFactory(); + break; + case ExternalDataConstants.STREAM_SOCKET_CLIENT: + this.streamFactory = new SocketClientInputStreamFactory(); + break; + } streamFactory.configure(configuration); format = StreamRecordReaderProvider.getReaderFormat(configuration); } @@ -70,4 +87,9 @@ return StreamRecordReaderProvider.createRecordReader(format, streamFactory.createInputStream(ctx, partition), configuration); } + + @Override + public String[] getRecordReaderNames() { + return recordReaderNames; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index 1428863..4eed8ea 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -18,8 +18,17 @@ */ package org.apache.asterix.external.provider; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; import java.util.Map; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.library.ILibraryManager; @@ -27,25 +36,23 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.external.input.HDFSDataSourceFactory; -import org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory; -import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory; -import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory; import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory; -import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory; import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.commons.io.IOUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; public class DatasourceFactoryProvider { + + private static final String RESOURCE = "META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory"; + private static Map<String, Class> factories = null; private DatasourceFactoryProvider() { } public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager, - Map<String, String> configuration) throws HyracksDataException { + Map<String, String> configuration) throws HyracksDataException, AsterixException { if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) { String reader = configuration.get(ExternalDataConstants.KEY_READER); return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration); @@ -88,41 +95,60 @@ return streamSourceFactory; } - public static IRecordReaderFactory<?> getRecordReaderFactory(ILibraryManager libraryManager, String reader, - Map<String, String> configuration) throws HyracksDataException { - if (reader.equals(ExternalDataConstants.EXTERNAL)) { - try { - return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration); - } catch (AlgebricksException e) { - // Not sure whether this is the right way to handle AlgebricksException (xikui) - throw new HyracksDataException(e); - } + protected static IRecordReaderFactory getInstance(Class clazz) throws AsterixException { + try { + return (IRecordReaderFactory) clazz.newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassCastException e) { + throw new AsterixException("Cannot create: " + clazz.getSimpleName(), e); } - switch (reader) { - case ExternalDataConstants.READER_HDFS: - return new HDFSDataSourceFactory(); - case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER: - return new StreamRecordReaderFactory(new LocalFSInputStreamFactory()); - case ExternalDataConstants.READER_TWITTER_PULL: - case ExternalDataConstants.READER_TWITTER_PUSH: - case ExternalDataConstants.READER_PUSH_TWITTER: - case ExternalDataConstants.READER_PULL_TWITTER: - case ExternalDataConstants.READER_USER_STREAM_TWITTER: - return new TwitterRecordReaderFactory(); - case ExternalDataConstants.ALIAS_SOCKET_ADAPTER: - case ExternalDataConstants.SOCKET: - return new StreamRecordReaderFactory(new SocketServerInputStreamFactory()); - case ExternalDataConstants.STREAM_SOCKET_CLIENT: - return new StreamRecordReaderFactory(new SocketClientInputStreamFactory()); - case ExternalDataConstants.READER_RSS: - return new RSSRecordReaderFactory(); - default: - try { - return (IRecordReaderFactory<?>) Class.forName(reader).newInstance(); - } catch (IllegalAccessException | ClassNotFoundException | InstantiationException - | ClassCastException e) { - throw new RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e,reader); + } + + public static IRecordReaderFactory<?> getRecordReaderFactory(ILibraryManager libraryManager, String adaptorName, + Map<String, String> configuration) throws HyracksDataException, AsterixException { + if (adaptorName.equals(ExternalDataConstants.EXTERNAL)) { + return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration); + } + + if (factories == null) { + factories = initFactories(); + } + + if (factories.containsKey(adaptorName)) { + return getInstance(factories.get(adaptorName)); + } + + try { + return (IRecordReaderFactory) Class.forName(adaptorName).newInstance(); + } catch (IllegalAccessException | ClassNotFoundException | InstantiationException | ClassCastException e) { + throw new RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e, adaptorName); + } + } + + protected static Map<String, Class> initFactories() throws AsterixException { + Map<String, Class> factories = new HashMap<>(); + ClassLoader cl = ParserFactoryProvider.class.getClassLoader(); + final Charset encoding = Charset.forName("UTF-8"); + try { + Enumeration<URL> urls = cl.getResources(RESOURCE); + for (URL url : Collections.list(urls)) { + InputStream is = url.openStream(); + String config = IOUtils.toString(is, encoding); + is.close(); + String[] classNames = config.split("\n"); + for (String className : classNames) { + final Class<?> clazz = Class.forName(className); + String[] formats = ((IRecordReaderFactory) clazz.newInstance()).getRecordReaderNames(); + for (String format : formats) { + if (factories.containsKey(format)) { + throw new AsterixException("Duplicate format " + format); + } + factories.put(format, clazz); + } } + } + } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new AsterixException(e); } + return factories; } } diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory new file mode 100644 index 0000000..f13421c --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -0,0 +1,3 @@ +org.apache.asterix.external.input.HDFSDataSourceFactory +org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory +org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/DatasourceFactoryProviderTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/DatasourceFactoryProviderTest.java new file mode 100644 index 0000000..3923015 --- /dev/null +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/DatasourceFactoryProviderTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.test; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.api.IRecordReaderFactory; +import org.apache.asterix.external.input.HDFSDataSourceFactory; +import org.apache.asterix.external.provider.DatasourceFactoryProvider; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.junit.Assert; +import org.junit.Test; + +public class DatasourceFactoryProviderTest { + + IRecordReaderFactory factory = null; + + @Test + public void test() throws HyracksDataException, AsterixException { + factory = DatasourceFactoryProvider.getRecordReaderFactory(null, "hdfs", null); + Assert.assertTrue(factory instanceof HDFSDataSourceFactory); + } +} diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java index 3467411..bf8a616 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java @@ -33,6 +33,7 @@ private static final long serialVersionUID = 1L; private transient AlgebricksAbsolutePartitionConstraint clusterLocations; + @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1); @@ -53,4 +54,8 @@ public Class<?> getRecordClass() { return RecordWithPK.class; } + + @Override public String[] getRecordReaderNames() { + return new String[0]; + } } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java index 6841e6b..b9bba1f 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java @@ -77,4 +77,8 @@ public Class<?> getRecordClass() { return DCPRequest.class; } + + @Override public String[] getRecordReaderNames() { + return new String[0]; + } } diff --git a/asterixdb/asterix-villain/pom.xml b/asterixdb/asterix-villain/pom.xml new file mode 100644 index 0000000..c4ea6de --- /dev/null +++ b/asterixdb/asterix-villain/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. + !--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-asterixdb</artifactId> + <groupId>org.apache.asterix</groupId> + <version>0.8.9-SNAPSHOT</version> + </parent> + <artifactId>asterix-villain</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.asterix</groupId> + <artifactId>asterix-external-data</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPullRecordReader.java similarity index 97% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java rename to asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPullRecordReader.java index 5a7b4b9..fde685b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java +++ b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPullRecordReader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.input.record.reader.twitter; +package org.apache.asterix.villain.external.input.record.reader.twitter; import java.io.IOException; import java.util.List; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPushRecordReader.java similarity index 89% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java rename to asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPushRecordReader.java index ffffbd7..610bf37 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java +++ b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterPushRecordReader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.input.record.reader.twitter; +package org.apache.asterix.villain.external.input.record.reader.twitter; import java.io.IOException; import java.util.concurrent.LinkedBlockingQueue; @@ -26,18 +26,9 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.record.GenericRecord; import org.apache.asterix.external.util.FeedLogManager; -import org.apache.asterix.external.util.TwitterUtil; -import twitter4j.DirectMessage; +import org.apache.asterix.villain.external.util.TwitterUtil; import twitter4j.FilterQuery; -import twitter4j.StallWarning; -import twitter4j.Status; -import twitter4j.StatusDeletionNotice; -import twitter4j.StatusListener; -import twitter4j.TwitterObjectFactory; import twitter4j.TwitterStream; -import twitter4j.User; -import twitter4j.UserList; -import twitter4j.UserStreamListener; public class TwitterPushRecordReader implements IRecordReader<String> { private LinkedBlockingQueue<String> inputQ; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterRecordReaderFactory.java similarity index 90% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java rename to asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 570155c..d795905 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.input.record.reader.twitter; +package org.apache.asterix.villain.external.input.record.reader.twitter; import java.util.Map; import java.util.logging.Level; @@ -27,9 +27,9 @@ import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.TwitterUtil; -import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants; -import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants; +import org.apache.asterix.villain.external.util.TwitterUtil; +import org.apache.asterix.villain.external.util.TwitterUtil.AuthenticationConstants; +import org.apache.asterix.villain.external.util.TwitterUtil.SearchAPIConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -47,6 +47,14 @@ private Map<String, String> configuration; private transient AlgebricksAbsolutePartitionConstraint clusterLocations; + private static final String[] recordReaderNames = { ExternalDataConstants.READER_TWITTER_PULL, + ExternalDataConstants.READER_TWITTER_PUSH, ExternalDataConstants.READER_PUSH_TWITTER, + ExternalDataConstants.READER_PULL_TWITTER, ExternalDataConstants.READER_USER_STREAM_TWITTER }; + + @Override + public String[] getRecordReaderNames() { + return recordReaderNames; + } @Override public DataSourceType getDataSourceType() { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/util/TwitterUtil.java similarity index 98% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java rename to asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/util/TwitterUtil.java index bd8e52d..ef244c5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java +++ b/asterixdb/asterix-villain/src/main/java/org/apache/asterix/villain/external/util/TwitterUtil.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.util; +package org.apache.asterix.villain.external.util; import java.io.InputStream; import java.util.HashMap; @@ -30,6 +30,7 @@ import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.util.ExternalDataConstants; import twitter4j.DirectMessage; import twitter4j.FilterQuery; import twitter4j.StallWarning; diff --git a/asterixdb/asterix-villain/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-villain/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory new file mode 100644 index 0000000..b876491 --- /dev/null +++ b/asterixdb/asterix-villain/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -0,0 +1 @@ +org.apache.asterix.villain.external.input.record.reader.twitter.TwitterRecordReaderFactory \ No newline at end of file diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 675d549..37600b8 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -605,6 +605,7 @@ <module>asterix-coverage</module> <module>asterix-active</module> <module>asterix-client-helper</module> + <module>asterix-villain</module> </modules> <repositories> -- To view, visit https://asterix-gerrit.ics.uci.edu/1430 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ic4f95255f5493a813ee1f875b63a62e74bc47602 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <xkk...@gmail.com>