abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/376
Change subject: Introduced Local Filesystem Feed Adapter
......................................................................
Introduced Local Filesystem Feed Adapter
This change adds a filesystem-based feed adapter, registered under the alias
push_localfs.
It works as follows:
When it starts, it ingests the data already present in the directories passed
in the adapter arguments. Once it has consumed the existing files, it places a
watch on those directories and ingests new files as they are created.
The adapter is push-based, so the feed only stops when a disconnect feed
statement is issued. Faulty records are dropped and the feed moves on to parse
the next records.
Change-Id: I707756e3b4c9ffca4b55ec9817a08e5c16333010
---
A asterix-app/data/local-dir/data/even-more.txt
A asterix-app/data/local-dir/data/hello-world
A asterix-app/data/local-dir/data/ignored
A asterix-app/data/local-dir/data/more-records.txt
A asterix-app/data/local-dir/data/records.txt
A
asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql
A
asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql
A
asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql
A asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm
M asterix-app/src/test/resources/runtimets/testsuite.xml
M
asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java
M
asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
M
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
M
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
M
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
M
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
26 files changed, 1,097 insertions(+), 115 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/76/376/1
diff --git a/asterix-app/data/local-dir/data/even-more.txt
b/asterix-app/data/local-dir/data/even-more.txt
new file mode 100644
index 0000000..2f3c6c9
--- /dev/null
+++ b/asterix-app/data/local-dir/data/even-more.txt
@@ -0,0 +1,3 @@
+45|Steve|50
+46|John|23
+47|Samuel|22
diff --git a/asterix-app/data/local-dir/data/hello-world
b/asterix-app/data/local-dir/data/hello-world
new file mode 100644
index 0000000..73e0e49
--- /dev/null
+++ b/asterix-app/data/local-dir/data/hello-world
@@ -0,0 +1,3 @@
+hellow world
+
+
diff --git a/asterix-app/data/local-dir/data/ignored
b/asterix-app/data/local-dir/data/ignored
new file mode 100644
index 0000000..0a77cda
--- /dev/null
+++ b/asterix-app/data/local-dir/data/ignored
@@ -0,0 +1 @@
+This file is expected to be ignored.
\ No newline at end of file
diff --git a/asterix-app/data/local-dir/data/more-records.txt
b/asterix-app/data/local-dir/data/more-records.txt
new file mode 100644
index 0000000..36fc93c
--- /dev/null
+++ b/asterix-app/data/local-dir/data/more-records.txt
@@ -0,0 +1,22 @@
+23|Steve|50
+24|John|23
+25|Samuel|22
+26|Mary|29
+27|William|75
+28|Sarah|16
+29|Noel|33
+30|Carlos|40
+31|Joseph|45
+32|David|22
+33|Nadine|10
+34|Steve|50
+35|John|23
+36|Samuel|22
+37|Mary|29
+38|William|75
+39|Sarah|16
+40|Noel|33
+41|Carlos|40
+42|Joseph|45
+43|David|22
+44|Nadine|10
diff --git a/asterix-app/data/local-dir/data/records.txt
b/asterix-app/data/local-dir/data/records.txt
new file mode 100644
index 0000000..538d7bf
--- /dev/null
+++ b/asterix-app/data/local-dir/data/records.txt
@@ -0,0 +1,22 @@
+1|Steve|50
+2|John|23
+3|Samuel|22
+4|Mary|29
+5|William|75
+6|Sarah|16
+7|Noel|33
+8|Carlos|40
+9|Joseph|45
+10|David|22
+11|Nadine|10
+12|Steve|50
+13|John|23
+14|Samuel|22
+15|Mary|29
+16|William|75
+17|Sarah|16
+18|Noel|33
+19|Carlos|40
+20|Joseph|45
+21|David|22
+22|Nadine|10
\ No newline at end of file
diff --git
a/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql
b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql
new file mode 100644
index 0000000..229af78
--- /dev/null
+++
b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql
@@ -0,0 +1,22 @@
+/*
+ * Description  : Create an external dataset over the delimited-text records
+ *                stored in the local directory nc1://data/local-dir/data.
+ *                Only files matching the "expression" parameter (.*\.txt) are
+ *                read, so "hello-world" and "ignored" in that directory are
+ *                expected to be skipped. The following query steps check that
+ *                all records from the matching files are returned.
+ * Expected Res : Success
+ * Date         : 2015/08/31
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type EmployeeType as closed {
+ id: int64,
+ name: string,
+ age: int64
+};
+
+create external dataset EmployeeDataset(EmployeeType)
+using localfs
+(("path"="nc1://data/local-dir/data"),("format"="delimited-text"),("delimiter"="|"),("expression"=".*\\.txt"));
\ No newline at end of file
diff --git
a/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql
b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql
new file mode 100644
index 0000000..58f5830
--- /dev/null
+++
b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql
@@ -0,0 +1,8 @@
+/*
+ * Description  : Update step of the external/query1 test. This test has no
+ *                update step, so this file contains no statements.
+ *                (The dataset is created in query1.1.ddl.aql and queried in
+ *                query1.3.query.aql.)
+ * Expected Res : Success
+ * Date         : 2015/08/31
+ */
\ No newline at end of file
diff --git
a/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql
b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql
new file mode 100644
index 0000000..a89f8d3
--- /dev/null
+++
b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description  : Query the external dataset EmployeeDataset created in
+ *                query1.1.ddl.aql and return every record read from the
+ *                files that match the dataset's expression, ordered by id
+ *                (ascending) so the output is deterministic.
+ * Expected Res : Success
+ * Date         : 2015/08/31
+ */
+
+use dataverse test;
+
+for $x in dataset EmployeeDataset
+order by $x.id asc
+return $x;
\ No newline at end of file
diff --git
a/asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm
b/asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm
new file mode 100644
index 0000000..9a9d74c
--- /dev/null
+++
b/asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm
@@ -0,0 +1,48 @@
+[ { "id": 1, "name": "Steve", "age": 50 }
+, { "id": 2, "name": "John", "age": 23 }
+, { "id": 3, "name": "Samuel", "age": 22 }
+, { "id": 4, "name": "Mary", "age": 29 }
+, { "id": 5, "name": "William", "age": 75 }
+, { "id": 6, "name": "Sarah", "age": 16 }
+, { "id": 7, "name": "Noel", "age": 33 }
+, { "id": 8, "name": "Carlos", "age": 40 }
+, { "id": 9, "name": "Joseph", "age": 45 }
+, { "id": 10, "name": "David", "age": 22 }
+, { "id": 11, "name": "Nadine", "age": 10 }
+, { "id": 12, "name": "Steve", "age": 50 }
+, { "id": 13, "name": "John", "age": 23 }
+, { "id": 14, "name": "Samuel", "age": 22 }
+, { "id": 15, "name": "Mary", "age": 29 }
+, { "id": 16, "name": "William", "age": 75 }
+, { "id": 17, "name": "Sarah", "age": 16 }
+, { "id": 18, "name": "Noel", "age": 33 }
+, { "id": 19, "name": "Carlos", "age": 40 }
+, { "id": 20, "name": "Joseph", "age": 45 }
+, { "id": 21, "name": "David", "age": 22 }
+, { "id": 22, "name": "Nadine", "age": 10 }
+, { "id": 23, "name": "Steve", "age": 50 }
+, { "id": 24, "name": "John", "age": 23 }
+, { "id": 25, "name": "Samuel", "age": 22 }
+, { "id": 26, "name": "Mary", "age": 29 }
+, { "id": 27, "name": "William", "age": 75 }
+, { "id": 28, "name": "Sarah", "age": 16 }
+, { "id": 29, "name": "Noel", "age": 33 }
+, { "id": 30, "name": "Carlos", "age": 40 }
+, { "id": 31, "name": "Joseph", "age": 45 }
+, { "id": 32, "name": "David", "age": 22 }
+, { "id": 33, "name": "Nadine", "age": 10 }
+, { "id": 34, "name": "Steve", "age": 50 }
+, { "id": 35, "name": "John", "age": 23 }
+, { "id": 36, "name": "Samuel", "age": 22 }
+, { "id": 37, "name": "Mary", "age": 29 }
+, { "id": 38, "name": "William", "age": 75 }
+, { "id": 39, "name": "Sarah", "age": 16 }
+, { "id": 40, "name": "Noel", "age": 33 }
+, { "id": 41, "name": "Carlos", "age": 40 }
+, { "id": 42, "name": "Joseph", "age": 45 }
+, { "id": 43, "name": "David", "age": 22 }
+, { "id": 44, "name": "Nadine", "age": 10 }
+, { "id": 45, "name": "Steve", "age": 50 }
+, { "id": 46, "name": "John", "age": 23 }
+, { "id": 47, "name": "Samuel", "age": 22 }
+ ]
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml
b/asterix-app/src/test/resources/runtimets/testsuite.xml
index feeeefd..5b79a95 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -22,6 +22,13 @@
ResultOffsetPath="results"
QueryOffsetPath="queries"
QueryFileExtension=".aql">
+ <test-group name="external">
+ <test-case FilePath="external">
+ <compilation-unit name="query1">
+ <output-dir compare="Text">query1</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
<test-group name="flwor">
<test-case FilePath="flwor">
<compilation-unit name="at00">
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
index bb94ce7..0df8f44 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
@@ -40,5 +40,7 @@
public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
public void close() throws HyracksDataException;
+
+ public void forceFlush() throws HyracksDataException;
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java
new file mode 100644
index 0000000..4bcfbdf
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
+import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
+import org.apache.asterix.external.dataset.adapter.LocalFileSystemFeedAdapter;
+import org.apache.asterix.external.util.INodeResolver;
+import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import
org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Factory for the "push_localfs" feed adapter, which ingests files located on
+ * the node controllers named in the {@link AsterixTupleParserFactory#KEY_PATH}
+ * parameter and keeps watching those directories for new files.
+ * <p>
+ * The path parameter is a comma-separated list of {@code node://absolute/path}
+ * entries. The optional {@link AsterixTupleParserFactory#KEY_EXPRESSION}
+ * parameter is a regular expression that selects which files are ingested.
+ */
+public class LocalFileSystemFeedAdapterFactory extends StreamBasedAdapterFactory implements IFeedAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+    private static final String NAME = "push_localfs";
+    private static final String PATH_USAGE = "\nUsage- path=\"Host://Absolute File Path\"";
+    // Record type handed to every adapter created by this factory; set in configure().
+    private IAType sourceDatatype;
+    // One split per comma-separated entry of the path parameter.
+    private FileSplit[] fileSplits;
+    private ARecordType outputType;
+    // Expression to filter files with when the user provides a directory; may be null.
+    private String expression;
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    /** Places one adapter partition on the node of each configured file split. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return configurePartitionConstraint();
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        return new LocalFileSystemFeedAdapter(configuration, fileSplits, parserFactory, sourceDatatype, ctx,
+                expression);
+    }
+
+    /**
+     * Reads the adapter parameters and prepares the file splits and the parser.
+     *
+     * @param configuration adapter parameters supplied by the user
+     * @param outputType    type of the records produced by the adapter
+     * @throws AsterixException if the path parameter is missing or malformed
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.configuration = configuration;
+        this.outputType = outputType;
+        this.expression = configuration.get(AsterixTupleParserFactory.KEY_EXPRESSION);
+        String paths = configuration.get(AsterixTupleParserFactory.KEY_PATH);
+        if (paths == null) {
+            throw new AsterixException("Missing parameter: " + AsterixTupleParserFactory.KEY_PATH + PATH_USAGE);
+        }
+        // Store the type in the field (not a shadowing local) so createAdapter() can pass it on.
+        this.sourceDatatype = outputType;
+        configureFileSplits(paths.split(","));
+        configureFormat(sourceDatatype);
+    }
+
+    // Parses "node://path" entries into file splits; no-op if splits were already configured.
+    private void configureFileSplits(String[] splits) throws AsterixException {
+        if (fileSplits != null) {
+            return;
+        }
+        // Build into a local array so a malformed entry does not leave a half-filled field behind.
+        FileSplit[] parsed = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; i++) {
+            String trimmedValue = splits[i].trim();
+            if (!trimmedValue.contains("://")) {
+                throw new AsterixException("Invalid path: " + splits[i] + PATH_USAGE);
+            }
+            String nodeName = trimmedValue.split(":")[0];
+            // Limit the split to two parts so a local path that itself contains "://" is kept intact.
+            String nodeLocalPath = trimmedValue.split("://", 2)[1];
+            if (nodeLocalPath.isEmpty()) {
+                throw new AsterixException("Invalid path: " + splits[i] + PATH_USAGE);
+            }
+            parsed[i] = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+        }
+        fileSplits = parsed;
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    /** Record tracking is disabled for this adapter, so no progress tracker is provided. */
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
+    }
+
+    // Maps each split's node name to a location via the node resolver.
+    private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
+        String[] locs = new String[fileSplits.length];
+        for (int i = 0; i < fileSplits.length; i++) {
+            locs[i] = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locs);
+    }
+
+    // Lazily initializes the node resolver shared with the NC file-system adapter factory.
+    protected INodeResolver getNodeResolver() {
+        if (nodeResolver == null) {
+            nodeResolver = NCFileSystemAdapterFactory.initializeNodeResolver();
+        }
+        return nodeResolver;
+    }
+
+    @Override
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.UNKNOWN;
+    }
+}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 31bd7ab..7d6d43a 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -15,7 +15,6 @@
package org.apache.asterix.external.adapter.factory;
import java.io.File;
-import java.util.List;
import java.util.Map;
import java.util.logging.Level;
@@ -25,7 +24,6 @@
import org.apache.asterix.external.util.DNSResolverFactory;
import org.apache.asterix.external.util.INodeResolver;
import org.apache.asterix.external.util.INodeResolverFactory;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.external.IAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
@@ -33,7 +31,6 @@
import
org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -53,19 +50,18 @@
private IAType sourceDatatype;
private FileSplit[] fileSplits;
private ARecordType outputType;
-
+ // Expression to filter files with when the user provides a directory
+ private String expression;
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int
partition) throws Exception {
- NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits,
parserFactory, sourceDatatype, ctx);
- return fsAdapter;
+ return new NCFileSystemAdapter(fileSplits, parserFactory,
sourceDatatype, ctx, expression);
}
@Override
public String getName() {
return NC_FILE_SYSTEM_ADAPTER_NAME;
}
-
@Override
public SupportedOperation getSupportedOperations() {
@@ -76,11 +72,11 @@
public void configure(Map<String, String> configuration, ARecordType
outputType) throws Exception {
this.configuration = configuration;
this.outputType = outputType;
+ this.expression = (String)
configuration.get(AsterixTupleParserFactory.KEY_EXPRESSION);
String[] splits = ((String)
configuration.get(AsterixTupleParserFactory.KEY_PATH)).split(",");
IAType sourceDatatype = (IAType) outputType;
configureFileSplits(splits);
configureFormat(sourceDatatype);
-
}
@Override
@@ -126,9 +122,11 @@
return nodeResolver;
}
- private static INodeResolver initializeNodeResolver() {
+ // The method below should be moved to a util class since it is static and
can potentially be used by multiple other classes
+ public static INodeResolver initializeNodeResolver() {
INodeResolver nodeResolver = null;
- String configuredNodeResolverFactory =
System.getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
+ String configuredNodeResolverFactory = System
+
.getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
if (configuredNodeResolverFactory != null) {
try {
nodeResolver = ((INodeResolverFactory)
(Class.forName(configuredNodeResolverFactory).newInstance()))
@@ -146,19 +144,14 @@
}
return nodeResolver;
}
-
+
@Override
public ARecordType getAdapterOutputType() {
return outputType;
}
-
+
@Override
public InputDataFormat getInputDataFormat() {
return InputDataFormat.UNKNOWN;
}
-
- public void setFiles(List<ExternalFile> files) throws AlgebricksException {
- throw new AlgebricksException("can't set files for this Adapter");
- }
-
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
index 6abb0f0..ed82630 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
@@ -49,9 +49,9 @@
private FrameTupleAppender appender;
private IFrame frame;
- private long tupleCount = 0;
- private final IHyracksTaskContext ctx;
- private int frameTupleCount = 0;
+ protected long tupleCount = 0;
+ protected final IHyracksTaskContext ctx;
+ protected int frameTupleCount = 0;
protected FeedPolicyEnforcer policyEnforcer;
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
index c922f69..519180c 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
@@ -121,6 +121,7 @@
recordBuilder.write(dataOutput, true);
}
+ @SuppressWarnings("unchecked")
private void writeObject(IAObject obj, DataOutput dataOutput) throws
IOException, AsterixException {
switch (obj.getType().getTypeTag()) {
case RECORD: {
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java
new file mode 100644
index 0000000..3aa4da1
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * An {@link InputStream} that concatenates the contents of the files under a
+ * root path (optionally filtered by a regular expression) and then keeps
+ * waiting for new files to appear under that path.
+ * <p>
+ * On construction, all existing matching files under the root are queued, and
+ * every directory is registered with a {@link WatchService}. Once the queued
+ * files are exhausted, the stream waits for ENTRY_CREATE events. It then
+ * streams any newly created matching files and registers newly created
+ * sub-directories. Between two files, a newline is injected when the previous
+ * file did not end with one, so the last line of one file is not joined to the
+ * first line of the next.
+ * <p>
+ * Reads return -1 only in three cases: all watched directories become
+ * inaccessible, the reading thread is interrupted, or {@link #close()} is
+ * called.
+ */
+public class FileSystemWatcher extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
+
+    private final WatchService watcher;
+    // Maps each registered watch key to the directory it watches.
+    private final HashMap<WatchKey, Path> keys;
+    // Files queued for reading; refilled from watch events once exhausted.
+    private final LinkedList<File> files = new LinkedList<File>();
+    private Iterator<File> it;
+    // Compiled file filter matched against a file's full path; null accepts every file.
+    private final Pattern pattern;
+    // Last byte handed to the reader; decides whether a separating newline is injected.
+    private byte lastByte = '\n';
+
+    // Stream over the current file. It starts as an empty dummy stream, so the first
+    // read advances to the first queued file. A null value means end of stream.
+    private InputStream in = new InputStream() {
+        @Override
+        public int read() throws IOException {
+            return -1;
+        }
+    };
+
+    /**
+     * @param inputResource root directory (or single file) to ingest and watch
+     * @param expression    regular expression that a file's full path must match
+     *                      to be ingested, or null to accept every file
+     * @throws IOException if the watch service cannot be created or the root
+     *                     cannot be walked or registered
+     */
+    public FileSystemWatcher(Path inputResource, String expression) throws IOException {
+        this.pattern = expression == null ? null : Pattern.compile(expression);
+        watcher = FileSystems.getDefault().newWatchService();
+        keys = new HashMap<WatchKey, Path>();
+        try {
+            registerAll(inputResource);
+        } catch (IOException | RuntimeException e) {
+            // Do not leak the watch service if the initial scan fails.
+            try {
+                watcher.close();
+            } catch (IOException suppressed) {
+                e.addSuppressed(suppressed);
+            }
+            throw e;
+        }
+    }
+
+    // Returns true if the file with the given path should be ingested.
+    private boolean accepts(String path) {
+        return pattern == null || pattern.matcher(path).matches();
+    }
+
+    /**
+     * Closes the file currently being read and the underlying watch service.
+     * Closing the watch service makes a reader blocked waiting for new files
+     * receive a ClosedWatchServiceException, which ends the stream.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            closeCurrentStream();
+        } finally {
+            watcher.close();
+        }
+    }
+
+    // Closes the stream over the current file (if any) and marks it as consumed.
+    private void closeCurrentStream() throws IOException {
+        if (in != null) {
+            try {
+                in.close();
+            } finally {
+                in = null;
+            }
+        }
+    }
+
+    /**
+     * Closes the current file and opens the next queued one. When the queue is
+     * exhausted, collects new files from watch events, blocking if necessary.
+     * Leaves {@code in} null when the stream has ended.
+     */
+    private void advance() throws IOException {
+        closeCurrentStream();
+        if (it.hasNext()) {
+            in = new FileInputStream(it.next());
+            return;
+        }
+        // All queued files were consumed; start a fresh batch.
+        files.clear();
+        if (!collectNewFiles()) {
+            return; // end of stream: in is already null
+        }
+        it = files.iterator();
+        in = new FileInputStream(it.next());
+    }
+
+    /**
+     * Fills {@code files} from watch events. It first drains every pending key
+     * without blocking, then blocks until at least one file is available.
+     *
+     * @return false if the stream has ended (watched directories gone, watch
+     *         service closed, or thread interrupted); true if {@code files} is
+     *         non-empty
+     */
+    private boolean collectNewFiles() {
+        try {
+            WatchKey key = watcher.poll();
+            while (key != null) {
+                handleEvents(key);
+                if (reachedEndOfStream(key)) {
+                    return false;
+                }
+                key = watcher.poll();
+            }
+            // No file was found: wait for the file system to push events.
+            while (files.isEmpty()) {
+                key = watcher.take();
+                handleEvents(key);
+                if (reachedEndOfStream(key)) {
+                    return false;
+                }
+            }
+            return true;
+        } catch (InterruptedException e) {
+            // Restore the interrupt status for the caller and end the stream.
+            Thread.currentThread().interrupt();
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("Watcher interrupted", e);
+            }
+            return false;
+        } catch (ClosedWatchServiceException e) {
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("Watcher Service Exception", e);
+            }
+            return false;
+        }
+    }
+
+    // Processes the events of one watch key: it registers newly created directories
+    // and queues newly created files that pass the filter. MODIFY/DELETE events are ignored.
+    private void handleEvents(WatchKey key) {
+        Path dir = keys.get(key);
+        if (dir == null) {
+            // This should never happen: every key is registered by register().
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("WatchKey not recognized!!");
+            }
+            return;
+        }
+        for (WatchEvent<?> event : key.pollEvents()) {
+            Kind<?> kind = event.kind();
+            // TODO: Do something about overflow events
+            // An overflow event means that some events were dropped
+            if (kind == StandardWatchEventKinds.OVERFLOW) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("Overflow event. Some events might have been missed");
+                }
+                continue;
+            }
+            if (kind != StandardWatchEventKinds.ENTRY_CREATE) {
+                continue;
+            }
+            // The context of a directory-entry event is the entry's name relative to dir.
+            WatchEvent<Path> ev = cast(event);
+            Path child = dir.resolve(ev.context());
+            try {
+                if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
+                    // A new sub-directory: queue its files and watch it and its children.
+                    registerAll(child);
+                } else if (accepts(child.toString())) {
+                    // NOTE(review): files are queued as soon as they are created; a file
+                    // that is still being written may be read only partially.
+                    files.add(new File(child.toString()));
+                }
+            } catch (IOException e) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    // Re-arms the key for further events. If the key cannot be reset, its directory is
+    // no longer accessible; once no watched directory is left, the stream ends.
+    private boolean reachedEndOfStream(WatchKey key) {
+        if (!key.reset()) {
+            keys.remove(key);
+            if (keys.isEmpty()) {
+                in = null;
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (in == null) {
+            return 0;
+        }
+        return in.available();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Reads one byte, moving to the next file when the current one is exhausted.
+     * A newline is injected at a file boundary if the previous file did not end
+     * with one.
+     *
+     * @return the next byte, or -1 once the stream has ended
+     */
+    @Override
+    public int read() throws IOException {
+        while (in != null) {
+            int result = in.read();
+            if (result != -1) {
+                lastByte = (byte) result;
+                return result;
+            }
+            advance();
+            if (in != null && lastByte != '\n' && lastByte != '\r') {
+                lastByte = '\n';
+                return '\n';
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Reads up to {@code len} bytes from the current file, moving to the next
+     * file when the current one is exhausted. At a file boundary, a single
+     * newline byte is returned if the previous file did not end with one.
+     * (This might create problems for some parsers, depending on their
+     * implementation.)
+     *
+     * @return the number of bytes read, or -1 once the stream has ended
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        // Iterate rather than recurse so many consecutive empty files cannot overflow the stack.
+        while (in != null) {
+            int result = in.read(b, off, len);
+            if (result == 0) {
+                return 0;
+            }
+            if (result > 0) {
+                lastByte = b[off + result - 1];
+                return result;
+            }
+            advance();
+            if (in != null && lastByte != '\n' && lastByte != '\r') {
+                b[off] = '\n';
+                lastByte = '\n';
+                return 1;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Skips up to {@code n} bytes within the current file. If the current file
+     * is exhausted, it reads one byte (possibly advancing to the next file) and
+     * then skips within the new current stream.
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        if (in == null || n <= 0) {
+            return 0;
+        }
+        long result = in.skip(n);
+        if (result != 0) {
+            return result;
+        }
+        if (read() == -1) {
+            return 0;
+        }
+        return 1 + in.skip(n - 1);
+    }
+
+    /**
+     * Queues every matching file under {@code inputResource} (which may itself be
+     * a single file), then registers it and all its sub-directories with the
+     * watch service.
+     */
+    private void registerAll(final Path inputResource) throws IOException {
+        final LinkedList<Path> dirs = new LinkedList<Path>();
+        Files.walkFileTree(inputResource, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
+                dirs.add(dir);
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+                // The walk does not follow links, so check the target: a link to a
+                // directory must not be opened as a file.
+                if (!Files.isDirectory(file) && accepts(file.toString())) {
+                    files.add(file.toFile());
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+        it = files.iterator();
+        for (Path dir : dirs) {
+            register(dir);
+        }
+    }
+
+    /**
+     * Register the given directory with the WatchService
+     */
+    private void register(Path dir) throws IOException {
+        WatchKey key = dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE,
+                StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
+        keys.put(key, dir);
+    }
+
+    // Unchecked cast: directory watch events carry a Path context.
+    @SuppressWarnings("unchecked")
+    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
+        return (WatchEvent<T>) event;
+    }
+}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java
new file mode 100644
index 0000000..15ae3f6
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwardPolicy;
+import org.apache.asterix.external.dataset.adapter.IFeedClient.InflowState;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import
org.apache.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class LocalFileSystemFeedAdapter extends ClientBasedFeedAdapter {
+
+ private static final long serialVersionUID = 1L;
+ private static final int DEFAULT_BATCH_SIZE = 100;
+ private final FileSplit[] fileSplits;
+ private final String expression;
+ protected final ITupleParserFactory parserFactory;
+ protected final IAType sourceDatatype;
+ private IFrameWriter writer;
+ private LocalFilesystemFeedClient client;
+
+ public LocalFileSystemFeedAdapter(Map<String, String> configuration,
FileSplit[] fileSplits,
+ ITupleParserFactory parserFactory, IAType sourceDatatype,
IHyracksTaskContext ctx, String expression) {
+ super(configuration, ctx);
+ this.fileSplits = fileSplits;
+ this.expression = expression;
+ this.parserFactory = parserFactory;
+ this.sourceDatatype = sourceDatatype;
+ }
+
+ /**
+ * Discontinue the ingestion of data and end the feed.
+ *
+ * @throws Exception
+ */
+ public void stop() throws Exception {
+ // This needs to be fixed
+ continueIngestion = false;
+ }
+
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PUSH;
+ }
+
+ private void setup(int partition) throws Exception {
+ if (client == null) {
+ client = (LocalFilesystemFeedClient) getFeedClient(partition);
+ }
+ }
+
+ // This adapter needs better exception handling
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ this.writer = writer;
+ setup(partition);
+ InflowState inflowState = null;
+ while (continueIngestion) {
+ try {
+ inflowState = client.parseNext();
+ switch (inflowState) {
+ case DATA_AVAILABLE:
+ frameTupleCount++;
+ break;
+ case NO_MORE_DATA:
+ tupleCount += frameTupleCount;
+ frameTupleCount = 0;
+ continueIngestion = false;
+ break;
+ case DATA_NOT_AVAILABLE:
+ break;
+ }
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public IFeedClient getFeedClient(int partition) throws Exception {
+ FileSplit split = fileSplits[partition];
+ File inputResource = split.getLocalFile().getFile();
+ return new LocalFilesystemFeedClient(adapterOutputType,
inputResource.toPath(), parserFactory, ctx, writer,
+ expression);
+ }
+
+ @Override
+ public ITupleForwardPolicy getTupleParserPolicy() {
+ configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+
ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
+ String propValue =
configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
+ if (propValue == null) {
+ configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" +
DEFAULT_BATCH_SIZE);
+ }
+ return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+ }
+
+ @Override
+ public boolean handleException(Exception e) {
+ return true;
+ }
+}
\ No newline at end of file
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java
new file mode 100644
index 0000000..5216934
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.operators.file.AbstractTupleParser;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class LocalFilesystemFeedClient extends FeedClient {
+
+ private FileSystemWatcher watcher;
+ private AbstractTupleParser tupleParser;
+
+ public LocalFilesystemFeedClient(ARecordType recordType, Path
inputResource, ITupleParserFactory parserFactory,
+ IHyracksTaskContext ctx, IFrameWriter writer, String expression)
throws IOException, AsterixException {
+ watcher = new FileSystemWatcher(inputResource, expression);
+ tupleParser = (AbstractTupleParser)
parserFactory.createTupleParser(ctx);
+ tupleParser.setInputStream(watcher);
+ tupleParser.setWriter(writer);
+ }
+
+ public InflowState parseNext() throws Exception {
+ if (tupleParser.parseNext()) {
+ return InflowState.DATA_AVAILABLE;
+ } else {
+ tupleParser.end();
+ return InflowState.NO_MORE_DATA;
+ }
+ }
+
+ public void flushRecords() throws Exception {
+ tupleParser.getPolicy().forceFlush();
+ }
+
+ @Override
+ public InflowState retrieveNextRecord() throws Exception {
+ throw new AsterixException(
+ "FileSystemFeedParser can't retrieve the record since it is
tightly coupled with the Parser");
+ }
+}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 635fae7..4a6b435 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -16,12 +16,15 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import
org.apache.asterix.external.indexing.input.NCLocalFilesystemDirecoryInputStream;
import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -37,27 +40,73 @@
private static final long serialVersionUID = 1L;
private final FileSplit[] fileSplits;
+ private final String expression;
/**
 * @param fileSplits one local file or directory per partition
 * @param parserFactory creates the parser for the records read from the files
 * @param atype record type handed to the base adapter
 * @param ctx task context handed to the base adapter
 * @param expression optional regular expression that file paths under a directory must match;
 *        {@code null} accepts every file
 */
public NCFileSystemAdapter(FileSplit[] fileSplits, ITupleParserFactory parserFactory, IAType atype,
        IHyracksTaskContext ctx, String expression) throws HyracksDataException {
    super(parserFactory, atype, ctx);
    this.expression = expression;
    this.fileSplits = fileSplits;
}
@Override
public InputStream getInputStream(int partition) throws IOException {
FileSplit split = fileSplits[partition];
- File inputFile = split.getLocalFile().getFile();
+ File inputResource = split.getLocalFile().getFile();
+ LinkedList<File> subFiles = new LinkedList<File>();
InputStream in;
try {
- in = new FileInputStream(inputFile);
+ // Check that the resource exists
+ if (Files.exists(inputResource.toPath())) {
+ // Check if the resource is a directory
+ if (Files.isDirectory(inputResource.toPath())) {
+ // Resource is a directory, we need to process files of
interests inside it
+ File[] files = inputResource.listFiles();
+ for (File file : files) {
+ if (file.isDirectory()) {
+ addSubFiles(subFiles, file);
+ } else {
+ if (expression == null ||
Pattern.matches(expression, file.getPath())) {
+ subFiles.add(file);
+ }
+ }
+ }
+ in = new NCLocalFilesystemDirecoryInputStream(subFiles);
+ } else {
+ // Resource is a file
+ in = new FileInputStream(inputResource);
+ }
+ } else {
+ throw new IOException("Resource doesn't exist");
+ }
return in;
- } catch (FileNotFoundException e) {
+ } catch (PatternSyntaxException e) {
+ throw new IOException("The regular expression provided is an
invalid expression", e);
+ } catch (Exception e) {
throw new IOException(e);
}
}
-
+ /* list all files under the directory
+ * currentDir is expected to be a directory
+ */
+ private void addSubFiles(LinkedList<File> files, File currentDir) throws
IOException {
+ try {
+ File[] subFiles = currentDir.listFiles();
+ for (File file : subFiles) {
+ if (file.isDirectory()) {
+ addSubFiles(files, file);
+ } else {
+ if (expression == null || Pattern.matches(expression,
file.getPath())) {
+ files.add(file);
+ }
+ }
+ }
+ } catch (PatternSyntaxException e) {
+ throw new IOException("The regular expression provided is an
invalid expression", e);
+ }
+ }
+
@Override
public String getFilename(int partition) {
final FileSplit fileSplit = fileSplits[partition];
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java
new file mode 100644
index 0000000..8bf30b9
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.indexing.input;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.LinkedList;
+
/**
 * An {@link InputStream} that presents a list of local files as one continuous stream.
 * <p>
 * Files are read in list order. When one file ends without a line terminator and another file
 * follows, a single {@code '\n'} is inserted so that the last record of one file is not glued to the
 * first record of the next. Empty files contribute nothing, not even a separator.
 * <p>
 * No synchronization is performed; mark/reset are not supported.
 */
public class NCLocalFilesystemDirecoryInputStream extends InputStream {

    /** Stream over the file being read; {@code null} once every file is consumed or after close(). */
    private InputStream in;
    /** Remaining files, in the order they are to be read. */
    private final Iterator<File> it;
    /**
     * Last byte handed to the caller. Starts as '\n' so no separator is emitted before the first
     * non-empty file. It is also set to '\n' whenever a separator is emitted, so empty files in
     * between do not produce extra blank lines.
     */
    private byte lastByte = '\n';

    /**
     * @param files files to concatenate, read in iteration order
     * @throws IOException if the first file cannot be opened
     */
    public NCLocalFilesystemDirecoryInputStream(LinkedList<File> files) throws IOException {
        this.it = files.iterator();
        advance();
    }

    /**
     * Closes the file currently open. Afterwards every read reports end of stream; the remaining
     * files are not opened.
     */
    @Override
    public void close() throws IOException {
        if (in != null) {
            try {
                in.close();
            } finally {
                in = null;
            }
        }
    }

    /** Closes the current file and opens the next one, leaving {@code in} null when none is left. */
    private void advance() throws IOException {
        close();
        if (it.hasNext()) {
            File file = it.next();
            in = new FileInputStream(file);
        }
    }

    /** True when the bytes delivered so far do not end with a line terminator. */
    private boolean needsSeparator() {
        return lastByte != '\n' && lastByte != '\r';
    }

    @Override
    public int available() throws IOException {
        if (in == null) {
            return 0;
        }
        return in.available();
    }

    @Override
    public boolean markSupported() {
        return false;
    }

    @Override
    public int read() throws IOException {
        // Loop (rather than recurse) over exhausted files so long runs of empty files are cheap.
        while (in != null) {
            int result = in.read();
            if (result != -1) {
                lastByte = (byte) result;
                return result;
            }
            advance();
            if (in != null && needsSeparator()) {
                lastByte = '\n';
                return '\n';
            }
        }
        return -1;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        // InputStream contract: a zero-length read returns 0, even at end of stream.
        if (len == 0) {
            return 0;
        }
        while (in != null) {
            int result = in.read(b, off, len);
            if (result > 0) {
                lastByte = b[off + result - 1];
                return result;
            }
            if (result == 0) {
                return 0;
            }
            advance();
            if (in != null && needsSeparator()) {
                b[off] = '\n';
                lastByte = '\n';
                return 1;
            }
        }
        return -1;
    }

    /**
     * Skips by reading (the {@link InputStream} default). {@link FileInputStream#skip(long)} may seek
     * past the end of a file and over-report, and it would not keep {@code lastByte} accurate for the
     * separator logic.
     */
    @Override
    public long skip(long n) throws IOException {
        return super.skip(n);
    }
}
\ No newline at end of file
diff --git
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 53c53fe..c5f5177 100644
---
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -228,8 +228,8 @@
public static void insertInitialDataverses(MetadataTransactionContext
mdTxnCtx) throws Exception {
String dataverseName =
MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new
Dataverse(dataverseName, dataFormat,
- IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
+ new Dataverse(dataverseName, dataFormat,
IMetadataEntity.PENDING_NO_OP));
}
public static void insertInitialDatasets(MetadataTransactionContext
mdTxnCtx) throws Exception {
@@ -266,8 +266,8 @@
getBuiltinTypes(types);
getMetadataTypes(types);
for (int i = 0; i < types.size(); i++) {
- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new
Datatype(dataverseName, types.get(i).getTypeName(),
- types.get(i), false));
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
+ new Datatype(dataverseName, types.get(i).getTypeName(),
types.get(i), false));
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished inserting initial datatypes.");
@@ -276,10 +276,11 @@
public static void insertInitialIndexes(MetadataTransactionContext
mdTxnCtx) throws Exception {
for (int i = 0; i < secondaryIndexes.length; i++) {
- MetadataManager.INSTANCE.addIndex(mdTxnCtx, new
Index(secondaryIndexes[i].getDataverseName(),
- secondaryIndexes[i].getIndexedDatasetName(),
secondaryIndexes[i].getIndexName(), IndexType.BTREE,
- secondaryIndexes[i].getPartitioningExpr(),
secondaryIndexes[i].getPartitioningExprType(), false,
- false, IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+ new Index(secondaryIndexes[i].getDataverseName(),
secondaryIndexes[i].getIndexedDatasetName(),
+ secondaryIndexes[i].getIndexName(),
IndexType.BTREE,
+ secondaryIndexes[i].getPartitioningExpr(),
secondaryIndexes[i].getPartitioningExprType(),
+ false, false, IMetadataEntity.PENDING_NO_OP));
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished inserting initial indexes.");
@@ -324,6 +325,7 @@
"org.apache.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory",
"org.apache.asterix.external.adapter.factory.RSSFeedAdapterFactory",
"org.apache.asterix.external.adapter.factory.CNNFeedAdapterFactory",
+
"org.apache.asterix.external.adapter.factory.LocalFileSystemFeedAdapterFactory",
"org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory",
"org.apache.asterix.tools.external.data.TwitterFirehoseFeedAdapterFactory",
"org.apache.asterix.tools.external.data.GenericSocketFeedAdapterFactory",
@@ -379,31 +381,25 @@
+ IndexFileNameUtil.prepareFileName(metadataStore +
File.separator + index.getFileNameRelativePath(),
runtimeContext.getMetaDataIODeviceId());
FileReference file = new FileReference(new File(filePath));
- List<IVirtualBufferCache> virtualBufferCaches =
runtimeContext.getVirtualBufferCaches(index.getDatasetId()
- .getId());
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
+ .getVirtualBufferCaches(index.getDatasetId().getId());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories =
index.getKeyBinaryComparatorFactory();
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
LSMBTree lsmBtree = null;
long resourceID = -1;
- ILSMOperationTracker opTracker = index.isPrimaryIndex() ?
runtimeContext.getLSMBTreeOperationTracker(index
- .getDatasetId().getId()) : new
BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
- index.getDatasetId().getId(), ((DatasetLifecycleManager)
indexLifecycleManager).getDatasetInfo(index
- .getDatasetId().getId()));
+ ILSMOperationTracker opTracker = index.isPrimaryIndex()
+ ?
runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
+ : new BaseOperationTracker((DatasetLifecycleManager)
indexLifecycleManager,
+ index.getDatasetId().getId(),
+ ((DatasetLifecycleManager)
indexLifecycleManager).getDatasetInfo(index.getDatasetId().getId()));
final String path = file.getFile().getPath();
if (create) {
- lsmBtree = LSMBTreeUtils.createLSMTree(
- virtualBufferCaches,
- file,
- bufferCache,
- fileMapProvider,
- typeTraits,
- comparatorFactories,
- bloomFilterKeyFields,
- runtimeContext.getBloomFilterFalsePositiveRate(),
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file,
bufferCache, fileMapProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
- GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES,
indexLifecycleManager), opTracker,
- runtimeContext.getLSMIOScheduler(),
+ GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES,
indexLifecycleManager),
+ opTracker, runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
index.isPrimaryIndex(),
null, null, null, null, true);
lsmBtree.create();
@@ -422,19 +418,14 @@
resourceID = resource.getResourceId();
lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
if (lsmBtree == null) {
- lsmBtree = LSMBTreeUtils.createLSMTree(
- virtualBufferCaches,
- file,
- bufferCache,
- fileMapProvider,
- typeTraits,
- comparatorFactories,
- bloomFilterKeyFields,
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches,
file, bufferCache, fileMapProvider,
+ typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
-
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager),
opTracker,
- runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(),
index.isPrimaryIndex(), null, null, null, null, true);
+
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager),
+ opTracker, runtimeContext.getLSMIOScheduler(),
+
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
index.isPrimaryIndex(),
+ null, null, null, null, true);
indexLifecycleManager.register(resourceID, lsmBtree);
}
}
diff --git
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
index c16e0e1..3931703 100644
---
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
+++
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
@@ -41,6 +41,9 @@
protected DataOutput dos = tb.getDataOutput();
protected final ARecordType recType;
protected final IHyracksTaskContext ctx;
+ protected IFrameWriter writer;
+ protected IDataParser parser;
+ protected ITupleForwardPolicy policy;
public AbstractTupleParser(IHyracksTaskContext ctx, ARecordType recType)
throws HyracksDataException {
this.recType = recType;
@@ -74,4 +77,32 @@
}
}
+ public void setInputStream(InputStream in) throws AsterixException,
IOException {
+ parser = getDataParser();
+ parser.initialize(in, recType, true);
+ }
+
+ public void setWriter(IFrameWriter writer) throws HyracksDataException {
+ this.writer = writer;
+ policy = getTupleParserPolicy();
+ policy.initialize(ctx, writer);
+ }
+
+ public boolean parseNext() throws AsterixException, IOException {
+ tb.reset();
+ if (!parser.parse(tb.getDataOutput())) {
+ return false;
+ }
+ tb.addFieldEndOffset();
+ policy.addTuple(tb);
+ return true;
+ }
+
+ public void end() throws HyracksDataException {
+ policy.close();
+ }
+
+ public ITupleForwardPolicy getPolicy() {
+ return policy;
+ }
}
diff --git
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
index 4374ab6..74c4769 100644
---
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
+++
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
@@ -65,6 +65,7 @@
public static final String AT_LEAST_ONE_SEMANTICS =
FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS;
public static final String NODE_RESOLVER_FACTORY_PROPERTY =
"node.Resolver";
public static final String DEFAULT_DELIMITER = ",";
+ public static final Object KEY_EXPRESSION = "expression";
private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap =
initializeValueParserFactoryMap();
diff --git
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
index af6d35e..cc9a08c 100644
---
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
+++
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
@@ -36,7 +36,7 @@
public static final String BATCH_INTERVAL = "batch-interval";
private static final Logger LOGGER =
Logger.getLogger(CounterTimerTupleForwardPolicy.class.getName());
-
+
private FrameTupleAppender appender;
private IFrame frame;
private IFrameWriter writer;
@@ -87,7 +87,8 @@
}
private void addTupleToFrame(ArrayTupleBuilder tb) throws
HyracksDataException {
- if (tuplesInFrame == batchSize ||
!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ if (tuplesInFrame == batchSize
+ || !appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize())) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("flushing frame containg (" + tuplesInFrame + ")
tuples");
}
@@ -101,16 +102,7 @@
}
public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- if (activeTimer) {
- synchronized (lock) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- } else {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- }
-
+ forceFlush();
if (timer != null) {
timer.cancel();
}
@@ -129,11 +121,11 @@
@Override
public void run() {
try {
- if (tuplesInFrame > 0) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("TTL expired flushing frame (" +
tuplesInFrame + ")");
- }
- synchronized (lock) {
+ synchronized (lock) {
+ if (tuplesInFrame > 0) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TTL expired flushing frame (" +
tuplesInFrame + ")");
+ }
FrameUtils.flushFrame(frame.getBuffer(), writer);
appender.reset(frame, true);
tuplesInFrame = 0;
@@ -151,4 +143,23 @@
return TupleForwardPolicyType.COUNTER_TIMER_EXPIRED;
}
+ @Override
+ public void forceFlush() throws HyracksDataException {
+ if (activeTimer) {
+ synchronized (lock) {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame.getBuffer(), writer);
+ tuplesInFrame = 0;
+ appender.reset(frame, true);
+ }
+ }
+ } else {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame.getBuffer(), writer);
+ tuplesInFrame = 0;
+ appender.reset(frame, true);
+ }
+ }
+ }
+
}
\ No newline at end of file
diff --git
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
index 7c43e26..079537d 100644
---
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
+++
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
@@ -28,45 +28,48 @@
public class FrameFullTupleForwardPolicy implements ITupleForwardPolicy {
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
+ private FrameTupleAppender appender;
+ private IFrame frame;
+ private IFrameWriter writer;
- public void configure(Map<String, String> configuration) {
- // no-op
- }
+ public void configure(Map<String, String> configuration) {
+ // no-op
+ }
- public void initialize(IHyracksTaskContext ctx, IFrameWriter writer)
- throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- this.writer = writer;
- appender.reset(frame, true);
- }
+ public void initialize(IHyracksTaskContext ctx, IFrameWriter writer)
throws HyracksDataException {
+ this.appender = new FrameTupleAppender();
+ this.frame = new VSizeFrame(ctx);
+ this.writer = writer;
+ appender.reset(frame, true);
+ }
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- boolean success = appender.append(tb.getFieldEndOffsets(),
- tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- success = appender.append(tb.getFieldEndOffsets(),
- tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- throw new IllegalStateException();
- }
- }
- }
+ public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+ boolean success = appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize());
+ if (!success) {
+ FrameUtils.flushFrame(frame.getBuffer(), writer);
+ appender.reset(frame, true);
+ success = appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize());
+ if (!success) {
+ throw new IllegalStateException();
+ }
+ }
+ }
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame.getBuffer(), writer);
+ }
- }
+ }
- @Override
- public TupleForwardPolicyType getType() {
- return TupleForwardPolicyType.FRAME_FULL;
- }
+ @Override
+ public TupleForwardPolicyType getType() {
+ return TupleForwardPolicyType.FRAME_FULL;
+ }
+
+ @Override
+ public void forceFlush() throws HyracksDataException {
+ FrameUtils.flushFrame(frame.getBuffer(), writer);
+ appender.reset(frame, true);
+ }
}
\ No newline at end of file
diff --git
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
index 95e6b22..463e0cd 100644
---
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
+++
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
@@ -81,4 +81,10 @@
public TupleForwardPolicyType getType() {
return TupleForwardPolicyType.RATE_CONTROLLED;
}
+
+ @Override
+ public void forceFlush() throws HyracksDataException {
+ FrameUtils.flushFrame(frame.getBuffer(), writer);
+ appender.reset(frame, true);
+ }
}
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/376
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I707756e3b4c9ffca4b55ec9817a08e5c16333010
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>