Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1535

Change subject: WIP - Feed Adaptor and RecordReader Clean
......................................................................

WIP - Feed Adaptor and RecordReader Clean

1. Cleaned DatasourceFactoryProvider with service interface
2. Cleaned StreamRecordReaderProvider with service interface
3. Combined LineRecordReader with QuotedLineRecordReader to fit the
StreamRecordReader interface.
4. Delayed the input stream binding until the configuration phase.
5. Added one simple test case for StreamRecordReaderProvider.

Change-Id: I699657ddd8408fd00bcbd7df57b6610ef3692a1a
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/pom.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
D 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
A 
asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
A 
asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
A 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
23 files changed, 473 insertions(+), 249 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/35/1535/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 83ff3a2..5365bd8 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -158,6 +158,8 @@
     public static final int UTIL_FILE_SYSTEM_WATCHER_NO_FILES_FOUND = 3076;
     public static final int UTIL_LOCAL_FILE_SYSTEM_UTILS_PATH_NOT_FOUND = 3077;
     public static final int UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER = 
3078;
+    public static final int READER_FACTORY_RECORD_READER_NOT_FOUND = 3079;
+    public static final int 
PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3080;
 
     // Loads the map that maps error codes to error message templates.
     private static Map<Integer, String> errorMessageMap = null;
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 3e96972..81dd289 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -143,3 +143,4 @@
 3076 = %1$s: no files found
 3077 = %1$s: path not found
 3078 = Cannot obtain hdfs scheduler
+3080 = Duplicate record reader format
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index d1f67f0..a1c7012 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -404,10 +404,5 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.5</version>
-    </dependency>
   </dependencies>
 </project>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index c6adbc4..a8c5d31 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -21,6 +21,8 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import java.util.List;
+
 public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory {
 
     public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext 
ctx, int partition)
@@ -32,4 +34,7 @@
     public default DataSourceType getDataSourceType() {
         return DataSourceType.RECORDS;
     }
+
+    public List<String> getRecordReaderNames();
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 2b899d9..aa5d8e1 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -37,7 +38,6 @@
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.external.provider.ExternalIndexerProvider;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
-import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
@@ -73,7 +73,8 @@
     private JobConf conf;
     private InputSplit[] inputSplits;
     private String nodeName;
-    private Format format;
+    private Class recordReaderClazz;
+    private static final List<String> recordReaderNames = 
Collections.unmodifiableList(Arrays.asList("hdfs"));
 
     @Override
     public void configure(Map<String, String> configuration) throws 
