abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/621
Change subject: Support Change Feeds
......................................................................
Support Change Feeds
This change allows feeds to perform upserts and deletes
in order to replicate an external data source.
It does so as follows:
1. The adapter produces [PK][Record]. (Record == null --> delete)
2. The insert is replaced by an upsert operator.
Change-Id: If136a03d424970132dfb09f0dda56e160d4c0078
---
M
asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
M
asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
M
asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
M
asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M
asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/test/resources/runtimets/testsuite.xml
M
asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M
asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
M
asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
M
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
M
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
24 files changed, 353 insertions(+), 146 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/21/621/1
diff --git
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
index f8df183..b9ed260 100644
---
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -90,15 +90,24 @@
if (insertDeleteUpsertOperator.getOperation() ==
Kind.UPSERT) {
//we need to add a function that checks if previous
record was found
upsertVar = context.newVar();
- //introduce casting to enforced type
- AbstractFunctionCallExpression isNullFunc = new
ScalarFunctionCallExpression(
+ AbstractFunctionCallExpression orFunc = new
ScalarFunctionCallExpression(
+
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
+
+ AbstractFunctionCallExpression isNewNullFunc = new
ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL));
- // The first argument is the record
- isNullFunc.getArguments().add(new
MutableObject<ILogicalExpression>(
+
isNewNullFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
+
+ AbstractFunctionCallExpression isPrevNullFunc = new
ScalarFunctionCallExpression(
+
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL));
+ // argument is the previous record
+ isPrevNullFunc.getArguments().add(new
MutableObject<ILogicalExpression>(
new
VariableReferenceExpression(insertDeleteUpsertOperator.getPrevRecordVar())));
+
+ orFunc.getArguments().add(new
MutableObject<ILogicalExpression>(isPrevNullFunc));
+ orFunc.getArguments().add(new
MutableObject<ILogicalExpression>(isNewNullFunc));
+
// AssignOperator puts in the cast var the casted
record
- upsertFlagAssign = new AssignOperator(upsertVar,
- new
MutableObject<ILogicalExpression>(isNullFunc));
+ upsertFlagAssign = new AssignOperator(upsertVar, new
MutableObject<ILogicalExpression>(orFunc));
// Connect the current top of the plan to the cast
operator
upsertFlagAssign.getInputs()
.add(new
MutableObject<ILogicalOperator>(sinkOperator.getInputs().get(0).getValue()));
diff --git
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 1945be3..2f6226b 100644
---
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -63,7 +63,8 @@
public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef,
IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
+ throws AlgebricksException {
return false;
}
@@ -107,8 +108,8 @@
String datasetName = datasetReference.second;
Dataset dataset = metadataProvider.findDataset(dataverseName,
datasetName);
if (dataset == null) {
- throw new AlgebricksException("Could not find dataset " +
datasetName + " in dataverse "
- + dataverseName);
+ throw new AlgebricksException(
+ "Could not find dataset " + datasetName + " in
dataverse " + dataverseName);
}
AqlSourceId asid = new AqlSourceId(dataverseName, datasetName);
@@ -173,8 +174,8 @@
scanInpList.addAll(unnest.getInputs());
opRef.setValue(scan);
addPrimaryKey(v, context);
+ scan.setKeyVars(unnest.getAdditionalVariables(),
unnest.getAdditionalVariableTypes());
context.computeAndSetTypeEnvironmentForOperator(scan);
-
return true;
}
@@ -195,9 +196,8 @@
private AqlDataSource createFeedDataSource(AqlSourceId aqlId, String
targetDataset, String sourceFeedName,
String subscriptionLocation, AqlMetadataProvider metadataProvider,
FeedPolicyEntity feedPolicy,
String outputType, String locations) throws AlgebricksException {
- if (!aqlId.getDataverseName().equals(
- metadataProvider.getDefaultDataverse() == null ? null :
metadataProvider.getDefaultDataverse()
- .getDataverseName())) {
+ if
(!aqlId.getDataverseName().equals(metadataProvider.getDefaultDataverse() ==
null ? null
+ : metadataProvider.getDefaultDataverse().getDataverseName())) {
return null;
}
IAType feedOutputType =
metadataProvider.findType(aqlId.getDataverseName(), outputType);
diff --git
a/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
b/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 5dc9f18..edacbf4 100644
---
a/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++
b/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -23,10 +23,13 @@
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement.Kind;
import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -398,11 +401,14 @@
private final FeedConnectionRequest request;
private Query query;
private final int varCounter;
+ private SubscribeFeedStatement stmt;
- public CompiledSubscribeFeedStatement(FeedConnectionRequest request,
Query query, int varCounter) {
+ public CompiledSubscribeFeedStatement(FeedConnectionRequest request,
Query query, int varCounter,
+ SubscribeFeedStatement stmt) {
this.request = request;
this.query = query;
this.varCounter = varCounter;
+ this.stmt = stmt;
}
@Override
@@ -432,6 +438,10 @@
return Kind.SUBSCRIBE_FEED;
}
+ public boolean isChangeFeed(MetadataTransactionContext mdTxnCtx)
throws MetadataException, AlgebricksException {
+ return stmt.isChangeFeed(mdTxnCtx);
+ }
+
}
public static class CompiledDisconnectFeedStatement implements
ICompiledDmlStatement {
diff --git
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index a1a6d4b..ec9230d 100644
---
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -85,6 +85,7 @@
import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.formats.FormatUtils;
import
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import
org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -312,6 +313,7 @@
for (List<String> keyFieldName : partitionKeys) {
prepareVarAndExpression(keyFieldName, resVar, vars, exprs,
varRefsForLoading);
}
+ AssignOperator assign = new AssignOperator(vars, exprs);
List<String> additionalFilteringField =
DatasetUtils.getFilterField(targetDatasource.getDataset());
List<LogicalVariable> additionalFilteringVars = null;
@@ -328,11 +330,6 @@
additionalFilteringAssign = new
AssignOperator(additionalFilteringVars,
additionalFilteringAssignExpressions);
- }
-
- AssignOperator assign = new AssignOperator(vars, exprs);
-
- if (additionalFilteringAssign != null) {
additionalFilteringAssign.getInputs().add(new
MutableObject<ILogicalOperator>(project));
assign.getInputs().add(new
MutableObject<ILogicalOperator>(additionalFilteringAssign));
} else {
@@ -389,11 +386,41 @@
break;
}
case SUBSCRIBE_FEED: {
- ILogicalOperator insertOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef,
- varRefsForLoading,
InsertDeleteUpsertOperator.Kind.INSERT, false);
- insertOp.getInputs().add(new
MutableObject<ILogicalOperator>(assign));
+ // if the feed is a change feed (i.e, performs different
operations), we need to project op variable
+ CompiledSubscribeFeedStatement sfs =
(CompiledSubscribeFeedStatement) stmt;
+ InsertDeleteUpsertOperator feedModificationOp;
+ if
(sfs.isChangeFeed(metadataProvider.getMetadataTxnContext())) {
+ ARecordType recordType = (ARecordType)
targetDatasource.getItemType();
+ // Get the UnnestOperator
+ UnnestOperator unnest = (UnnestOperator)
assignCollectionToSequence.getInputs().get(0)
+ .getValue();
+ // add key variables
+ unnest.setAdditionalVariables(new ArrayList<>());
+
unnest.getAdditionalVariables().addAll(assign.getVariables());
+ List<IAType> types = new
ArrayList<IAType>(partitionKeys.size());
+ recordType.getFieldTypes(partitionKeys, types);
+ unnest.setAdditionalVariableTypes(types);
+
project.getVariables().addAll(unnest.getAdditionalVariables());
+ // Create an expression for the operation variable
+ feedModificationOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+ InsertDeleteUpsertOperator.Kind.UPSERT, false);
+ // Create and add a new variable used for representing
the original record
+ feedModificationOp.setPrevRecordVar(context.newVar());
+ feedModificationOp.setPrevRecordType(recordType);
+ if (additionalFilteringField != null) {
+
feedModificationOp.setPrevFilterVar(context.newVar());
+ feedModificationOp
+
.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+ }
+
feedModificationOp.getInputs().add(assign.getInputs().get(0));
+ } else {
+ feedModificationOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+ InsertDeleteUpsertOperator.Kind.INSERT, false);
+ feedModificationOp.getInputs().add(new
MutableObject<ILogicalOperator>(assign));
+ }
+
feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new
MutableObject<ILogicalOperator>(insertOp));
+ leafOperator.getInputs().add(new
MutableObject<ILogicalOperator>(feedModificationOp));
break;
}
default:
diff --git
a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 46ea72b..0709d27 100644
---
a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++
b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -2376,7 +2376,7 @@
bfs.initialize(metadataProvider.getMetadataTxnContext());
CompiledSubscribeFeedStatement csfs = new
CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(),
- bfs.getQuery(), bfs.getVarCounter());
+ bfs.getQuery(), bfs.getVarCounter(), bfs);
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" +
Boolean.TRUE);
metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME,
"" + bfs.getPolicy());
metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml
b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 9b03eaf..6075f67 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -28,6 +28,58 @@
ResultOffsetPath="results"
QueryOffsetPath="queries"
QueryFileExtension=".aql">
+ <test-group name="upsert">
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-secondary-rtree">
+ <output-dir compare="Text">primary-secondary-rtree</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="upsert-with-self-read">
+ <output-dir compare="Text">upsert-with-self-read</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="filtered-dataset">
+ <output-dir compare="Text">filtered-dataset</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="nullable-index">
+ <output-dir compare="Text">nullable-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="nested-index">
+ <output-dir compare="Text">nested-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="open-index">
+ <output-dir compare="Text">open-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-index">
+ <output-dir compare="Text">primary-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-secondary-btree">
+ <output-dir compare="Text">primary-secondary-btree</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-secondary-inverted">
+ <output-dir
compare="Text">primary-secondary-inverted</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="multiple-secondaries">
+ <output-dir compare="Text">multiple-secondaries</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
<test-group name="feeds">
<test-case FilePath="feeds">
<compilation-unit name="feeds_01">
@@ -1239,58 +1291,6 @@
</compilation-unit>
</test-case>
-->
- </test-group>
- <test-group name="upsert">
- <test-case FilePath="upsert">
- <compilation-unit name="primary-secondary-rtree">
- <output-dir compare="Text">primary-secondary-rtree</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="upsert-with-self-read">
- <output-dir compare="Text">upsert-with-self-read</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="filtered-dataset">
- <output-dir compare="Text">filtered-dataset</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="nullable-index">
- <output-dir compare="Text">nullable-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="nested-index">
- <output-dir compare="Text">nested-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="open-index">
- <output-dir compare="Text">open-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="primary-index">
- <output-dir compare="Text">primary-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="primary-secondary-btree">
- <output-dir compare="Text">primary-secondary-btree</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="primary-secondary-inverted">
- <output-dir
compare="Text">primary-secondary-inverted</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="multiple-secondaries">
- <output-dir compare="Text">multiple-secondaries</output-dir>
- </compilation-unit>
- </test-case>
</test-group>
<test-group name="dml">
<test-case FilePath="dml">
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index e5a3473..275212c 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -129,7 +129,6 @@
} catch (ACIDException e) {
throw new HyracksDataException("could not write flush log", e);
}
-
flushLogCreated = true;
flushOnExit = false;
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 3cb8f37..e6dfa43 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -21,6 +21,9 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
public interface IRecordDataParser<T> extends IDataParser {
/**
@@ -34,4 +37,8 @@
* @return the record class
*/
public Class<? extends T> getRecordClass();
+
+ public default void appendKeys(IRawRecord<? extends T> record,
ArrayTupleBuilder tb) throws IOException {
+ throw new HyracksDataException("Unsupported operation");
+ }
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 461eaf9..09ea26f 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -31,8 +31,8 @@
protected FeedTupleForwarder tupleForwarder;
protected IHyracksTaskContext ctx;
protected Map<String, String> configuration;
- protected static final int NUMBER_OF_TUPLE_FIELDS = 1;
- protected ArrayTupleBuilder tb = new
ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
+ protected int numOfFields = 1;
+ protected ArrayTupleBuilder tb;
@Override
public ITupleForwarder getTupleForwarder() {
@@ -52,6 +52,8 @@
public void configure(Map<String, String> configuration,
IHyracksTaskContext ctx) {
this.configuration = configuration;
this.ctx = ctx;
+ this.numOfFields = 1;
+ this.tb = new ArrayTupleBuilder(numOfFields);
}
@Override
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
new file mode 100644
index 0000000..e70b208
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class ChangeFeedDataFlowController<T> extends
FeedRecordDataFlowController<T> {
+
+ @Override
+ protected void parse(IRawRecord<? extends T> record) throws IOException {
+ tb.reset();
+ dataParser.parse(record, tb.getDataOutput());
+ tb.addFieldEndOffset();
+ dataParser.appendKeys(record, tb);
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration,
IHyracksTaskContext ctx) {
+ this.configuration = configuration;
+ this.ctx = ctx;
+ // 1 for the record + 1 for each key field
+ this.numOfFields = 1 +
ExternalDataUtils.getNumberOfKeys(configuration);
+ this.tb = new ArrayTupleBuilder(numOfFields);
+ }
+}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2a4eaf9..45427bd 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.dataflow;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.external.api.IRawRecord;
@@ -47,9 +48,7 @@
Thread.sleep(interval);
continue;
}
- tb.reset();
- dataParser.parse(record, tb.getDataOutput());
- tb.addFieldEndOffset();
+ parse(record);
tupleForwarder.addTuple(tb);
}
} catch (Throwable th) {
@@ -113,4 +112,7 @@
this.recordReader = recordReader;
recordReader.setController(this);
}
+
+ protected void parse(IRawRecord<? extends T> record) throws IOException {
+ }
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
index d5640a6..e46722e 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
@@ -39,6 +39,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
public class RecordWithMetadata<T> {
@@ -48,6 +49,7 @@
private IValueParserFactory[] valueParserFactories;
private byte[] fieldTypeTags;
private IRawRecord<T> record;
+ private int[] keyIdx;
// Serializers
@SuppressWarnings("unchecked")
@@ -135,4 +137,17 @@
mutableDouble.setValue(value);
IDataParser.toBytes(mutableDouble, fieldValueBuffers[index],
doubleSerde);
}
+
+ public void setKeyIdx(int[] keyIdx) {
+ this.keyIdx = keyIdx;
+ }
+
+ public void writeKey(ArrayTupleBuilder tb) throws IOException {
+ DataOutput out = tb.getDataOutput();
+ for (int idx : keyIdx) {
+ //out.writeByte(fieldTypeTags[idx]);
+ out.write(fieldValueBuffers[idx].getByteArray(), 0,
fieldValueBuffers[idx].getLength());
+ tb.addFieldEndOffset();
+ }
+ }
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
index 895af1b..1a83cc9 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
@@ -61,6 +61,7 @@
private static final MutationMessage POISON_PILL = new
MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
null);
+ private static final int[] IDX_KEY = { 0 };
private final String feedName;
private final short[] vbuckets;
private final String bucket;
@@ -72,7 +73,7 @@
private CouchbaseCore core;
private DefaultCoreEnvironment env;
private Thread pushThread;
- private ArrayBlockingQueue<MutationMessage> messages;
+ private ArrayBlockingQueue<DCPRequest> messages;
private GenericRecord<RecordWithMetadata<char[]>> record;
private RecordWithMetadata<char[]> recordWithMetadata;
private boolean done = false;
@@ -94,9 +95,10 @@
this.couchbaseNodes = couchbaseNodes;
this.vbuckets = vbuckets;
this.recordWithMetadata = new RecordWithMetadata<char[]>(metaTypes,
char[].class);
- this.messages = new ArrayBlockingQueue<MutationMessage>(queueSize);
+ this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
this.value = new CharArrayRecord();
- recordWithMetadata.setRecord(value);
+ this.recordWithMetadata.setRecord(value);
+ this.recordWithMetadata.setKeyIdx(IDX_KEY);
this.record = new
GenericRecord<RecordWithMetadata<char[]>>(recordWithMetadata);
}
@@ -155,12 +157,8 @@
state.put(new
BucketStreamState(message.partition(), oldState.vbucketUUID(),
message.endSequenceNumber(),
oldState.endSequenceNumber(),
message.endSequenceNumber(),
oldState.snapshotEndSequenceNumber()));
- } else if (dcpRequest instanceof MutationMessage) {
-
- messages.put((MutationMessage) dcpRequest);
- } else if (dcpRequest instanceof RemoveMessage) {
- RemoveMessage message = (RemoveMessage) dcpRequest;
- LOGGER.info(message.key() + " was deleted.");
+ } else if (dcpRequest instanceof MutationMessage ||
dcpRequest instanceof RemoveMessage) {
+ messages.put(dcpRequest);
}
} catch (Throwable th) {
LOGGER.error(th);
@@ -189,32 +187,39 @@
if (messages.isEmpty()) {
controller.flush();
}
- MutationMessage message = messages.take();
- if (message == POISON_PILL) {
+ DCPRequest dcpReq = messages.take();
+ if (dcpReq == POISON_PILL) {
return null;
+ } else if (dcpReq instanceof RemoveMessage) {
+ RemoveMessage message = (RemoveMessage) dcpReq;
+ String key = message.key();
+ recordWithMetadata.reset();
+ recordWithMetadata.setMetadata(0, key);
+ } else {
+ MutationMessage message = (MutationMessage) dcpReq;
+ String key = message.key();
+ int vbucket = message.partition();
+ long seq = message.bySequenceNumber();
+ String bucket = message.bucket();
+ long cas = message.cas();
+ long creationTime = message.creationTime();
+ int expiration = message.expiration();
+ int flags = message.flags();
+ long revSeqNumber = message.revisionSequenceNumber();
+ int lockTime = message.lockTime();
+ recordWithMetadata.reset();
+ recordWithMetadata.setMetadata(0, key);
+ recordWithMetadata.setMetadata(1, bucket);
+ recordWithMetadata.setMetadata(2, vbucket);
+ recordWithMetadata.setMetadata(3, seq);
+ recordWithMetadata.setMetadata(4, cas);
+ recordWithMetadata.setMetadata(5, creationTime);
+ recordWithMetadata.setMetadata(6, expiration);
+ recordWithMetadata.setMetadata(7, flags);
+ recordWithMetadata.setMetadata(8, revSeqNumber);
+ recordWithMetadata.setMetadata(9, lockTime);
+ CouchbaseReader.set(message.content(), decoder, bytes, chars,
value);
}
- String key = message.key();
- int vbucket = message.partition();
- long seq = message.bySequenceNumber();
- String bucket = message.bucket();
- long cas = message.cas();
- long creationTime = message.creationTime();
- int expiration = message.expiration();
- int flags = message.flags();
- long revSeqNumber = message.revisionSequenceNumber();
- int lockTime = message.lockTime();
- recordWithMetadata.reset();
- recordWithMetadata.setMetadata(0, key);
- recordWithMetadata.setMetadata(1, bucket);
- recordWithMetadata.setMetadata(2, vbucket);
- recordWithMetadata.setMetadata(3, seq);
- recordWithMetadata.setMetadata(4, cas);
- recordWithMetadata.setMetadata(5, creationTime);
- recordWithMetadata.setMetadata(6, expiration);
- recordWithMetadata.setMetadata(7, flags);
- recordWithMetadata.setMetadata(8, revSeqNumber);
- recordWithMetadata.setMetadata(9, lockTime);
- CouchbaseReader.set(message.content(), decoder, bytes, chars, value);
return record;
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
index b9b6f65..6cbcbba 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -88,6 +88,8 @@
password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
}
this.configuration = configuration;
+ ExternalDataUtils.setNumberOfKeys(configuration, 1);
+ ExternalDataUtils.setChangeFeed(configuration,
ExternalDataConstants.TRUE);
bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
couchbaseNodes =
configuration.get(ExternalDataConstants.KEY_NODES).split(",");
feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index a929eec..1524219 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -23,11 +23,11 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import
org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import
org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.runtime.IngestionRuntime;
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index 67d84b5..c834c64 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -32,10 +32,12 @@
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class RecordWithMetadataParser<T> implements
IRecordDataParser<RecordWithMetadata<T>> {
@@ -95,17 +97,29 @@
valueBuffer.reset();
recBuilder.init();
RecordWithMetadata<T> rwm = record.get();
- for (int i = 0; i < numberOfFields; i++) {
- if (i == valueIndex) {
- valueParser.parse(rwm.getRecord(),
valueBuffer.getDataOutput());
- recBuilder.addField(i, valueBuffer);
- } else {
- recBuilder.addField(i, rwm.getMetadata(metaIndexes[i]));
+ if (rwm.getRecord().size() == 0) {
+ // null record, delete message
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ } else {
+ for (int i = 0; i < numberOfFields; i++) {
+ if (i == valueIndex) {
+ valueParser.parse(rwm.getRecord(),
valueBuffer.getDataOutput());
+ recBuilder.addField(i, valueBuffer);
+ } else {
+ recBuilder.addField(i,
rwm.getMetadata(metaIndexes[i]));
+ }
}
+ recBuilder.write(out, true);
}
- recBuilder.write(out, true);
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public void appendKeys(IRawRecord<? extends RecordWithMetadata<T>> record,
ArrayTupleBuilder tb)
+ throws IOException {
+ RecordWithMetadata<T> rwm = record.get();
+ rwm.writeKey(tb);
+ }
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index dfe7aed..8fc4ce6 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -34,6 +34,7 @@
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.api.IStreamDataParserFactory;
import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.dataflow.ChangeFeedDataFlowController;
import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
import org.apache.asterix.external.dataflow.IndexingDataFlowController;
@@ -69,7 +70,11 @@
if (indexingOp) {
recordDataFlowController = new
IndexingDataFlowController();
} else if (ExternalDataUtils.isFeed(configuration)) {
- recordDataFlowController = new
FeedRecordDataFlowController();
+ if (ExternalDataUtils.isChangeFeed(configuration)) {
+ recordDataFlowController = new
ChangeFeedDataFlowController();
+ } else {
+ recordDataFlowController = new
FeedRecordDataFlowController();
+ }
} else {
recordDataFlowController = new RecordDataFlowController();
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 3d7f60b..32a8bf0 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -91,6 +91,10 @@
public static final String KEY_VALUE_INDEX = "value-index";
// a string representing the format of the raw record in the value field
in the data type
public static final String KEY_VALUE_FORMAT = "value-format";
+ // a boolean indicating whether the feed is a change feed
+ public static final String KEY_IS_CHANGE_FEED = "change-feed";
+ // an integer representing the number of keys in a change feed
+ public static final String KEY_KEY_SIZE = "key-size";
/**
* HDFS class names
*/
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7c03e4d..a9d91f3 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -264,4 +264,20 @@
?
Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE))
: ExternalDataConstants.DEFAULT_QUEUE_SIZE;
}
+
+ public static boolean isChangeFeed(Map<String, String> configuration) {
+ return
Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
+ }
+
+ public static int getNumberOfKeys(Map<String, String> configuration) {
+ return
Integer.parseInt(configuration.get(ExternalDataConstants.KEY_KEY_SIZE));
+ }
+
+ public static void setNumberOfKeys(Map<String, String> configuration, int
value) {
+ configuration.put(ExternalDataConstants.KEY_KEY_SIZE,
String.valueOf(value));
+ }
+
+ public static void setChangeFeed(Map<String, String> configuration, String
booleanString) {
+ configuration.put(ExternalDataConstants.KEY_IS_CHANGE_FEED,
booleanString);
+ }
}
diff --git
a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 3f85ba9..2689cc4 100644
---
a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++
b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -31,6 +31,7 @@
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.IParserFactory;
@@ -178,6 +179,14 @@
return connectionRequest.getReceivingFeedId().getDataverse();
}
+ public boolean isChangeFeed(MetadataTransactionContext mdTxnCtx) throws
MetadataException, AlgebricksException {
+ FeedId feedId = connectionRequest.getReceivingFeedId();
+ Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
feedId.getDataverse(), feedId.getFeedName());
+ FeedPolicyAccessor policyAccessor = new
FeedPolicyAccessor(connectionRequest.getPolicyParameters());
+ FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed, policyAccessor,
mdTxnCtx);
+ return ExternalDataUtils.isChangeFeed(feed.getAdapterConfiguration());
+ }
+
private String getOutputType(MetadataTransactionContext mdTxnCtx) throws
MetadataException {
String outputType = null;
FeedId feedId = connectionRequest.getReceivingFeedId();
diff --git
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index f3523da..0704be1 100644
---
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -70,6 +70,7 @@
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import
org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
@@ -330,11 +331,12 @@
IDataSource<AqlSourceId> dataSource, List<LogicalVariable>
scanVariables,
List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars,
List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv,
- JobGenContext context, JobSpecification jobSpec, Object
implConfig) throws AlgebricksException {
+ JobGenContext context, JobSpecification jobSpec, Object
implConfig, List<LogicalVariable> keyVars)
+ throws AlgebricksException {
try {
switch (((AqlDataSource) dataSource).getDatasourceType()) {
case FEED:
- return buildFeedCollectRuntime(jobSpec, dataSource);
+ return buildFeedCollectRuntime(jobSpec, dataSource,
typeEnv, keyVars);
case INTERNAL_DATASET: {
// querying an internal dataset
return buildInternalDatasetScan(jobSpec, scanVariables,
minFilterVars, maxFilterVars, opSchema,
@@ -380,7 +382,8 @@
@SuppressWarnings("rawtypes")
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildFeedCollectRuntime(JobSpecification jobSpec,
- IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+ IDataSource<AqlSourceId> dataSource, IVariableTypeEnvironment
typeEnv, List<LogicalVariable> keyVars)
+ throws AlgebricksException {
FeedDataSource feedDataSource = (FeedDataSource) dataSource;
FeedCollectOperatorDescriptor feedCollector = null;
@@ -389,7 +392,20 @@
ARecordType feedOutputType = (ARecordType)
feedDataSource.getItemType();
ISerializerDeserializer payloadSerde =
NonTaggedDataFormat.INSTANCE.getSerdeProvider()
.getSerializerDeserializer(feedOutputType);
- RecordDescriptor feedDesc = new RecordDescriptor(new
ISerializerDeserializer[] { payloadSerde });
+ RecordDescriptor feedDesc;
+ if (keyVars != null) {
+ ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
+ serdes.add(payloadSerde);
+ for (LogicalVariable var : keyVars) {
+ IAType type = (IAType) typeEnv.getVarType(var);
+ ISerializerDeserializer serde =
AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(type);
+ serdes.add(serde);
+ }
+ feedDesc = new RecordDescriptor(serdes.toArray(new
ISerializerDeserializer[serdes.size()]));
+ } else {
+ feedDesc = new RecordDescriptor(new ISerializerDeserializer[]
{ payloadSerde });
+ }
FeedPolicyEntity feedPolicy = (FeedPolicyEntity) ((AqlDataSource)
dataSource).getProperties()
.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
@@ -402,10 +418,8 @@
feedCollector = new FeedCollectOperatorDescriptor(jobSpec,
feedConnectionId,
feedDataSource.getSourceFeedId(), feedOutputType,
feedDesc, feedPolicy.getProperties(),
feedDataSource.getLocation());
-
return new Pair<IOperatorDescriptor,
AlgebricksPartitionConstraint>(feedCollector,
determineLocationConstraint(feedDataSource));
-
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -2260,7 +2274,6 @@
}
boolean temp = dataset.getDatasetDetails().isTemp();
isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
int numKeys = primaryKeys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0
: 1;
// Move key fields to front. {keys, record, filters}
diff --git
a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index c2eae36..7600925 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -31,6 +31,7 @@
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.om.visitors.IOMVisitor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -177,6 +178,7 @@
return subRecordType;
}
+ // Note: this method doesn't work for nested fields
/**
* Returns the field type of the field name if it exists, otherwise null.
*
@@ -299,4 +301,14 @@
return NonTaggedFormatUtil.hasNullableField(rt) ? (int)
Math.ceil(rt.getFieldNames().length / 8.0) : 0;
}
+ public void getFieldTypes(List<List<String>> fields, List<IAType>
emptyList) throws AlgebricksException {
+ for (List<String> field : fields) {
+ try {
+ emptyList.add(getSubFieldType(field));
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
+
}
diff --git
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index f35f4d6..ce2ae26 100644
---
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.INullWriter;
@@ -110,7 +111,6 @@
writer.open();
indexHelper.open();
lsmIndex = (LSMBTree) indexHelper.getIndexInstance();
-
try {
nullTupleBuilder = new ArrayTupleBuilder(1);
DataOutput out = nullTupleBuilder.getDataOutput();
@@ -144,14 +144,15 @@
key.reset(accessor, tupleIndex);
}
- protected void writeOutput(int tupleIndex) throws Exception {
+ protected void writeOutput(int tupleIndex, boolean insert) throws
Exception {
+ boolean delete = prevTuple != null;
tb.reset();
frameTuple.reset(accessor, tupleIndex);
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
frameTuple.getFieldLength(i));
tb.addFieldEndOffset();
}
- if (prevTuple != null) {
+ if (delete) {
dos.write(prevTuple.getFieldData(numOfPrimaryKeys),
prevTuple.getFieldStart(numOfPrimaryKeys),
prevTuple.getFieldLength(numOfPrimaryKeys));
tb.addFieldEndOffset();
@@ -168,12 +169,18 @@
addNullField();
}
}
- FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize());
+ if (insert || delete) {
+ FrameUtils.appendToWriter(writer, appender,
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+ }
}
private void addNullField() throws IOException {
dos.write(nullTupleBuilder.getByteArray());
tb.addFieldEndOffset();
+ }
+
+ public static boolean isNull(PermutingFrameTupleReference t1, int field) {
+ return t1.getFieldData(0)[t1.getFieldStart(field)] ==
ATypeTag.SERIALIZED_NULL_TYPE_TAG;
}
//TODO: use tryDelete/tryInsert in order to prevent deadlocks
@@ -182,12 +189,11 @@
accessor.reset(buffer);
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor)
indexAccessor;
int tupleCount = accessor.getTupleCount();
-
try {
for (int i = 0; i < tupleCount; i++) {
+ boolean insert = false;
tuple.reset(accessor, i);
resetSearchPredicate(i);
- cursor.reset();
lsmAccessor.search(cursor, searchPred);
if (cursor.hasNext()) {
cursor.next();
@@ -203,15 +209,19 @@
lsmAccessor.forceDelete(prevTuple);
}
} else {
+ cursor.reset();
prevTuple = null;
}
modCallback.setOp(Operation.INSERT);
- if (prevTuple == null && i == 0) {
- lsmAccessor.insert(tuple);
- } else {
- lsmAccessor.forceInsert(tuple);
+ if (!isNull(tuple, numOfPrimaryKeys)) {
+ insert = true;
+ if (prevTuple == null && i == 0) {
+ lsmAccessor.insert(tuple);
+ } else {
+ lsmAccessor.forceInsert(tuple);
+ }
}
- writeOutput(i);
+ writeOutput(i, insert);
}
if (tupleCount > 0) {
// All tuples has to move forward to maintain the correctness
of the transaction pipeline
@@ -272,4 +282,4 @@
public void flush() throws HyracksDataException {
writer.flush();
}
-}
+}
\ No newline at end of file
diff --git
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
index 65dc83f..40bd21d 100644
---
a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
+++
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
@@ -88,7 +88,7 @@
return true;
}
- private boolean isNull(PermutingFrameTupleReference t1) {
+ public static boolean isNull(PermutingFrameTupleReference t1) {
return t1.getFieldData(0)[t1.getFieldStart(0)] ==
ATypeTag.SERIALIZED_NULL_TYPE_TAG;
}
@@ -122,7 +122,6 @@
modCallback.setOp(Operation.INSERT);
lsmAccessor.forceInsert(tuple);
}
-
} catch (HyracksDataException e) {
throw e;
} catch (Exception e) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/621
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If136a03d424970132dfb09f0dda56e160d4c0078
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>