Xikui Wang has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1648
Change subject: Fix record loss for certain feed types ...................................................................... Fix record loss for certain feed types 1. Stop blindly replacing the connector between FeedCollector and AssignOperator. 2. Wrap AssignOperator into the FeedMetaOperator to make sure the operators inside (udf, accessor, etc.) can handle messages in the feed workflow. 3. Revise feed connection job merge function. 4. Test case fix. Change-Id: I64806edd5e687ea6722d0f2c7802d023aaaa3e21 --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java M asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm M asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java M asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm 7 files changed, 33 insertions(+), 27 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/48/1648/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java index 1a0ecd9..dfb73ee 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java +++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java @@ -61,7 +61,7 @@ final FeedDataSource feedDataSource = (FeedDataSource) dataSource; FeedConnection feedConnection = feedDataSource.getFeedConnection(); - if (feedConnection.getAppliedFunctions() == null) { + if (feedConnection.getAppliedFunctions() == null || feedConnection.getAppliedFunctions().size() == 0) { return false; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index 4ea524a..7e54395 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -80,6 +80,7 @@ import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.constraints.Constraint; @@ -207,12 +208,15 @@ Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>(); Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>(); List<JobId> jobIds = new ArrayList<>(); + FeedMetaOperatorDescriptor metaOp; for (int iter1 = 0; iter1 < jobsList.size(); iter1++) { FeedConnection curFeedConnection = feedConnections.get(iter1); JobSpecification subJob = jobsList.get(iter1); operatorIdMapping.clear(); Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap(); + FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(), + 
feedConnections.get(iter1).getDatasetName()); FeedPolicyEntity feedPolicyEntity = FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(), @@ -221,26 +225,41 @@ for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet()) { IOperatorDescriptor opDesc = entry.getValue(); OperatorDescriptorId oldId = opDesc.getOperatorId(); - OperatorDescriptorId opId; + OperatorDescriptorId opId = null; if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) { String operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName(); - FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(jobSpec, - new FeedConnectionId(ingestionOp.getEntityId(), - feedConnections.get(iter1).getDatasetName()), - opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, false, operandId); + metaOp = new FeedMetaOperatorDescriptor(jobSpec, + feedConnectionId, opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, + operandId); opId = metaOp.getOperatorId(); opDesc.setOperatorId(opId); } else { if (opDesc instanceof AlgebricksMetaOperatorDescriptor) { AlgebricksMetaOperatorDescriptor algOp = (AlgebricksMetaOperatorDescriptor) opDesc; - for (IPushRuntimeFactory runtimeFactory : algOp.getPipeline().getRuntimeFactories()) { + IPushRuntimeFactory[] runtimeFactories = algOp.getPipeline().getRuntimeFactories(); + for (IPushRuntimeFactory runtimeFactory : runtimeFactories) { if (runtimeFactory instanceof StreamSelectRuntimeFactory) { ((StreamSelectRuntimeFactory) runtimeFactory).retainMissing(true, 0); } } + // Tweak AssignOp to work with messages + if (runtimeFactories[0] instanceof AssignRuntimeFactory && runtimeFactories.length > 1) { + IConnectorDescriptor connectorDesc = subJob.getOperatorInputMap() + .get(opDesc.getOperatorId()).get(0); + // anything on the network interface needs to be message compatible + if (connectorDesc instanceof 
MToNPartitioningConnectorDescriptor) { + metaOp = new FeedMetaOperatorDescriptor(jobSpec, + feedConnectionId, opDesc, feedPolicyEntity.getProperties(), + FeedRuntimeType.COMPUTE, null); + opId = metaOp.getOperatorId(); + opDesc.setOperatorId(opId); + } + } } - opId = jobSpec.createOperatorDescriptorId(opDesc); + if (opId == null) { + opId = jobSpec.createOperatorDescriptorId(opDesc); + } } operatorIdMapping.put(oldId, opId); } @@ -250,9 +269,6 @@ for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : subJob.getConnectorMap().entrySet()) { IConnectorDescriptor connDesc = entry.getValue(); ConnectorDescriptorId newConnId; - if (entry.getKey().getId() == 0) { - continue; - } if (connDesc instanceof MToNPartitioningConnectorDescriptor) { MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc; connDesc = new MToNPartitioningWithMessageConnectorDescriptor(jobSpec, @@ -277,11 +293,8 @@ if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) { jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), replicateOp, iter1, leftOpDesc, leftOp.getRight()); - jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), leftOpDesc, leftOp.getRight(), - rightOpDesc, rightOp.getRight()); - } else { - jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight()); } + jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight()); } // prepare for setting partition constraints @@ -295,16 +308,10 @@ switch (lexpr.getTag()) { case PARTITION_COUNT: opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId(); - if (opId.getId() == 0) { - continue; - } operatorCounts.put(operatorIdMapping.get(opId), (int) ((ConstantExpression) cexpr).getValue()); break; case PARTITION_LOCATION: opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId(); - if (opId.getId() == 0) { - continue; - } IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(operatorIdMapping.get(opId)); 
List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId()); if (locations == null) { diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm index cbdc907..c31da8b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm @@ -1 +1 @@ -788 +804 \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm index cbdc907..14b76cb 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm @@ -1 +1 @@ -788 +804 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java index c4cb650..97e5511 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java @@ -54,8 +54,7 @@ private final FeedRuntimeType subscriptionLocation; 
public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, ARecordType atype, - RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, - FeedRuntimeType subscriptionLocation) { + RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, FeedRuntimeType subscriptionLocation) { super(spec, 1, 1); this.recordDescriptors[0] = rDesc; this.outputType = atype; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java index d0d9f7b..cffd303 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java @@ -76,7 +76,7 @@ public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId feedConnectionId, final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String> feedPolicyProperties, - final FeedRuntimeType runtimeType, final boolean enableSubscriptionMode, final String operandId) { + final FeedRuntimeType runtimeType, final String operandId) { super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity()); this.feedConnectionId = feedConnectionId; this.feedPolicyProperties = feedPolicyProperties; diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm index 2975e63..c31da8b 100644 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm +++ 
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm @@ -1 +1 @@ -788 \ No newline at end of file +804 \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/1648 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I64806edd5e687ea6722d0f2c7802d023aaaa3e21 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <xkk...@gmail.com>