Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1646

Change subject: WIP - Fix record loss for certain feed types
......................................................................

WIP - Fix record loss for certain feed types

1. Fix the blind replacement of the connector between FeedCollector and
   AssignOperator.
2. Remove the IntroduceRandomPartitioningFeedComputationRule, which is no
   longer useful for the current feed mode.
3. Revise feed connection job merge function.
4. Test case fix.

Change-Id: Ifb2d550750ec084232831befa0bf0fd0d94e7bae
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
D 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
7 files changed, 5 insertions(+), 121 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/46/1646/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index ec73185..653ae7c 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -47,7 +47,6 @@
 import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
 import org.apache.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
 import 
org.apache.asterix.optimizer.rules.IntroduceMaterializationForInsertWithSelfScanRule;
-import 
org.apache.asterix.optimizer.rules.IntroduceRandomPartitioningFeedComputationRule;
 import 
org.apache.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectAssignRule;
 import 
org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import org.apache.asterix.optimizer.rules.IntroduceStaticTypeCastForInsertRule;
@@ -352,7 +351,6 @@
         physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesTopLevel.add(new 
IntroduceRapidFrameFlushProjectAssignRule());
         physicalRewritesTopLevel.add(new SetExecutionModeRule());
-        physicalRewritesTopLevel.add(new 
IntroduceRandomPartitioningFeedComputationRule());
         return physicalRewritesTopLevel;
     }
 
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
deleted file mode 100644
index 1a0ecd9..0000000
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.optimizer.rules;
-
-import org.apache.asterix.metadata.declared.DataSource;
-import org.apache.asterix.metadata.declared.FeedDataSource;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedConnection;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class IntroduceRandomPartitioningFeedComputationRule implements 
IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
-            throws AlgebricksException {
-        ILogicalOperator op = opRef.getValue();
-        if (!op.getOperatorTag().equals(LogicalOperatorTag.ASSIGN)) {
-            return false;
-        }
-
-        ILogicalOperator opChild = op.getInputs().get(0).getValue();
-        if 
(!opChild.getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN)) {
-            return false;
-        }
-
-        DataSourceScanOperator scanOp = (DataSourceScanOperator) opChild;
-        DataSource dataSource = (DataSource) scanOp.getDataSource();
-        if (dataSource.getDatasourceType() != DataSource.Type.FEED) {
-            return false;
-        }
-
-        final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
-        FeedConnection feedConnection = feedDataSource.getFeedConnection();
-        if (feedConnection.getAppliedFunctions() == null) {
-            return false;
-        }
-
-        ExchangeOperator exchangeOp = new ExchangeOperator();
-        INodeDomain domain = new INodeDomain() {
-            @Override
-            public boolean sameAs(INodeDomain domain) {
-                return domain == this;
-            }
-
-            @Override
-            public Integer cardinality() {
-                return feedDataSource.getComputeCardinality();
-            }
-        };
-
-        exchangeOp.setPhysicalOperator(new 
RandomPartitionExchangePOperator(domain));
-        op.getInputs().get(0).setValue(exchangeOp);
-        exchangeOp.getInputs().add(new 
MutableObject<ILogicalOperator>(scanOp));
-        ExecutionMode em = ((AbstractLogicalOperator) 
scanOp).getExecutionMode();
-        exchangeOp.setExecutionMode(em);
-        exchangeOp.computeDeliveredPhysicalProperties(context);
-        context.computeAndSetTypeEnvironmentForOperator(exchangeOp);
-
-        AssignOperator assignOp = (AssignOperator) opRef.getValue();
-        AssignPOperator assignPhyOp = (AssignPOperator) 
assignOp.getPhysicalOperator();
-        assignPhyOp.setCardinalityConstraint(domain.cardinality());
-
-        return true;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
-            throws AlgebricksException {
-        return false;
-    }
-
-}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 4ea524a..0b0ad18 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -250,9 +250,6 @@
             for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : 
subJob.getConnectorMap().entrySet()) {
                 IConnectorDescriptor connDesc = entry.getValue();
                 ConnectorDescriptorId newConnId;
-                if (entry.getKey().getId() == 0) {
-                    continue;
-                }
                 if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
                     MToNPartitioningConnectorDescriptor m2nConn = 
(MToNPartitioningConnectorDescriptor) connDesc;
                     connDesc = new 
MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
@@ -277,11 +274,8 @@
                 if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) 
{
                     jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
replicateOp, iter1, leftOpDesc,
                             leftOp.getRight());
-                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
leftOpDesc, leftOp.getRight(),
-                            rightOpDesc, rightOp.getRight());
-                } else {
-                    jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), 
rightOpDesc, rightOp.getRight());
                 }
+                jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), 
rightOpDesc, rightOp.getRight());
             }
 
             // prepare for setting partition constraints
@@ -295,16 +289,10 @@
                 switch (lexpr.getTag()) {
                     case PARTITION_COUNT:
                         opId = ((PartitionCountExpression) 
lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         operatorCounts.put(operatorIdMapping.get(opId), (int) 
((ConstantExpression) cexpr).getValue());
                         break;
                     case PARTITION_LOCATION:
                         opId = ((PartitionLocationExpression) 
lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         IOperatorDescriptor opDesc = 
jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
                         List<LocationConstraint> locations = 
operatorLocations.get(opDesc.getOperatorId());
                         if (locations == null) {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
index cbdc907..c31da8b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
@@ -1 +1 @@
-788
+804
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
index cbdc907..14b76cb 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
@@ -1 +1 @@
-788
+804
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index c4cb650..97e5511 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -54,8 +54,7 @@
     private final FeedRuntimeType subscriptionLocation;
 
     public FeedCollectOperatorDescriptor(JobSpecification spec, 
FeedConnectionId feedConnectionId, ARecordType atype,
-            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
-            FeedRuntimeType subscriptionLocation) {
+            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, 
FeedRuntimeType subscriptionLocation) {
         super(spec, 1, 1);
         this.recordDescriptors[0] = rDesc;
         this.outputType = atype;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index d0d9f7b..f60f3af 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -94,6 +94,7 @@
             throws HyracksDataException {
         IOperatorNodePushable nodePushable = null;
         switch (runtimeType) {
+            // TODO: remove ComputeNodePushable and the runtimeType (xikui)
             case COMPUTE:
                 nodePushable = new FeedMetaComputeNodePushable(ctx, 
recordDescProvider, partition, nPartitions,
                         coreOperator, feedConnectionId, feedPolicyProperties, 
operandId, this);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1646
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifb2d550750ec084232831befa0bf0fd0d94e7bae
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkk...@gmail.com>

Reply via email to