AsterixException {
@@ -105,7 +106,7 @@
                 this.recordClass = reader.createValue().getClass();
                 reader.close();
             } else {
-                format = 
StreamRecordReaderProvider.getReaderFormat(configuration);
+                recordReaderClazz = 
StreamRecordReaderProvider.getRecordReaderClazz(configuration);
                 this.recordClass = char[].class;
             }
         } catch (IOException e) {
@@ -193,9 +194,10 @@
             throws HyracksDataException {
         try {
             IExternalIndexer indexer = files == null ? null : 
ExternalIndexerProvider.getIndexer(configuration);
-            if (format != null) {
-                StreamRecordReader streamReader = 
StreamRecordReaderProvider.createRecordReader(format,
-                        createInputStream(ctx, partition, indexer), 
configuration);
+            if (recordReaderClazz != null) {
+                StreamRecordReader streamReader = (StreamRecordReader) 
recordReaderClazz
+                        .getDeclaredConstructor(AsterixInputStream.class, 
Map.class)
+                        .newInstance(createInputStream(ctx, partition, 
indexer), configuration);
                 if (indexer != null) {
                     return new IndexingStreamRecordReader(streamReader, 
indexer);
                 } else {
@@ -226,4 +228,9 @@
     public boolean isIndexingOp() {
         return ((files != null) && indexingOp);
     }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index 2ded3fb..ee59eac 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -20,6 +20,8 @@
 
 import java.net.MalformedURLException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -39,6 +41,7 @@
     private static final long serialVersionUID = 1L;
     private final List<String> urls = new ArrayList<String>();
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private static final List<String> recordReaderNames = 
Collections.unmodifiableList(Arrays.asList("rss_feed"));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -70,6 +73,11 @@
     }
 
     @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
     public boolean isIndexible() {
         return false;
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index aa0451a..3c17db9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -19,13 +19,21 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 
 public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
 
-    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) {
+
+    public static final List<String> recordReaderFormats = Collections
+            
.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_LINE_SEPARATED));
+
+    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream, 
Map<String, String> config) {
         super(inputStream);
     }
 
@@ -100,6 +108,11 @@
         return true;
     }
 
+    @Override
+    public List<String> getRecordReaderFormats() {
+        return recordReaderFormats;
+    }
+
     private boolean skipWhiteSpace() throws IOException {
         // start by skipping white spaces
         while (true) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 59b72e4..c29c0e7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -19,9 +19,15 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class LineRecordReader extends StreamRecordReader {
@@ -31,12 +37,33 @@
     protected int newlineLength;
     protected int recordNumber = 0;
     protected boolean nextIsHeader = false;
+    private final boolean quotedReader;
+    private final char quote;
+    private boolean inQuote;
+    private boolean prevCharEscape;
+    public static final List<String> recordReaderFormats = 
Collections.unmodifiableList(Arrays.asList(
+            ExternalDataConstants.FORMAT_DELIMITED_TEXT,
+            ExternalDataConstants.FORMAT_CSV
+    ));
 
-    public LineRecordReader(final boolean hasHeader, final AsterixInputStream 
stream) throws HyracksDataException {
-        super(stream);
+    public LineRecordReader(AsterixInputStream inputStream, Map<String, 
String> config) throws HyracksDataException {
+        super(inputStream);
+        String quoteString = config.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteString != null) {
+            quotedReader = true;
+            if (quoteString.length() != 1) {
+                throw new 
HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE,
+                        ExternalDataConstants.PARAMETER_OF_SIZE_ONE, 
quoteString));
+            }
+            quote = quoteString.charAt(0);
+        } else {
+            quotedReader = false;
+            quote = '\n';
+        }
+        boolean hasHeader = ExternalDataUtils.hasHeader(config);
         this.hasHeader = hasHeader;
         if (hasHeader) {
-            stream.setNotificationHandler(this);
+            inputStream.setNotificationHandler(this);
         }
     }
 
@@ -48,7 +75,100 @@
     }
 
     @Override
+    public List<String> getRecordReaderFormats() {
+        return recordReaderFormats;
+    }
+
+    @Override
     public boolean hasNext() throws IOException {
+        if (quotedReader) {
+            return quotedLineRecordReaderHasNext();
+        } else {
+            return lineRecordReaderHasNext();
+        }
+    }
+
+    public boolean quotedLineRecordReaderHasNext() throws IOException {
+        while (true) {
+            if (done) {
+                return false;
+            }
+            newlineLength = 0;
+            prevCharCR = false;
+            prevCharEscape = false;
+            record.reset();
+            int readLength = 0;
+            inQuote = false;
+            do {
+                int startPosn = bufferPosn;
+                if (bufferPosn >= bufferLength) {
+                    startPosn = bufferPosn = 0;
+                    bufferLength = reader.read(inputBuffer);
+                    if (bufferLength <= 0) {
+                        {
+                            if (readLength > 0) {
+                                if (inQuote) {
+                                    throw new IOException("malformed input 
record ended inside quote");
+                                }
+                                record.endRecord();
+                                recordNumber++;
+                                return true;
+                            }
+                            close();
+                            return false;
+                        }
+                    }
+                }
+                for (; bufferPosn < bufferLength; ++bufferPosn) {
+                    if (!inQuote) {
+                        if (inputBuffer[bufferPosn] == 
ExternalDataConstants.LF) {
+                            newlineLength = (prevCharCR) ? 2 : 1;
+                            ++bufferPosn;
+                            break;
+                        }
+                        if (prevCharCR) {
+                            newlineLength = 1;
+                            break;
+                        }
+                        prevCharCR = (inputBuffer[bufferPosn] == 
ExternalDataConstants.CR);
+                        if (inputBuffer[bufferPosn] == quote) {
+                            if (!prevCharEscape) {
+                                inQuote = true;
+                            }
+                        }
+                        if (prevCharEscape) {
+                            prevCharEscape = false;
+                        } else {
+                            prevCharEscape = inputBuffer[bufferPosn] == 
ExternalDataConstants.ESCAPE;
+                        }
+                    } else {
+                        // only look for next quote
+                        if (inputBuffer[bufferPosn] == quote) {
+                            if (!prevCharEscape) {
+                                inQuote = false;
+                            }
+                        }
+                        prevCharEscape = inputBuffer[bufferPosn] == 
ExternalDataConstants.ESCAPE;
+                    }
+                }
+                readLength = bufferPosn - startPosn;
+                if (prevCharCR && newlineLength == 0) {
+                    --readLength;
+                }
+                if (readLength > 0) {
+                    record.append(inputBuffer, startPosn, readLength);
+                }
+            } while (newlineLength == 0);
+            if (nextIsHeader) {
+                nextIsHeader = false;
+                continue;
+            }
+            recordNumber++;
+            return true;
+        }
+    }
+
+    public boolean lineRecordReaderHasNext() throws IOException {
         while (true) {
             if (done) {
                 return false;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
deleted file mode 100644
index 006bf0b..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class QuotedLineRecordReader extends LineRecordReader {
-
-    private final char quote;
-    private boolean prevCharEscape;
-    private boolean inQuote;
-
-    public QuotedLineRecordReader(final boolean hasHeader, final 
AsterixInputStream stream, final String quoteString)
-            throws HyracksDataException {
-        super(hasHeader, stream);
-        if ((quoteString == null) || (quoteString.length() != 1)) {
-            throw new 
HyracksDataException(ExceptionUtils.incorrectParameterMessage(
-                    ExternalDataConstants.KEY_QUOTE, 
ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
-        }
-        this.quote = quoteString.charAt(0);
-    }
-
-    @Override
-    public boolean hasNext() throws IOException {
-        while (true) {
-            if (done) {
-                return false;
-            }
-            newlineLength = 0;
-            prevCharCR = false;
-            prevCharEscape = false;
-            record.reset();
-            int readLength = 0;
-            inQuote = false;
-            do {
-                int startPosn = bufferPosn;
-                if (bufferPosn >= bufferLength) {
-                    startPosn = bufferPosn = 0;
-                    bufferLength = reader.read(inputBuffer);
-                    if (bufferLength <= 0) {
-                        {
-                            if (readLength > 0) {
-                                if (inQuote) {
-                                    throw new IOException("malformed input 
record ended inside quote");
-                                }
-                                record.endRecord();
-                                recordNumber++;
-                                return true;
-                            }
-                            close();
-                            return false;
-                        }
-                    }
-                }
-                for (; bufferPosn < bufferLength; ++bufferPosn) {
-                    if (!inQuote) {
-                        if (inputBuffer[bufferPosn] == 
ExternalDataConstants.LF) {
-                            newlineLength = (prevCharCR) ? 2 : 1;
-                            ++bufferPosn;
-                            break;
-                        }
-                        if (prevCharCR) {
-                            newlineLength = 1;
-                            break;
-                        }
-                        prevCharCR = (inputBuffer[bufferPosn] == 
ExternalDataConstants.CR);
-                        if (inputBuffer[bufferPosn] == quote) {
-                            if (!prevCharEscape) {
-                                inQuote = true;
-                            }
-                        }
-                        if (prevCharEscape) {
-                            prevCharEscape = false;
-                        } else {
-                            prevCharEscape = inputBuffer[bufferPosn] == 
ExternalDataConstants.ESCAPE;
-                        }
-                    } else {
-                        // only look for next quote
-                        if (inputBuffer[bufferPosn] == quote) {
-                            if (!prevCharEscape) {
-                                inQuote = false;
-                            }
-                        }
-                        prevCharEscape = inputBuffer[bufferPosn] == 
ExternalDataConstants.ESCAPE;
-                    }
-                }
-                readLength = bufferPosn - startPosn;
-                if (prevCharCR && newlineLength == 0) {
-                    --readLength;
-                }
-                if (readLength > 0) {
-                    record.append(inputBuffer, startPosn, readLength);
-                }
-            } while (newlineLength == 0);
-            if (nextIsHeader) {
-                nextIsHeader = false;
-                continue;
-            }
-            recordNumber++;
-            return true;
-        }
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 4d6d004..4002bf1 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -19,6 +19,10 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
@@ -33,10 +37,17 @@
     private char recordStart;
     private char recordEnd;
     private int recordNumber = 0;
+    public static final List<String> recordReaderFormats = 
Collections.unmodifiableList(Arrays.asList(
+            ExternalDataConstants.FORMAT_ADM,
+            ExternalDataConstants.FORMAT_JSON,
+            ExternalDataConstants.FORMAT_SEMISTRUCTURED
+    ));
 
-    public SemiStructuredRecordReader(AsterixInputStream stream, String 
recStartString, String recEndString)
+    public SemiStructuredRecordReader(AsterixInputStream stream, Map<String, 
String> config)
             throws HyracksDataException {
         super(stream);
+        String recStartString = 
config.get(ExternalDataConstants.KEY_RECORD_START);
+        String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END);
         // set record opening char
         if (recStartString != null) {
             if (recStartString.length() != 1) {
@@ -151,6 +162,11 @@
     }
 
     @Override
+    public List<String> getRecordReaderFormats() {
+        return recordReaderFormats;
+    }
+
+    @Override
     public boolean stop() {
         try {
             reader.stop();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 7dc5bce..1ca66fc 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IRawRecord;
@@ -29,7 +30,6 @@
 import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class StreamRecordReader implements IRecordReader<char[]>, 
IStreamNotificationHandler {
@@ -40,7 +40,6 @@
     protected int bufferPosn = 0;
     protected boolean done = false;
     protected FeedLogManager feedLogManager;
-    protected MutableBoolean newFile = new MutableBoolean(false);
 
     public StreamRecordReader(AsterixInputStream inputStream) {
         this.reader = new AsterixInputStreamReader(inputStream);
@@ -95,4 +94,6 @@
     public void notifyNewSource() {
         throw new UnsupportedOperationException();
     }
+
+    public abstract List<String> getRecordReaderFormats();
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 4649559..ebc0bbe 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -18,13 +18,22 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import 
org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
+import 
org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
+import 
org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
-import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -33,13 +42,14 @@
 public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> 
{
 
     private static final long serialVersionUID = 1L;
-    protected final IInputStreamFactory streamFactory;
+    protected IInputStreamFactory streamFactory;
     protected Map<String, String> configuration;
-    protected Format format;
-
-    public StreamRecordReaderFactory(IInputStreamFactory inputStreamFactory) {
-        this.streamFactory = inputStreamFactory;
-    }
+    protected Class recordReaderClazz;
+    private static final List<String> recordReaderNames = 
Collections.unmodifiableList(Arrays.asList(
+            ExternalDataConstants.ALIAS_LOCALFS_ADAPTER,
+            ExternalDataConstants.ALIAS_SOCKET_ADAPTER,
+            ExternalDataConstants.SOCKET,
+            ExternalDataConstants.STREAM_SOCKET_CLIENT));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -57,17 +67,44 @@
         return streamFactory.getPartitionConstraint();
     }
 
+    /**
+     * Selects the input stream factory backing this record reader factory, based on the reader
+     * name stored under {@link ExternalDataConstants#KEY_READER} in the configuration.
+     *
+     * @param config the adapter configuration
+     * @throws AsterixException if the reader name is missing or is not one of the names handled here
+     */
+    private void configureInputStreamFactory(Map<String, String> config) throws AsterixException {
+        String reader = config.get(ExternalDataConstants.KEY_READER);
+        // Constant-first comparisons: a missing reader entry must end in the AsterixException
+        // below rather than in a NullPointerException.
+        if (ExternalDataConstants.ALIAS_LOCALFS_ADAPTER.equals(reader)) {
+            streamFactory = new LocalFSInputStreamFactory();
+        } else if (ExternalDataConstants.ALIAS_SOCKET_ADAPTER.equals(reader)
+                || ExternalDataConstants.SOCKET.equals(reader)) {
+            streamFactory = new SocketServerInputStreamFactory();
+        } else if (ExternalDataConstants.STREAM_SOCKET_CLIENT.equals(reader)) {
+            streamFactory = new SocketClientInputStreamFactory();
+        } else {
+            throw new AsterixException("UNKNOWN Adaptor name: " + reader);
+        }
+    }
+
+    /**
+     * Stores the configuration, selects and configures the input stream factory, and looks up
+     * the record reader class for the format named in the configuration.
+     */
     @Override
     public void configure(Map<String, String> configuration) throws 
HyracksDataException, AlgebricksException {
         this.configuration = configuration;
+        // Must run before streamFactory.configure(): this call is what assigns streamFactory.
+        configureInputStreamFactory(configuration);
         streamFactory.configure(configuration);
-        format = StreamRecordReaderProvider.getReaderFormat(configuration);
+        recordReaderClazz = 
StreamRecordReaderProvider.getRecordReaderClazz(configuration);
     }
 
     @Override
     public IRecordReader<? extends char[]> 
createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        return StreamRecordReaderProvider.createRecordReader(format, 
streamFactory.createInputStream(ctx, partition),
-                configuration);
+        StreamRecordReader recordReader;
+        try {
+            recordReader = (StreamRecordReader) recordReaderClazz
+                    .getDeclaredConstructor(AsterixInputStream.class, 
Map.class)
+                    .newInstance(streamFactory.createInputStream(ctx, 
partition), configuration);
+        } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException
+                | NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        }
+        return recordReader;
+    }
+
+    /**
+     * @return the unmodifiable list of reader names served by this factory
+     */
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 4d8be98..7946dee 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader.twitter;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -48,20 +51,24 @@
     private Map<String, String> configuration;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
-    public static boolean isTwitterPull(Map<String, String> configuration) {
-        String reader = configuration.get(ExternalDataConstants.KEY_READER);
-        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
-                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
-            return true;
-        }
-        return false;
-    }
+    private static final List<String> recordReaderNames = 
Collections.unmodifiableList(Arrays.asList(
+            ExternalDataConstants.READER_TWITTER_PULL,
+            ExternalDataConstants.READER_TWITTER_PUSH,
+            ExternalDataConstants.READER_PUSH_TWITTER,
+            ExternalDataConstants.READER_PULL_TWITTER,
+            ExternalDataConstants.READER_USER_STREAM_TWITTER));
 
     @Override
     public DataSourceType getDataSourceType() {
         return DataSourceType.RECORDS;
     }
 
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() 
throws AlgebricksException {
         clusterLocations = 
IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 
INTAKE_CARDINALITY);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 1428863..5e3b328 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -18,8 +18,17 @@
  */
 package org.apache.asterix.external.provider;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -27,25 +36,23 @@
 import 
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.HDFSDataSourceFactory;
-import 
org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory;
-import 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
-import 
org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 import 
org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
-import 
org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 import 
org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.commons.io.IOUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class DatasourceFactoryProvider {
+
+    private static final String RESOURCE = 
"META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory";
+    private static Map<String, Class> factories = null;
 
     private DatasourceFactoryProvider() {
     }
 
     public static IExternalDataSourceFactory 
getExternalDataSourceFactory(ILibraryManager libraryManager,
-            Map<String, String> configuration) throws HyracksDataException {
+            Map<String, String> configuration) throws HyracksDataException, 
AsterixException {
         if 
(ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS))
 {
             String reader = 
configuration.get(ExternalDataConstants.KEY_READER);
             return 
DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, 
configuration);
@@ -88,41 +95,61 @@
         return streamSourceFactory;
     }
 
-    public static IRecordReaderFactory<?> 
getRecordReaderFactory(ILibraryManager libraryManager, String reader,
-            Map<String, String> configuration) throws HyracksDataException {
-        if (reader.equals(ExternalDataConstants.EXTERNAL)) {
-            try {
-                return 
ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, 
configuration);
-            } catch (AlgebricksException e) {
-                // Not sure whether this is the right way to handle 
AlgebricksException  (xikui)
-                throw new HyracksDataException(e);
-            }
+    protected static IRecordReaderFactory getInstance(Class clazz) throws 
AsterixException {
+        try {
+            return (IRecordReaderFactory) clazz.newInstance();
+        } catch (InstantiationException | IllegalAccessException | 
ClassCastException e) {
+            throw new AsterixException("Cannot create: " + 
clazz.getSimpleName(), e);
         }
-        switch (reader) {
-            case ExternalDataConstants.READER_HDFS:
-                return new HDFSDataSourceFactory();
-            case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
-                return new StreamRecordReaderFactory(new 
LocalFSInputStreamFactory());
-            case ExternalDataConstants.READER_TWITTER_PULL:
-            case ExternalDataConstants.READER_TWITTER_PUSH:
-            case ExternalDataConstants.READER_PUSH_TWITTER:
-            case ExternalDataConstants.READER_PULL_TWITTER:
-            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
-                return new TwitterRecordReaderFactory();
-            case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
-            case ExternalDataConstants.SOCKET:
-                return new StreamRecordReaderFactory(new 
SocketServerInputStreamFactory());
-            case ExternalDataConstants.STREAM_SOCKET_CLIENT:
-                return new StreamRecordReaderFactory(new 
SocketClientInputStreamFactory());
-            case ExternalDataConstants.READER_RSS:
-                return new RSSRecordReaderFactory();
-            default:
-                try {
-                    return (IRecordReaderFactory<?>) 
Class.forName(reader).newInstance();
-                } catch (IllegalAccessException | ClassNotFoundException | 
InstantiationException
-                        | ClassCastException e) {
-                    throw new 
RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e,reader);
+    }
+
+    /**
+     * Resolves the record reader factory for an adaptor name. The lookup order is:
+     * the {@code EXTERNAL} adaptor (delegated to {@link ExternalDataUtils}), then the names
+     * collected by {@code initFactories()}, and finally the name interpreted as a class name.
+     *
+     * @param libraryManager passed on when the external adaptor is requested
+     * @param adaptorName a registered reader name or a fully qualified factory class name
+     * @param configuration the adapter configuration, used by the external adaptor path
+     * @return a new factory instance
+     * @throws HyracksDataException if the name is neither registered nor an instantiable factory class
+     * @throws AsterixException if the registry cannot be built or a registered factory cannot be created
+     */
+    public static IRecordReaderFactory getRecordReaderFactory(ILibraryManager libraryManager, String adaptorName,
+            Map<String, String> configuration) throws HyracksDataException, AsterixException {
+        if (adaptorName.equals(ExternalDataConstants.EXTERNAL)) {
+            return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration);
+        }
+
+        // The registry is built on first use.
+        if (factories == null) {
+            factories = initFactories();
+        }
+
+        // initFactories() never stores a null class, so one lookup tells us whether the name is registered.
+        final Class registeredFactory = factories.get(adaptorName);
+        if (registeredFactory != null) {
+            return getInstance(registeredFactory);
+        }
+
+        // Not a registered name: fall back to treating it as a class name.
+        try {
+            return (IRecordReaderFactory) Class.forName(adaptorName).newInstance();
+        } catch (IllegalAccessException | ClassNotFoundException | InstantiationException | ClassCastException e) {
+            throw new RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e, adaptorName);
+        }
+    }
+
+    /**
+     * Builds the reader-name to factory-class registry from every classpath resource named
+     * {@code RESOURCE}. Each entry of such a resource is a factory class name; the class is
+     * instantiated once so that its {@code getRecordReaderNames()} can be mapped to it.
+     *
+     * @return a map from reader name to the factory class serving it
+     * @throws AsterixException if two factories claim the same name, or a resource cannot be
+     *             read, or a listed class cannot be loaded or instantiated
+     */
+    protected static Map<String, Class> initFactories() throws AsterixException {
+        Map<String, Class> discovered = new HashMap<>();
+        ClassLoader cl = DatasourceFactoryProvider.class.getClassLoader();
+        final Charset encoding = Charset.forName("UTF-8");
+        try {
+            Enumeration<URL> urls = cl.getResources(RESOURCE);
+            for (URL url : Collections.list(urls)) {
+                String config;
+                // try-with-resources: the stream is closed even when reading fails.
+                try (InputStream is = url.openStream()) {
+                    config = IOUtils.toString(is, encoding);
+                }
+                for (String line : config.split("\n")) {
+                    // Tolerate CRLF line ends, blank lines and '#' comment lines, as in
+                    // ordinary META-INF/services files.
+                    String className = line.trim();
+                    if (className.isEmpty() || className.startsWith("#")) {
+                        continue;
+                    }
+                    final Class<?> clazz = Class.forName(className);
+                    List<String> formats = ((IRecordReaderFactory) clazz.newInstance()).getRecordReaderNames();
+                    for (String format : formats) {
+                        if (discovered.containsKey(format)) {
+                            throw new AsterixException(ErrorCode.PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING,
+                                    format);
+                        }
+                        discovered.put(format, clazz);
+                    }
                 }
+            }
+        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new AsterixException(e);
         }
+        return discovered;
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
index 261a63c..1c07171 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -18,66 +18,80 @@
  */
 package org.apache.asterix.external.provider;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.AsterixInputStream;
-import 
org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.LineRecordReader;
-import 
org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader;
-import 
org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 import 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.commons.io.IOUtils;
 
 public class StreamRecordReaderProvider {
-    public enum Format {
-        SEMISTRUCTURED,
-        CSV,
-        LINE_SEPARATED
+
+    private static final String RESOURCE = 
"META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader";
+    private static final String READER_FORMAT_NAME = "recordReaderFormats";
+    private static Map<String, Class> recordReaders = null;
+
+    /**
+     * Creates a record reader by calling the no-argument constructor of {@code clazz}.
+     * NOTE(review): StreamRecordReaderFactory#createRecordReader in this change instantiates
+     * readers through an (AsterixInputStream, Map) constructor; confirm that the reader
+     * classes also offer the no-argument constructor this method relies on.
+     *
+     * @param clazz the reader class to instantiate
+     * @return the new instance, cast to StreamRecordReader
+     * @throws AsterixException if the class cannot be instantiated or its constructor is not accessible
+     */
+    protected static StreamRecordReader getInstance(Class clazz) throws 
AsterixException {
+        try {
+            return (StreamRecordReader) clazz.newInstance();
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new AsterixException("Cannot create RecordReader: " + 
clazz.getSimpleName(), e);
+        }
     }
 
-    public static Format getReaderFormat(Map<String, String> configuration) 
throws AsterixException {
+    public static Class getRecordReaderClazz(Map<String, String> 
configuration) throws AsterixException {
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+
+        if (recordReaders == null) {
+            recordReaders = initRecordReaders();
+        }
+
         if (format != null) {
-            switch (format) {
-                case ExternalDataConstants.FORMAT_ADM:
-                case ExternalDataConstants.FORMAT_JSON:
-                case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                    return Format.SEMISTRUCTURED;
-                case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                    return Format.LINE_SEPARATED;
-                case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-                case ExternalDataConstants.FORMAT_CSV:
-                    return Format.CSV;
+            if (recordReaders.containsKey(format)) {
+                return recordReaders.get(format);
             }
-            throw new AsterixException("Unknown format: " + format);
+            throw new 
AsterixException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, 
format);
         }
         throw new AsterixException("Unspecified parameter: " + 
ExternalDataConstants.KEY_FORMAT);
     }
 
-    public static StreamRecordReader createRecordReader(Format format, 
AsterixInputStream inputStream,
-            Map<String, String> configuration) throws HyracksDataException {
-        switch (format) {
-            case CSV:
-                String quoteString = 
configuration.get(ExternalDataConstants.KEY_QUOTE);
-                boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
-                if (quoteString != null) {
-                    return new QuotedLineRecordReader(hasHeader, inputStream, 
quoteString);
-                } else {
-                    return new LineRecordReader(hasHeader, inputStream);
+    /**
+     * Builds the format to reader-class registry from every classpath resource named
+     * {@code RESOURCE}. Each entry of such a resource is a reader class name; the formats the
+     * class handles are read from its public static field named by {@code READER_FORMAT_NAME}.
+     *
+     * @return a map from format name to the reader class handling it
+     * @throws AsterixException if two readers claim the same format, or a resource cannot be
+     *             read, or a listed class or its format field cannot be accessed
+     */
+    protected static Map<String, Class> initRecordReaders() throws AsterixException {
+        Map<String, Class> discovered = new HashMap<>();
+        ClassLoader cl = StreamRecordReaderProvider.class.getClassLoader();
+        final Charset encoding = Charset.forName("UTF-8");
+        try {
+            Enumeration<URL> urls = cl.getResources(RESOURCE);
+            for (URL url : Collections.list(urls)) {
+                String config;
+                // try-with-resources: the stream is closed even when reading fails.
+                try (InputStream is = url.openStream()) {
+                    config = IOUtils.toString(is, encoding);
+                }
+                for (String line : config.split("\n")) {
+                    // Tolerate CRLF line ends, blank lines and '#' comment lines, as in
+                    // ordinary META-INF/services files.
+                    String className = line.trim();
+                    if (className.isEmpty() || className.startsWith("#")) {
+                        continue;
+                    }
+                    final Class<?> clazz = Class.forName(className);
+                    // The field is static, hence get(null).
+                    List<String> formats = (List<String>) clazz.getField(READER_FORMAT_NAME).get(null);
+                    for (String format : formats) {
+                        if (discovered.containsKey(format)) {
+                            throw new AsterixException(ErrorCode.PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING,
+                                    format);
+                        }
+                        discovered.put(format, clazz);
+                    }
                 }
-            case LINE_SEPARATED:
-                return new EmptyLineSeparatedRecordReader(inputStream);
-            case SEMISTRUCTURED:
-                return new SemiStructuredRecordReader(inputStream,
-                        
configuration.get(ExternalDataConstants.KEY_RECORD_START),
-                        
configuration.get(ExternalDataConstants.KEY_RECORD_END));
-            default:
-                throw new 
RuntimeDataException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, 
format);
+            }
+        } catch (IOException | ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) {
+            throw new AsterixException(e);
         }
+        return discovered;
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
new file mode 100644
index 0000000..dac60b9
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -0,0 +1,4 @@
+org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory
+org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory
+org.apache.asterix.external.input.HDFSDataSourceFactory
+org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader
new file mode 100644
index 0000000..ae1fbe2
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader
@@ -0,0 +1,3 @@
+org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader
+org.apache.asterix.external.input.record.reader.stream.LineRecordReader
+org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader
\ No newline at end of file
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index f952a4d..b521e9e 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -24,6 +24,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -38,6 +39,7 @@
 import 
org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.library.ClassAdParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -114,10 +116,13 @@
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
             for (String path : files) {
                 List<Path> paths = new ArrayList<>();
+                Map<String, String> config = new HashMap<>();
+                config.put(ExternalDataConstants.KEY_RECORD_START, "[");
+                config.put(ExternalDataConstants.KEY_RECORD_END, "]");
                 paths.add(Paths.get(getClass().getResource(path).toURI()));
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, 
false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
-                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, "[", "]");
+                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, config);
                 while (recordReader.hasNext()) {
                     tb.reset();
                     IRawRecord<char[]> record = recordReader.next();
@@ -149,10 +154,13 @@
             CharArrayLexerSource lexerSource = new CharArrayLexerSource();
             for (String path : files) {
                 List<Path> paths = new ArrayList<>();
+                Map<String, String> config = new HashMap<>();
+                config.put(ExternalDataConstants.KEY_RECORD_START, "[");
+                config.put(ExternalDataConstants.KEY_RECORD_END, "]");
                 paths.add(Paths.get(getClass().getResource(path).toURI()));
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, 
false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
-                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, "[", "]");
+                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, config);
                 try {
                     Value val = new Value(objectPool);
                     while (recordReader.hasNext()) {
@@ -187,10 +195,13 @@
             CharArrayLexerSource lexerSource = new CharArrayLexerSource();
             for (String path : files) {
                 List<Path> paths = new ArrayList<>();
+                Map<String, String> config = new HashMap<>();
+                config.put(ExternalDataConstants.KEY_RECORD_START, "[");
+                config.put(ExternalDataConstants.KEY_RECORD_END, "]");
                 paths.add(Paths.get(getClass().getResource(path).toURI()));
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, 
false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
-                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, "[", "]");
+                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, config);
                 try {
                     Value val = new Value(objectPool);
                     while (recordReader.hasNext()) {
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index 3467411..28893b2 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
@@ -32,6 +35,8 @@
 
     private static final long serialVersionUID = 1L;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private static final List<String> recordReaderNames = 
Collections.unmodifiableList(Arrays.asList());
+
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() 
throws AlgebricksException {
@@ -53,4 +58,8 @@
     public Class<?> getRecordClass() {
         return RecordWithPK.class;
     }
+
+    @Override public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index 98105b2..5089c82 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader.kv;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IRecordReader;
@@ -39,6 +42,7 @@
     private int upsertCycle = 0;
     private int numOfReaders;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private static final List<String> recordReaderNames = 
Collections.unmodifiableList(Arrays.asList());
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -77,4 +81,8 @@
     public Class<?> getRecordClass() {
         return DCPRequest.class;
     }
+
+    @Override public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
index c238f1c..978a315 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
@@ -30,13 +30,16 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import 
org.apache.asterix.external.input.record.converter.DCPMessageToRecordConverter;
 import 
org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.junit.Assert;
 import org.junit.Test;
@@ -73,10 +76,13 @@
     public void testDecodingJsonRecords() throws URISyntaxException, 
IOException {
         String jsonFileName = "/record.json";
         List<Path> paths = new ArrayList<>();
+        Map<String, String> config = new HashMap<>();
+        config.put(ExternalDataConstants.KEY_RECORD_START, "{");
+        config.put(ExternalDataConstants.KEY_RECORD_END, "}");
         paths.add(Paths.get(getClass().getResource(jsonFileName).toURI()));
         FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
         LocalFSInputStream in = new LocalFSInputStream(watcher);
-        try (SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, "{", "}")) {
+        try (SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, config)) {
             while (recordReader.hasNext()) {
                 try {
                     IRawRecord<char[]> record = recordReader.next();
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 2f5f341..422d814 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -24,11 +24,13 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.IRawRecord;
 import 
org.apache.asterix.external.input.record.converter.CSVToRecordWithMetadataAndPKConverter;
-import 
org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReader;
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.parser.RecordWithMetadataParser;
@@ -85,8 +87,10 @@
             // create input stream
             LocalFSInputStream inputStream = new LocalFSInputStream(watcher);
             // create reader record reader
-            QuotedLineRecordReader lineReader = new 
QuotedLineRecordReader(true, inputStream,
-                    ExternalDataConstants.DEFAULT_QUOTE);
+            Map<String, String> config = new HashMap<>();
+            config.put(ExternalDataConstants.HAS_HEADER, "true");
+            config.put(ExternalDataConstants.KEY_QUOTE, 
ExternalDataConstants.DEFAULT_QUOTE);
+            LineRecordReader lineReader = new LineRecordReader(inputStream, 
config);
             // create csv with json record reader
             CSVToRecordWithMetadataAndPKConverter recordConverter = new 
CSVToRecordWithMetadataAndPKConverter(
                     valueIndex, delimiter, metaType, recordType, pkIndicators, 
pkIndexes, keyTypes);
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
new file mode 100644
index 0000000..9844035
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.test;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Checks that StreamRecordReaderProvider resolves a reader class for each stream format listed below. */
+public class StreamRecordReaderProviderTest {
+    @Test
+    public void Test() throws AsterixException {
+        List<String> recordReaderFormats = Arrays.asList(ExternalDataConstants.FORMAT_LINE_SEPARATED,
+                ExternalDataConstants.FORMAT_ADM, ExternalDataConstants.FORMAT_JSON,
+                ExternalDataConstants.FORMAT_SEMISTRUCTURED, ExternalDataConstants.FORMAT_DELIMITED_TEXT,
+                ExternalDataConstants.FORMAT_CSV);
+        for (String format : recordReaderFormats) {
+            // Fresh map per format so every lookup starts from a configuration holding only the format key.
+            Map<String, String> config = new HashMap<>();
+            config.put(ExternalDataConstants.KEY_FORMAT, format);
+            // Wildcard instead of the raw Class type; only non-nullness of the result is checked here.
+            Class<?> clazz = StreamRecordReaderProvider.getRecordReaderClazz(config);
+            // The message names the failing format, which assertTrue(clazz != null) could not report.
+            Assert.assertNotNull("No record reader class found for format: " + format, clazz);
+        }
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1535
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I699657ddd8408fd00bcbd7df57b6610ef3692a1a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkk...@gmail.com>

Reply via email to