Xikui Wang has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1535
Change subject: WIP - Feed Adaptor and RecordReader Clean ...................................................................... WIP - Feed Adaptor and RecordReader Clean 1. Cleaned DatasourceFactoryProvider with service interface 2. Cleaned StreamRecordReaderProvider with service interface 3. Combined LineRecordReader with QuotedLineRecordReader to fit the StreamRecordReader interface. 4. Delayed the InputStream binding at the configuration phase. 5. Added one simple test case for StreamRecordReaderProvider. Change-Id: I699657ddd8408fd00bcbd7df57b6610ef3692a1a --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-external-data/pom.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java A asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory A asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java A asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java 23 files changed, 473 insertions(+), 249 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/35/1535/1 diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 83ff3a2..5365bd8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -158,6 +158,8 @@ public static final int UTIL_FILE_SYSTEM_WATCHER_NO_FILES_FOUND = 
3076; public static final int UTIL_LOCAL_FILE_SYSTEM_UTILS_PATH_NOT_FOUND = 3077; public static final int UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER = 3078; + public static final int READER_FACTORY_RECORD_READER_NOT_FOUND = 3079; + public static final int PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3080; // Loads the map that maps error codes to error message templates. private static Map<Integer, String> errorMessageMap = null; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 3e96972..81dd289 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -143,3 +143,4 @@ 3076 = %1$s: no files found 3077 = %1$s: path not found 3078 = Cannot obtain hdfs scheduler +3080 = Duplicate record reader format diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index d1f67f0..a1c7012 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -404,10 +404,5 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>2.5</version> - </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java index c6adbc4..a8c5d31 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java @@ -21,6 +21,8 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import 
org.apache.hyracks.api.exceptions.HyracksDataException; +import java.util.List; + public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory { public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) @@ -32,4 +34,7 @@ public default DataSourceType getDataSourceType() { return DataSourceType.RECORDS; } + + public List<String> getRecordReaderNames(); + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index 2b899d9..aa5d8e1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -37,7 +38,6 @@ import org.apache.asterix.external.input.stream.HDFSInputStream; import org.apache.asterix.external.provider.ExternalIndexerProvider; import org.apache.asterix.external.provider.StreamRecordReaderProvider; -import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.HDFSUtils; @@ -73,7 +73,8 @@ private JobConf conf; private InputSplit[] inputSplits; private String nodeName; - private Format format; + private Class recordReaderClazz; + private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList("hdfs")); @Override public void configure(Map<String, String> configuration) throws AsterixException { @@ -105,7 +106,7 @@ this.recordClass = reader.createValue().getClass(); reader.close(); } else { - format = 
StreamRecordReaderProvider.getReaderFormat(configuration); + recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration); this.recordClass = char[].class; } } catch (IOException e) { @@ -193,9 +194,10 @@ throws HyracksDataException { try { IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration); - if (format != null) { - StreamRecordReader streamReader = StreamRecordReaderProvider.createRecordReader(format, - createInputStream(ctx, partition, indexer), configuration); + if (recordReaderClazz != null) { + StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz + .getDeclaredConstructor(AsterixInputStream.class, Map.class) + .newInstance(createInputStream(ctx, partition, indexer), configuration); if (indexer != null) { return new IndexingStreamRecordReader(streamReader, indexer); } else { @@ -226,4 +228,9 @@ public boolean isIndexingOp() { return ((files != null) && indexingOp); } + + @Override + public List<String> getRecordReaderNames() { + return recordReaderNames; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java index 2ded3fb..ee59eac 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java @@ -20,6 +20,8 @@ import java.net.MalformedURLException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -39,6 +41,7 @@ private static final long serialVersionUID = 1L; private final List<String> urls = new ArrayList<String>(); private transient 
AlgebricksAbsolutePartitionConstraint clusterLocations; + private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList("rss_feed")); @Override public DataSourceType getDataSourceType() { @@ -70,6 +73,11 @@ } @Override + public List<String> getRecordReaderNames() { + return recordReaderNames; + } + + @Override public boolean isIndexible() { return false; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java index aa0451a..3c17db9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java @@ -19,13 +19,21 @@ package org.apache.asterix.external.input.record.reader.stream; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; public class EmptyLineSeparatedRecordReader extends StreamRecordReader { - public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) { + + public static final List<String> recordReaderFormats = Collections + .unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_LINE_SEPARATED)); + + public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream, Map<String, String> config) { super(inputStream); } @@ -100,6 +108,11 @@ return true; } + @Override + public List<String> getRecordReaderFormats() { + return recordReaderFormats; + } + private boolean skipWhiteSpace() throws IOException { // start by skipping white spaces while (true) { 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java index 59b72e4..c29c0e7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java @@ -19,9 +19,15 @@ package org.apache.asterix.external.input.record.reader.stream; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.asterix.common.exceptions.ExceptionUtils; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; public class LineRecordReader extends StreamRecordReader { @@ -31,12 +37,33 @@ protected int newlineLength; protected int recordNumber = 0; protected boolean nextIsHeader = false; + private final boolean quotedReader; + private final char quote; + private boolean inQuote; + private boolean prevCharEscape; + public static final List<String> recordReaderFormats = Collections.unmodifiableList(Arrays.asList( + ExternalDataConstants.FORMAT_DELIMITED_TEXT, + ExternalDataConstants.FORMAT_CSV + )); - public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException { - super(stream); + public LineRecordReader(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException { + super(inputStream); + String quoteString = config.get(ExternalDataConstants.KEY_QUOTE); + if (quoteString != null) { + quotedReader = true; + if (quoteString.length() != 1) { + throw 
new HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE, + ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString)); + } + quote = quoteString.charAt(0); + } else { + quotedReader = false; + quote = '\n'; + } + boolean hasHeader = ExternalDataUtils.hasHeader(config); this.hasHeader = hasHeader; if (hasHeader) { - stream.setNotificationHandler(this); + inputStream.setNotificationHandler(this); } } @@ -48,7 +75,100 @@ } @Override + public List<String> getRecordReaderFormats() { + return recordReaderFormats; + } + + @Override public boolean hasNext() throws IOException { + if (quotedReader) { + return quotedLineRecordReaderHasNext(); + } else { + return lineRecordReaderHasNext(); + } + } + + public boolean quotedLineRecordReaderHasNext() throws IOException { + while (true) { + if (done) { + return false; + } + newlineLength = 0; + prevCharCR = false; + prevCharEscape = false; + record.reset(); + int readLength = 0; + inQuote = false; + do { + int startPosn = bufferPosn; + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + bufferLength = reader.read(inputBuffer); + if (bufferLength <= 0) { + { + if (readLength > 0) { + if (inQuote) { + throw new IOException("malformed input record ended inside quote"); + } + record.endRecord(); + recordNumber++; + return true; + } + close(); + return false; + } + } + } + for (; bufferPosn < bufferLength; ++bufferPosn) { + if (!inQuote) { + if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { + newlineLength = (prevCharCR) ? 
2 : 1; + ++bufferPosn; + break; + } + if (prevCharCR) { + newlineLength = 1; + break; + } + prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR); + if (inputBuffer[bufferPosn] == quote) { + if (!prevCharEscape) { + inQuote = true; + } + } + if (prevCharEscape) { + prevCharEscape = false; + } else { + prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE; + } + } else { + // only look for next quote + if (inputBuffer[bufferPosn] == quote) { + if (!prevCharEscape) { + inQuote = false; + } + } + prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE; + } + } + readLength = bufferPosn - startPosn; + if (prevCharCR && newlineLength == 0) { + --readLength; + } + if (readLength > 0) { + record.append(inputBuffer, startPosn, readLength); + } + } while (newlineLength == 0); + if (nextIsHeader) { + nextIsHeader = false; + continue; + } + recordNumber++; + return true; + } + } + + public boolean lineRecordReaderHasNext() throws IOException { while (true) { if (done) { return false; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java deleted file mode 100644 index 006bf0b..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.stream; - -import java.io.IOException; - -import org.apache.asterix.common.exceptions.ExceptionUtils; -import org.apache.asterix.external.api.AsterixInputStream; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class QuotedLineRecordReader extends LineRecordReader { - - private final char quote; - private boolean prevCharEscape; - private boolean inQuote; - - public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final String quoteString) - throws HyracksDataException { - super(hasHeader, stream); - if ((quoteString == null) || (quoteString.length() != 1)) { - throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage( - ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString)); - } - this.quote = quoteString.charAt(0); - } - - @Override - public boolean hasNext() throws IOException { - while (true) { - if (done) { - return false; - } - newlineLength = 0; - prevCharCR = false; - prevCharEscape = false; - record.reset(); - int readLength = 0; - inQuote = false; - do { - int startPosn = bufferPosn; - if (bufferPosn >= bufferLength) { - startPosn = bufferPosn = 0; - bufferLength = reader.read(inputBuffer); - if (bufferLength <= 0) { - { - if (readLength > 0) { - if (inQuote) { - throw new IOException("malformed input record ended inside quote"); - } - record.endRecord(); - recordNumber++; - return true; - } - close(); - 
return false; - } - } - } - for (; bufferPosn < bufferLength; ++bufferPosn) { - if (!inQuote) { - if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { - newlineLength = (prevCharCR) ? 2 : 1; - ++bufferPosn; - break; - } - if (prevCharCR) { - newlineLength = 1; - break; - } - prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR); - if (inputBuffer[bufferPosn] == quote) { - if (!prevCharEscape) { - inQuote = true; - } - } - if (prevCharEscape) { - prevCharEscape = false; - } else { - prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE; - } - } else { - // only look for next quote - if (inputBuffer[bufferPosn] == quote) { - if (!prevCharEscape) { - inQuote = false; - } - } - prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE; - } - } - readLength = bufferPosn - startPosn; - if (prevCharCR && newlineLength == 0) { - --readLength; - } - if (readLength > 0) { - record.append(inputBuffer, startPosn, readLength); - } - } while (newlineLength == 0); - if (nextIsHeader) { - nextIsHeader = false; - continue; - } - recordNumber++; - return true; - } - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java index 4d6d004..4002bf1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java @@ -19,6 +19,10 @@ package org.apache.asterix.external.input.record.reader.stream; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.asterix.common.exceptions.ExceptionUtils; import 
org.apache.asterix.external.api.AsterixInputStream; @@ -33,10 +37,17 @@ private char recordStart; private char recordEnd; private int recordNumber = 0; + public static final List<String> recordReaderFormats = Collections.unmodifiableList(Arrays.asList( + ExternalDataConstants.FORMAT_ADM, + ExternalDataConstants.FORMAT_JSON, + ExternalDataConstants.FORMAT_SEMISTRUCTURED + )); - public SemiStructuredRecordReader(AsterixInputStream stream, String recStartString, String recEndString) + public SemiStructuredRecordReader(AsterixInputStream stream, Map<String, String> config) throws HyracksDataException { super(stream); + String recStartString = config.get(ExternalDataConstants.KEY_RECORD_START); + String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END); // set record opening char if (recStartString != null) { if (recStartString.length() != 1) { @@ -151,6 +162,11 @@ } @Override + public List<String> getRecordReaderFormats() { + return recordReaderFormats; + } + + @Override public boolean stop() { try { reader.stop(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java index 7dc5bce..1ca66fc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.input.record.reader.stream; import java.io.IOException; +import java.util.List; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IRawRecord; @@ -29,7 +30,6 @@ import org.apache.asterix.external.input.stream.AsterixInputStreamReader; import org.apache.asterix.external.util.ExternalDataConstants; 
import org.apache.asterix.external.util.FeedLogManager; -import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler { @@ -40,7 +40,6 @@ protected int bufferPosn = 0; protected boolean done = false; protected FeedLogManager feedLogManager; - protected MutableBoolean newFile = new MutableBoolean(false); public StreamRecordReader(AsterixInputStream inputStream) { this.reader = new AsterixInputStreamReader(inputStream); @@ -95,4 +94,6 @@ public void notifyNewSource() { throw new UnsupportedOperationException(); } + + public abstract List<String> getRecordReaderFormats(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java index 4649559..ebc0bbe 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java @@ -18,13 +18,22 @@ */ package org.apache.asterix.external.input.record.reader.stream; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; +import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory; +import 
org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory; +import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory; import org.apache.asterix.external.provider.StreamRecordReaderProvider; -import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -33,13 +42,14 @@ public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> { private static final long serialVersionUID = 1L; - protected final IInputStreamFactory streamFactory; + protected IInputStreamFactory streamFactory; protected Map<String, String> configuration; - protected Format format; - - public StreamRecordReaderFactory(IInputStreamFactory inputStreamFactory) { - this.streamFactory = inputStreamFactory; - } + protected Class recordReaderClazz; + private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList( + ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, + ExternalDataConstants.ALIAS_SOCKET_ADAPTER, + ExternalDataConstants.SOCKET, + ExternalDataConstants.STREAM_SOCKET_CLIENT)); @Override public DataSourceType getDataSourceType() { @@ -57,17 +67,44 @@ return streamFactory.getPartitionConstraint(); } + private void configureInputStreamFactory(Map<String, String> config) throws AsterixException { + String reader = config.get(ExternalDataConstants.KEY_READER); + if (reader.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)) { + streamFactory = new LocalFSInputStreamFactory(); + } else if (reader.equals(ExternalDataConstants.ALIAS_SOCKET_ADAPTER) + || reader.equals(ExternalDataConstants.SOCKET)) { + streamFactory = new SocketServerInputStreamFactory(); + } else if 
(reader.equals(ExternalDataConstants.STREAM_SOCKET_CLIENT)) { + streamFactory = new SocketClientInputStreamFactory(); + } else { + throw new AsterixException("UNKNOWN Adaptor name"); + } + } + @Override public void configure(Map<String, String> configuration) throws HyracksDataException, AlgebricksException { this.configuration = configuration; + configureInputStreamFactory(configuration); streamFactory.configure(configuration); - format = StreamRecordReaderProvider.getReaderFormat(configuration); + recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration); } @Override public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { - return StreamRecordReaderProvider.createRecordReader(format, streamFactory.createInputStream(ctx, partition), - configuration); + StreamRecordReader recordReader; + try { + recordReader = (StreamRecordReader) recordReaderClazz + .getDeclaredConstructor(AsterixInputStream.class, Map.class) + .newInstance(streamFactory.createInputStream(ctx, partition), configuration); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException + | NoSuchMethodException e) { + throw new RuntimeException(e); + } + return recordReader; + } + + public List<String> getRecordReaderNames() { + return recordReaderNames; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 4d8be98..7946dee 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -18,6 +18,9 @@ */ package 
org.apache.asterix.external.input.record.reader.twitter; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -48,20 +51,24 @@ private Map<String, String> configuration; private transient AlgebricksAbsolutePartitionConstraint clusterLocations; - public static boolean isTwitterPull(Map<String, String> configuration) { - String reader = configuration.get(ExternalDataConstants.KEY_READER); - if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL) - || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) { - return true; - } - return false; - } + private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList( + ExternalDataConstants.READER_TWITTER_PULL, + ExternalDataConstants.READER_TWITTER_PUSH, + ExternalDataConstants.READER_PUSH_TWITTER, + ExternalDataConstants.READER_PULL_TWITTER, + ExternalDataConstants.READER_USER_STREAM_TWITTER)); @Override public DataSourceType getDataSourceType() { return DataSourceType.RECORDS; } + + @Override + public List<String> getRecordReaderNames() { + return recordReaderNames; + } + @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index 1428863..5e3b328 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -18,8 +18,17 @@ */ package org.apache.asterix.external.provider; +import 
java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.library.ILibraryManager; @@ -27,25 +36,23 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.external.input.HDFSDataSourceFactory; -import org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory; -import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory; -import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory; import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory; -import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory; import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.commons.io.IOUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; public class DatasourceFactoryProvider { + + private static final String RESOURCE = "META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory"; + private static Map<String, Class> factories = null; private DatasourceFactoryProvider() { } public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager, - Map<String, String> configuration) throws 
HyracksDataException { + Map<String, String> configuration) throws HyracksDataException, AsterixException { if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) { String reader = configuration.get(ExternalDataConstants.KEY_READER); return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration); @@ -88,41 +95,61 @@ return streamSourceFactory; } - public static IRecordReaderFactory<?> getRecordReaderFactory(ILibraryManager libraryManager, String reader, - Map<String, String> configuration) throws HyracksDataException { - if (reader.equals(ExternalDataConstants.EXTERNAL)) { - try { - return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration); - } catch (AlgebricksException e) { - // Not sure whether this is the right way to handle AlgebricksException (xikui) - throw new HyracksDataException(e); - } + protected static IRecordReaderFactory getInstance(Class clazz) throws AsterixException { + try { + return (IRecordReaderFactory) clazz.newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassCastException e) { + throw new AsterixException("Cannot create: " + clazz.getSimpleName(), e); } - switch (reader) { - case ExternalDataConstants.READER_HDFS: - return new HDFSDataSourceFactory(); - case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER: - return new StreamRecordReaderFactory(new LocalFSInputStreamFactory()); - case ExternalDataConstants.READER_TWITTER_PULL: - case ExternalDataConstants.READER_TWITTER_PUSH: - case ExternalDataConstants.READER_PUSH_TWITTER: - case ExternalDataConstants.READER_PULL_TWITTER: - case ExternalDataConstants.READER_USER_STREAM_TWITTER: - return new TwitterRecordReaderFactory(); - case ExternalDataConstants.ALIAS_SOCKET_ADAPTER: - case ExternalDataConstants.SOCKET: - return new StreamRecordReaderFactory(new SocketServerInputStreamFactory()); - case ExternalDataConstants.STREAM_SOCKET_CLIENT: - return new 
StreamRecordReaderFactory(new SocketClientInputStreamFactory()); - case ExternalDataConstants.READER_RSS: - return new RSSRecordReaderFactory(); - default: - try { - return (IRecordReaderFactory<?>) Class.forName(reader).newInstance(); - } catch (IllegalAccessException | ClassNotFoundException | InstantiationException - | ClassCastException e) { - throw new RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e,reader); + } + + public static IRecordReaderFactory getRecordReaderFactory(ILibraryManager libraryManager, String adaptorName, + Map<String, String> configuration) throws HyracksDataException, AsterixException { + if (adaptorName.equals(ExternalDataConstants.EXTERNAL)) { + return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration); + } + + if (factories == null) { + factories = initFactories(); + } + + if (factories.containsKey(adaptorName)) { + return getInstance(factories.get(adaptorName)); + } + + try { + return (IRecordReaderFactory) Class.forName(adaptorName).newInstance(); + } catch (IllegalAccessException | ClassNotFoundException | InstantiationException | ClassCastException e) { + throw new RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e, adaptorName); + } + } + + protected static Map<String, Class> initFactories() throws AsterixException { + Map<String, Class> factories = new HashMap<>(); + ClassLoader cl = ParserFactoryProvider.class.getClassLoader(); + final Charset encoding = Charset.forName("UTF-8"); + try { + Enumeration<URL> urls = cl.getResources(RESOURCE); + for (URL url : Collections.list(urls)) { + InputStream is = url.openStream(); + String config = IOUtils.toString(is, encoding); + is.close(); + String[] classNames = config.split("\n"); + for (String className : classNames) { + final Class<?> clazz = Class.forName(className); + List<String> formats = ((IRecordReaderFactory) clazz.newInstance()).getRecordReaderNames(); + for (String format : formats) { + if 
(factories.containsKey(format)) { + throw new AsterixException(ErrorCode.PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING, + format); + } + factories.put(format, clazz); + } } + } + } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new AsterixException(e); } + return factories; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java index 261a63c..1c07171 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java @@ -18,66 +18,80 @@ */ package org.apache.asterix.external.provider; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Constructor; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.api.AsterixInputStream; -import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader; -import org.apache.asterix.external.input.record.reader.stream.LineRecordReader; -import org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader; -import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader; import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; import org.apache.asterix.external.util.ExternalDataConstants; -import 
org.apache.asterix.external.util.ExternalDataUtils; -import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.commons.io.IOUtils; public class StreamRecordReaderProvider { - public enum Format { - SEMISTRUCTURED, - CSV, - LINE_SEPARATED + + private static final String RESOURCE = "META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader"; + private static final String READER_FORMAT_NAME = "recordReaderFormats"; + private static Map<String, Class> recordReaders = null; + + protected static StreamRecordReader getInstance(Class clazz) throws AsterixException { + try { + return (StreamRecordReader) clazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new AsterixException("Cannot create RecordReader: " + clazz.getSimpleName(), e); + } } - public static Format getReaderFormat(Map<String, String> configuration) throws AsterixException { + public static Class getRecordReaderClazz(Map<String, String> configuration) throws AsterixException { String format = configuration.get(ExternalDataConstants.KEY_FORMAT); + + if (recordReaders == null) { + recordReaders = initRecordReaders(); + } + if (format != null) { - switch (format) { - case ExternalDataConstants.FORMAT_ADM: - case ExternalDataConstants.FORMAT_JSON: - case ExternalDataConstants.FORMAT_SEMISTRUCTURED: - return Format.SEMISTRUCTURED; - case ExternalDataConstants.FORMAT_LINE_SEPARATED: - return Format.LINE_SEPARATED; - case ExternalDataConstants.FORMAT_DELIMITED_TEXT: - case ExternalDataConstants.FORMAT_CSV: - return Format.CSV; + if (recordReaders.containsKey(format)) { + return recordReaders.get(format); } - throw new AsterixException("Unknown format: " + format); + throw new AsterixException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, format); } throw new AsterixException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT); } - public static StreamRecordReader createRecordReader(Format format, 
AsterixInputStream inputStream, - Map<String, String> configuration) throws HyracksDataException { - switch (format) { - case CSV: - String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE); - boolean hasHeader = ExternalDataUtils.hasHeader(configuration); - if (quoteString != null) { - return new QuotedLineRecordReader(hasHeader, inputStream, quoteString); - } else { - return new LineRecordReader(hasHeader, inputStream); + protected static Map<String, Class> initRecordReaders() throws AsterixException { + Map<String, Class> recordReaders = new HashMap<>(); + ClassLoader cl = StreamRecordReaderProvider.class.getClassLoader(); + final Charset encoding = Charset.forName("UTF-8"); + try { + Enumeration<URL> urls = cl.getResources(RESOURCE); + for (URL url : Collections.list(urls)) { + InputStream is = url.openStream(); + String config = IOUtils.toString(is, encoding); + is.close(); + String[] classNames = config.split("\n"); + for (String className : classNames) { + final Class<?> clazz = Class.forName(className); + List<String> formats = (List<String>) clazz.getField(READER_FORMAT_NAME).get(null); + for (String format : formats) { + if (recordReaders.containsKey(format)) { + throw new AsterixException(ErrorCode.PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING, + format); + } + recordReaders.put(format, clazz); + } } - case LINE_SEPARATED: - return new EmptyLineSeparatedRecordReader(inputStream); - case SEMISTRUCTURED: - return new SemiStructuredRecordReader(inputStream, - configuration.get(ExternalDataConstants.KEY_RECORD_START), - configuration.get(ExternalDataConstants.KEY_RECORD_END)); - default: - throw new RuntimeDataException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, format); + } + } catch (IOException | ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) { + throw new AsterixException(e); } + return recordReaders; } } diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory new file mode 100644 index 0000000..dac60b9 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -0,0 +1,4 @@ +org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory +org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory +org.apache.asterix.external.input.HDFSDataSourceFactory +org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader new file mode 100644 index 0000000..ae1fbe2 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader @@ -0,0 +1,3 @@ +org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader +org.apache.asterix.external.input.record.reader.stream.LineRecordReader +org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java index f952a4d..b521e9e 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java +++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java @@ -24,6 +24,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -38,6 +39,7 @@ import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader; import org.apache.asterix.external.input.stream.LocalFSInputStream; import org.apache.asterix.external.library.ClassAdParser; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FileSystemWatcher; import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; @@ -114,10 +116,13 @@ ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields); for (String path : files) { List<Path> paths = new ArrayList<>(); + Map<String, String> config = new HashMap<>(); + config.put(ExternalDataConstants.KEY_RECORD_START, "["); + config.put(ExternalDataConstants.KEY_RECORD_END, "]"); paths.add(Paths.get(getClass().getResource(path).toURI())); FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); - SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]"); + SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, config); while (recordReader.hasNext()) { tb.reset(); IRawRecord<char[]> record = recordReader.next(); @@ -149,10 +154,13 @@ CharArrayLexerSource lexerSource = new CharArrayLexerSource(); for (String path : files) { List<Path> paths = new ArrayList<>(); + Map<String, String> config = new HashMap<>(); + config.put(ExternalDataConstants.KEY_RECORD_START, "["); + config.put(ExternalDataConstants.KEY_RECORD_END, "]"); paths.add(Paths.get(getClass().getResource(path).toURI())); FileSystemWatcher watcher = new 
FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); - SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]"); + SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, config); try { Value val = new Value(objectPool); while (recordReader.hasNext()) { @@ -187,10 +195,13 @@ CharArrayLexerSource lexerSource = new CharArrayLexerSource(); for (String path : files) { List<Path> paths = new ArrayList<>(); + Map<String, String> config = new HashMap<>(); + config.put(ExternalDataConstants.KEY_RECORD_START, "["); + config.put(ExternalDataConstants.KEY_RECORD_END, "]"); paths.add(Paths.get(getClass().getResource(path).toURI())); FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); - SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]"); + SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, config); try { Value val = new Value(objectPool); while (recordReader.hasNext()) { diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java index 3467411..28893b2 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.external.input.record.reader; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; import org.apache.asterix.external.api.IExternalDataSourceFactory; @@ -32,6 +35,8 @@ private static final long serialVersionUID = 1L; private 
transient AlgebricksAbsolutePartitionConstraint clusterLocations; + private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList()); + @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { @@ -53,4 +58,8 @@ public Class<?> getRecordClass() { return RecordWithPK.class; } + + @Override public List<String> getRecordReaderNames() { + return recordReaderNames; + } } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java index 98105b2..5089c82 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.external.input.record.reader.kv; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; import org.apache.asterix.external.api.IRecordReader; @@ -39,6 +42,7 @@ private int upsertCycle = 0; private int numOfReaders; private transient AlgebricksAbsolutePartitionConstraint clusterLocations; + private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList()); @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { @@ -77,4 +81,8 @@ public Class<?> getRecordClass() { return DCPRequest.class; } + + @Override public List<String> getRecordReaderNames() { + return recordReaderNames; + } } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java index 
c238f1c..978a315 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java @@ -30,13 +30,16 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.input.record.CharArrayRecord; import org.apache.asterix.external.input.record.converter.DCPMessageToRecordConverter; import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader; import org.apache.asterix.external.input.stream.LocalFSInputStream; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FileSystemWatcher; import org.junit.Assert; import org.junit.Test; @@ -73,10 +76,13 @@ public void testDecodingJsonRecords() throws URISyntaxException, IOException { String jsonFileName = "/record.json"; List<Path> paths = new ArrayList<>(); + Map<String, String> config = new HashMap<>(); + config.put(ExternalDataConstants.KEY_RECORD_START, "{"); + config.put(ExternalDataConstants.KEY_RECORD_END, "}"); paths.add(Paths.get(getClass().getResource(jsonFileName).toURI())); FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); - try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "{", "}")) { + try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, config)) { while (recordReader.hasNext()) { try { IRawRecord<char[]> record = recordReader.next(); diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java index 2f5f341..422d814 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java @@ -24,11 +24,13 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.input.record.converter.CSVToRecordWithMetadataAndPKConverter; -import org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader; +import org.apache.asterix.external.input.record.reader.stream.LineRecordReader; import org.apache.asterix.external.input.stream.LocalFSInputStream; import org.apache.asterix.external.parser.ADMDataParser; import org.apache.asterix.external.parser.RecordWithMetadataParser; @@ -85,8 +87,10 @@ // create input stream LocalFSInputStream inputStream = new LocalFSInputStream(watcher); // create reader record reader - QuotedLineRecordReader lineReader = new QuotedLineRecordReader(true, inputStream, - ExternalDataConstants.DEFAULT_QUOTE); + Map<String, String> config = new HashMap<>(); + config.put(ExternalDataConstants.HAS_HEADER, "true"); + config.put(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE); + LineRecordReader lineReader = new LineRecordReader(inputStream, config); // create csv with json record reader CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter( valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes); diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java new file mode 100644 index 0000000..9844035 --- /dev/null +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.parser.test; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.provider.StreamRecordReaderProvider; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class StreamRecordReaderProviderTest { + + @Test + public void Test() throws AsterixException{ + List<String> recordReaderFormats = Arrays.asList( + ExternalDataConstants.FORMAT_LINE_SEPARATED, + ExternalDataConstants.FORMAT_ADM, + ExternalDataConstants.FORMAT_JSON, + ExternalDataConstants.FORMAT_SEMISTRUCTURED, + ExternalDataConstants.FORMAT_DELIMITED_TEXT, + ExternalDataConstants.FORMAT_CSV); + Map<String, String> config = new HashMap<>(); + for (String format : recordReaderFormats) { + config.put(ExternalDataConstants.KEY_FORMAT, format); + Class clazz = StreamRecordReaderProvider.getRecordReaderClazz(config); + Assert.assertTrue(clazz != null); + } + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1535 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I699657ddd8408fd00bcbd7df57b6610ef3692a1a Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <xkk...@gmail.com>