Taewoo Kim has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1880

Change subject: [WIP][ASTERIXDB-1984][COMP] proper fieldtype propagation
......................................................................

[WIP][ASTERIXDB-1984][COMP] proper fieldtype propagation

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Let IntroduceJoinAccessMethodRule accept arbitrary
  forms of sub-tree for the probe-tree.
- Field-type propagation on a field with an enforced index
  now happens properly for the probe-tree in a join.

Change-Id: Ib353c85bf627d8dd65dba0ea307dee428edb4a25
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
A 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_04.sqlpp
M 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_01.aql
M 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_02.aql
M 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_03.aql
A 
asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_04.plan
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/nonpure/keep-datetime-local.plan
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029.plan
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029_2.plan
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
M 
asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java
23 files changed, 507 insertions(+), 244 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/80/1880/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index bed5c7f..26bbf90 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -22,9 +22,11 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
@@ -62,6 +64,8 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 import com.google.common.collect.ImmutableSet;
@@ -109,16 +113,44 @@
     }
 
     protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree,
-            Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, 
IOptimizationContext context)
-            throws AlgebricksException {
+            Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, 
IOptimizationContext context,
+            boolean updateEntireExprInfo) throws AlgebricksException {
         Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = 
analyzedAMs.entrySet().iterator();
         // Check applicability of indexes by access method type.
         while (amIt.hasNext()) {
             Map.Entry<IAccessMethod, AccessMethodAnalysisContext> entry = 
amIt.next();
             AccessMethodAnalysisContext amCtx = entry.getValue();
-            // For the current access method type, map variables to applicable
-            // indexes.
-            fillAllIndexExprs(subTree, amCtx, context);
+            // For the current access method type, map variables to applicable 
indexes.
+            if (updateEntireExprInfo) {
+                fillAllIndexExprs(subTree, amCtx, context);
+            } else {
+                // For the probe tree, we do not need to update the entire 
information.
+                fillFieldTypeForOptFuncExpr(subTree, amCtx, context);
+            }
+        }
+    }
+
+    /**
+     * Sets the subtree and the field type for the variables in the given 
function expression.
+     */
+    private void fillFieldTypeForOptFuncExpr(OptimizableOperatorSubTree 
subTree,
+            AccessMethodAnalysisContext analysisCtx, ITypingContext context) 
throws AlgebricksException {
+        ILogicalOperator rootOp = subTree.getRoot();
+        IVariableTypeEnvironment envSubtree = 
context.getOutputTypeEnvironment(rootOp);
+        Set<LogicalVariable> liveVarsAtRootOp = new HashSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(rootOp, liveVarsAtRootOp);
+
+        // For each optimizable function expression, applies the field type of 
each variable.
+        for (IOptimizableFuncExpr optFuncExpr : 
analysisCtx.getMatchedFuncExprs()) {
+            for (LogicalVariable var : liveVarsAtRootOp) {
+                int optVarIndex = optFuncExpr.findLogicalVar(var);
+                if (optVarIndex == -1) {
+                    continue;
+                }
+                optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
+                IAType fieldType = (IAType) envSubtree.getVarType(var);
+                optFuncExpr.setFieldType(optVarIndex, fieldType);
+            }
         }
     }
 
@@ -242,6 +274,9 @@
                     //retrieve types of expressions joined/selected with an 
indexed field
                     for (int j = 0; j < optFuncExpr.getNumLogicalVars(); j++) {
                         if (j != exprAndVarIdx.second) {
+                            // Temp:
+//                            System.out.println(
+//                                    "pruneIndexCandidates() get idx " + j + 
" type:" + optFuncExpr.getFieldType(j));
                             matchedTypes.add(optFuncExpr.getFieldType(j));
                         }
 
@@ -291,6 +326,10 @@
                                     throw new IllegalArgumentException();
                                 }
                             }));
+                    // Temp:
+                    System.out
+                            .println("pruneIndexCandidates() get idx type: "
+                                    + matchedTypes.get(matchedTypes.size() - 
1));
 
                     //for the case when jaccard similarity is measured between 
ordered & unordered lists
                     boolean jaccardSimilarity = 
optFuncExpr.getFuncExpr().getFunctionIdentifier().getName()
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
index 16ee6d1..5e80d8e 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
@@ -108,7 +108,7 @@
         return lojGroupbyOpRef;
     }
 
-    public void setLOJIsNullFuncInGroupBy(ScalarFunctionCallExpression 
isNullFunc) {
+    public void setLOJIsMissingFuncInGroupBy(ScalarFunctionCallExpression 
isNullFunc) {
         lojIsNullFuncInGroupBy = isNullFunc;
     }
 
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index 7fc7902..98ffba1 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -303,13 +303,8 @@
 
             // Analyze the condition of SELECT operator and initialize 
analyzedAMs.
             // Check whether the function in the SELECT operator can be truly 
transformed.
-            boolean matchInLeftSubTree = false;
             boolean matchInRightSubTree = false;
             if (continueCheck) {
-                if (leftSubTree.hasDataSource()) {
-                    matchInLeftSubTree = 
analyzeSelectOrJoinOpConditionAndUpdateAnalyzedAM(joinCond,
-                            leftSubTree.getAssignsAndUnnests(), analyzedAMs, 
context, typeEnvironment);
-                }
                 if (rightSubTree.hasDataSource()) {
                     matchInRightSubTree = 
analyzeSelectOrJoinOpConditionAndUpdateAnalyzedAM(joinCond,
                             rightSubTree.getAssignsAndUnnests(), analyzedAMs, 
context, typeEnvironment);
@@ -318,26 +313,20 @@
 
             // Find the dataset from the data-source and the record type of 
the dataset from the metadata.
             // This will be used to find an applicable index on the dataset.
-            boolean checkLeftSubTreeMetadata = false;
             boolean checkRightSubTreeMetadata = false;
-            if (continueCheck && (matchInLeftSubTree || matchInRightSubTree)) {
+            if (continueCheck && matchInRightSubTree) {
                 // Set dataset and type metadata.
-                if (matchInLeftSubTree) {
-                    checkLeftSubTreeMetadata = 
leftSubTree.setDatasetAndTypeMetadata(metadataProvider);
-                }
                 if (matchInRightSubTree) {
                     checkRightSubTreeMetadata = 
rightSubTree.setDatasetAndTypeMetadata(metadataProvider);
                 }
             }
 
-            if (continueCheck && (checkLeftSubTreeMetadata || 
checkRightSubTreeMetadata)) {
+            if (continueCheck && checkRightSubTreeMetadata) {
                 // Map variables to the applicable indexes and find the field 
name and type.
                 // Then find the applicable indexes for the variables used in 
the JOIN condition.
-                if (checkLeftSubTreeMetadata) {
-                    fillSubTreeIndexExprs(leftSubTree, analyzedAMs, context);
-                }
+                fillSubTreeIndexExprs(leftSubTree, analyzedAMs, context, 
false);
                 if (checkRightSubTreeMetadata) {
-                    fillSubTreeIndexExprs(rightSubTree, analyzedAMs, context);
+                    fillSubTreeIndexExprs(rightSubTree, analyzedAMs, context, 
true);
                 }
 
                 // Prune the access methods based on the function expression 
and access methods.
@@ -365,7 +354,7 @@
                         analysisCtx.setLOJGroupbyOpRef(opRef);
                         ScalarFunctionCallExpression isNullFuncExpr = 
AccessMethodUtils
                                 
.findLOJIsMissingFuncInGroupBy((GroupByOperator) opRef.getValue());
-                        analysisCtx.setLOJIsNullFuncInGroupBy(isNullFuncExpr);
+                        
analysisCtx.setLOJIsMissingFuncInGroupBy(isNullFuncExpr);
                     }
 
                     Dataset indexDataset = 
analysisCtx.getDatasetFromIndexDatasetMap(chosenIndex.second);
@@ -414,8 +403,8 @@
         }
         joinCond = (AbstractFunctionCallExpression) condExpr;
 
-        boolean leftSubTreeInitialized = 
leftSubTree.initFromSubTree(joinOp.getInputs().get(0));
-        boolean rightSubTreeInitialized = 
rightSubTree.initFromSubTree(joinOp.getInputs().get(1));
+        boolean leftSubTreeInitialized = 
leftSubTree.initFromSubTree(joinOp.getInputs().get(0), false);
+        boolean rightSubTreeInitialized = 
rightSubTree.initFromSubTree(joinOp.getInputs().get(1), true);
 
         if (!leftSubTreeInitialized || !rightSubTreeInitialized) {
             return false;
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index d95b278..9ec2ad6 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -164,7 +164,7 @@
 
         // Initialize the subtree information.
         // Match and put assign, unnest, and datasource information.
-        boolean res = subTree.initFromSubTree(selectOp.getInputs().get(0));
+        boolean res = subTree.initFromSubTree(selectOp.getInputs().get(0), 
true);
         return res && subTree.hasDataSourceScan();
     }
 
@@ -337,7 +337,7 @@
             if (continueCheck) {
                 // Map variables to the applicable indexes and find the field 
name and type.
                 // Then find the applicable indexes for the variables used in 
the SELECT condition.
-                fillSubTreeIndexExprs(subTree, analyzedAMs, context);
+                fillSubTreeIndexExprs(subTree, analyzedAMs, context, true);
 
                 // Prune the access methods based on the function expression 
and access methods.
                 pruneIndexCandidates(analyzedAMs, context, typeEnvironment);
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 725de12..ecfed8b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -22,8 +22,11 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 
 import 
org.apache.asterix.common.annotations.SkipSecondaryIndexSearchExpressionAnnotation;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -49,10 +52,12 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
 import 
org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -597,11 +602,19 @@
             ILogicalExpression joinCond, IOptimizableFuncExpr optFuncExpr, 
List<LogicalVariable> originalSubTreePKs,
             List<LogicalVariable> surrogateSubTreePKs, IOptimizationContext 
context) throws AlgebricksException {
 
-        probeSubTree.getPrimaryKeyVars(null, originalSubTreePKs);
+        // Gets the primary key(s) from the probe subtree.
+        Pair<ILogicalOperator, Set<LogicalVariable>> RootOpAndPKVars =
+                
EquivalenceClassUtils.findOrCreatePrimaryKeyOpAndVariables(probeSubTree.getRoot(),
 true, context);
+        ILogicalOperator rootOp = RootOpAndPKVars.first;
+        originalSubTreePKs.addAll(RootOpAndPKVars.second);
+        if (!probeSubTree.getRoot().equals(rootOp)) {
+            probeSubTree.getRootRef().setValue(rootOp);
+            probeSubTree.setRoot(rootOp);
+        }
 
         // Create two copies of the original probe subtree.
-        // The first copy, which becomes the new probe subtree, will retain 
the primary-key and secondary-search key variables,
-        // but have all other variables replaced with new ones.
+        // The first copy, which becomes the new probe subtree, will retain 
the primary-key and
+        // secondary-search key variables, but have all other variables 
replaced with new ones.
         // The second copy, which will become an input to the top-level 
equi-join to resolve the surrogates,
         // will have all primary-key and secondary-search keys replaced, but 
retains all other original variables.
 
@@ -644,11 +657,7 @@
         Mutable<ILogicalOperator> originalProbeSubTreeRootRef = 
probeSubTree.getRootRef();
 
         // Replace the original probe subtree with its copy.
-        Dataset origDataset = probeSubTree.getDataset();
-        ARecordType origRecordType = probeSubTree.getRecordType();
-        probeSubTree.initFromSubTree(newProbeSubTreeRootRef);
-        probeSubTree.setDataset(origDataset);
-        probeSubTree.setRecordType(origRecordType);
+        probeSubTree.initFromSubTree(newProbeSubTreeRootRef, false);
 
         // Replace the variables in the join condition based on the mapping of 
variables
         // in the new probe subtree.
@@ -1112,28 +1121,31 @@
             return null;
         }
 
-        for (AbstractLogicalOperator op : subTree.getAssignsAndUnnests()) {
-            if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
-                continue;
+        Queue<Mutable<ILogicalOperator>> queue = new LinkedList<>();
+        queue.add(subTree.getRootRef());
+        while (!queue.isEmpty()) {
+            ILogicalOperator descendantOp = queue.poll().getValue();
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                List<Mutable<ILogicalExpression>> exprList = ((AssignOperator) 
descendantOp).getExpressions();
+                for (Mutable<ILogicalExpression> expr : exprList) {
+                    if (expr.getValue().getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
+                        continue;
+                    }
+                    AbstractFunctionCallExpression funcExpr = 
(AbstractFunctionCallExpression) expr.getValue();
+                    if (funcExpr.getFunctionIdentifier() != funcId) {
+                        continue;
+                    }
+                    ILogicalExpression varExpr = 
funcExpr.getArguments().get(0).getValue();
+                    if (varExpr.getExpressionTag() != 
LogicalExpressionTag.VARIABLE) {
+                        continue;
+                    }
+                    if (((VariableReferenceExpression) 
varExpr).getVariableReference() == targetVar) {
+                        continue;
+                    }
+                    return (ScalarFunctionCallExpression) funcExpr;
+                }
             }
-            List<Mutable<ILogicalExpression>> exprList = ((AssignOperator) 
op).getExpressions();
-            for (Mutable<ILogicalExpression> expr : exprList) {
-                if (expr.getValue().getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
-                    continue;
-                }
-                AbstractFunctionCallExpression funcExpr = 
(AbstractFunctionCallExpression) expr.getValue();
-                if (funcExpr.getFunctionIdentifier() != funcId) {
-                    continue;
-                }
-                ILogicalExpression varExpr = 
funcExpr.getArguments().get(0).getValue();
-                if (varExpr.getExpressionTag() != 
LogicalExpressionTag.VARIABLE) {
-                    continue;
-                }
-                if (((VariableReferenceExpression) 
varExpr).getVariableReference() == targetVar) {
-                    continue;
-                }
-                return (ScalarFunctionCallExpression) funcExpr;
-            }
+            queue.addAll(descendantOp.getInputs());
         }
         return null;
     }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index 2534680..44f622a 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -83,10 +83,25 @@
     private List<Dataset> ixJoinOuterAdditionalDatasets = null;
     private List<ARecordType> ixJoinOuterAdditionalRecordTypes = null;
 
-    public boolean initFromSubTree(Mutable<ILogicalOperator> subTreeOpRef) 
throws AlgebricksException {
+    /**
+     * Identifies the root of the sub-tree and initializes the data-source, 
assign, and unnest information.
+     *
+     * @param subTreeOpRef
+     *            the root operator
+     * @param initDataSource
+     *            tries to initialize the data-source, assign, and unnest 
information if true.
+     * @return
+     * @throws AlgebricksException
+     */
+    public boolean initFromSubTree(Mutable<ILogicalOperator> subTreeOpRef, 
boolean initDataSource)
+            throws AlgebricksException {
         reset();
         rootRef = subTreeOpRef;
         root = subTreeOpRef.getValue();
+
+        if (!initDataSource) {
+            return true;
+        }
         boolean passedSource = false;
         boolean result = false;
         Mutable<ILogicalOperator> searchOpRef = subTreeOpRef;
@@ -287,7 +302,7 @@
                 throw 
CompilationException.create(ErrorCode.NO_METADATA_FOR_DATASET, datasetName);
             }
             // Get the record type for that dataset.
-            IAType itemType = 
metadataProvider.findType(ds.getItemTypeDataverseName(), ds.getItemTypeName());
+            IAType itemType = metadataProvider.findType(ds);
             if (itemType.getTypeTag() != ATypeTag.OBJECT) {
                 if (i == 0) {
                     return false;
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_04.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_04.sqlpp
new file mode 100644
index 0000000..8496f2b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_04.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+use test;
+
+create type test.TestType as
+{
+  id : integer,
+  val : integer
+}
+
+create  dataset testdst(TestType) primary key id;
+create  dataset testdst2(TestType) primary key id;
+create  dataset testdst3(TestType) primary key id;
+
+create  index sec_Idx  on testdst (val) type btree;
+create  index sec2_Idx  on testdst2 (val) type btree;
+create  index sec3_Idx  on testdst3 (val) type btree;
+
+SELECT * FROM
+testdst a JOIN testdst2 b on a.val = b.val JOIN testdst3 c ON b.val /*+ 
indexnl */ =  c.val;
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_01.aql
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_01.aql
index 6792ae4..8e41ef1 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_01.aql
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_01.aql
@@ -36,11 +36,11 @@
 }
 
 create dataset Names(NameType) primary key nested.id;
-create index Name_idx on Names(nested.fname: string?,lnested.name: string?) 
enforced;
+create index Name_idx on Names(nested.fname: string?,nested.lname: string?) 
enforced;
 
 write output to 
asterix_nc1:"rttest/btree-index-join_secondary-composite-key-prefix-join_01.adm";
 
-for $emp1 in dataset('Names') 
-for $emp2 in dataset('Names') 
+for $emp1 in dataset('Names')
+for $emp2 in dataset('Names')
 where $emp1.nested.fname /*+ indexnl*/> $emp2.nested.fname and 
$emp1.nested.lname /*+ indexnl*/> $emp2.nested.lname
 return {"emp1": $emp1, "emp2": $emp2 }
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_02.aql
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_02.aql
index a4a2dbd..ecc8947 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_02.aql
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_02.aql
@@ -36,11 +36,11 @@
 }
 
 create dataset Names(NameType) primary key nested.id;
-create index Name_idx on Names(nested.fname: string?,lnested.name: string?) 
enforced;
+create index Name_idx on Names(nested.fname: string?,nested.lname: string?) 
enforced;
 
 write output to 
asterix_nc1:"rttest/btree-index-join_secondary-composite-key-prefix-join_02.adm";
 
-for $emp1 in dataset('Names') 
-for $emp2 in dataset('Names') 
+for $emp1 in dataset('Names')
+for $emp2 in dataset('Names')
 where $emp1.nested.fname /*+ indexnl*/< $emp2.nested.fname and 
$emp1.nested.lname /*+ indexnl*/< $emp2.nested.lname
 return {"emp1": $emp1, "emp2": $emp2 }
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_03.aql
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_03.aql
index 6a1e4e9..ec78dfe 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_03.aql
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/nested-open-index/btree-index-join/secondary-composite-key-join_03.aql
@@ -36,11 +36,11 @@
 }
 
 create dataset Names(NameType) primary key nested.id;
-create index Name_idx on Names(nested.fname: string?,lnested.name: string?) 
enforced;
+create index Name_idx on Names(nested.fname: string?,nested.lname: string?) 
enforced;
 
 write output to 
asterix_nc1:"rttest/btree-index-join_secondary-composite-key-prefix-join_03.adm";
 
-for $emp1 in dataset('Names') 
-for $emp2 in dataset('Names') 
+for $emp1 in dataset('Names')
+for $emp2 in dataset('Names')
 where $emp1.nested.fname /*+ indexnl*/= $emp2.nested.fname and 
$emp1.nested.lname /*+ indexnl*/= $emp2.nested.lname
 return {"emp1": $emp1, "emp2": $emp2 }
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_04.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_04.plan
new file mode 100644
index 0000000..9f60440
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_04.plan
@@ -0,0 +1,33 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- BTREE_SEARCH  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$22(ASC)]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- HYBRID_HASH_JOIN [$$19][$$15]  
|PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$19]  
|PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$15]  
|PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nonpure/keep-datetime-local.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nonpure/keep-datetime-local.plan
index 5246d83..5fcbcc8 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nonpure/keep-datetime-local.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nonpure/keep-datetime-local.plan
@@ -16,21 +16,25 @@
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- STABLE_SORT [$$30(ASC), $$23(ASC)]  |PARTITIONED|
                         -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
-                          -- HYBRID_HASH_JOIN [$$25][$$24]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$25]  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
                               -- ASSIGN  |PARTITIONED|
-                                -- STREAM_SELECT  |PARTITIONED|
-                                  -- ASSIGN  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                              -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$24]  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- BTREE_SEARCH  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                        -- STABLE_SORT [$$34(ASC)]  
|PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                  -- BROADCAST_EXCHANGE  
|PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_SELECT  
|PARTITIONED|
+                                                        -- ASSIGN  
|PARTITIONED|
+                                                          -- STREAM_PROJECT  
|PARTITIONED|
+                                                            -- ASSIGN  
|PARTITIONED|
+                                                              -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
index ed40740..18aab46 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
@@ -34,53 +34,56 @@
                                             -- STABLE_SORT [$$47(ASC), 
$$53(ASC)]  |PARTITIONED|
                                               -- HASH_PARTITION_EXCHANGE 
[$$47, $$53]  |PARTITIONED|
                                                 -- STREAM_PROJECT  
|PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                    -- NESTED_LOOP  
|PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                  -- STREAM_SELECT  
|PARTITIONED|
+                                                    -- STREAM_PROJECT  
|PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
                                                         -- STREAM_PROJECT  
|PARTITIONED|
-                                                          -- STREAM_SELECT  
|PARTITIONED|
-                                                            -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- 
PRE_CLUSTERED_GROUP_BY[$$40, $$52]  |PARTITIONED|
-                                                                      {
-                                                                        -- 
AGGREGATE  |LOCAL|
-                                                                          -- 
STREAM_SELECT  |LOCAL|
-                                                                            -- 
NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                      }
-                                                                -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- 
STABLE_SORT [$$40(ASC), $$52(ASC)]  |PARTITIONED|
-                                                                    -- 
HASH_PARTITION_EXCHANGE [$$40, $$52]  |PARTITIONED|
-                                                                      -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- 
HYBRID_HASH_JOIN [$$50][$$43]  |PARTITIONED|
-                                                                            -- 
HASH_PARTITION_EXCHANGE [$$50]  |PARTITIONED|
-                                                                              
-- NESTED_LOOP  |PARTITIONED|
-                                                                               
 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
   -- STREAM_PROJECT  |PARTITIONED|
-                                                                               
     -- ASSIGN  |PARTITIONED|
-                                                                               
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
         -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                               
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                               
 -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                               
   -- STREAM_PROJECT  |PARTITIONED|
-                                                                               
     -- ASSIGN  |PARTITIONED|
-                                                                               
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
         -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                               
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                            -- 
HASH_PARTITION_EXCHANGE [$$43]  |PARTITIONED|
-                                                                              
-- STREAM_PROJECT  |PARTITIONED|
-                                                                               
 -- ASSIGN  |PARTITIONED|
-                                                                               
   -- STREAM_PROJECT  |PARTITIONED|
-                                                                               
     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
       -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                               
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                      -- BROADCAST_EXCHANGE  
|PARTITIONED|
-                                                        -- STREAM_PROJECT  
|PARTITIONED|
-                                                          -- ASSIGN  
|PARTITIONED|
-                                                            -- STREAM_PROJECT  
|PARTITIONED|
+                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- BTREE_SEARCH  
|PARTITIONED|
                                                               -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                -- STABLE_SORT 
[$$65(ASC)]  |PARTITIONED|
                                                                   -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                    -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- 
RTREE_SEARCH  |PARTITIONED|
+                                                                          -- 
BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                            -- 
ASSIGN  |PARTITIONED|
+                                                                              
-- STREAM_PROJECT  |PARTITIONED|
+                                                                               
 -- STREAM_SELECT  |PARTITIONED|
+                                                                               
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
     -- PRE_CLUSTERED_GROUP_BY[$$40, $$52]  |PARTITIONED|
+                                                                               
             {
+                                                                               
               -- AGGREGATE  |LOCAL|
+                                                                               
                 -- STREAM_SELECT  |LOCAL|
+                                                                               
                   -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                               
             }
+                                                                               
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
         -- STABLE_SORT [$$40(ASC), $$52(ASC)]  |PARTITIONED|
+                                                                               
           -- HASH_PARTITION_EXCHANGE [$$40, $$52]  |PARTITIONED|
+                                                                               
             -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                 -- HYBRID_HASH_JOIN [$$50][$$43]  |PARTITIONED|
+                                                                               
                   -- HASH_PARTITION_EXCHANGE [$$50]  |PARTITIONED|
+                                                                               
                     -- NESTED_LOOP  |PARTITIONED|
+                                                                               
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                         -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
                           -- ASSIGN  |PARTITIONED|
+                                                                               
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                               -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                               
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                               
                       -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                               
                         -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
                           -- ASSIGN  |PARTITIONED|
+                                                                               
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                               -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                               
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                               
                   -- HASH_PARTITION_EXCHANGE [$$43]  |PARTITIONED|
+                                                                               
                     -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
                       -- ASSIGN  |PARTITIONED|
+                                                                               
                         -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                             -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                               
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                                 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029.plan
index 71070bd..048b6ff 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029.plan
@@ -19,57 +19,44 @@
                           -- STABLE_SORT [$$36(ASC), $$37(ASC)]  |PARTITIONED|
                             -- HASH_PARTITION_EXCHANGE [$$36, $$37]  
|PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- NESTED_LOOP  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- NESTED_LOOP  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- BTREE_SEARCH  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STABLE_SORT [$$64(ASC)]  
|PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                -- DATASOURCE_SCAN  
|PARTITIONED|
+                                                -- STREAM_PROJECT  
|PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
-                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_SELECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                    -- BTREE_SEARCH  
|PARTITIONED|
+                                                    -- RTREE_SEARCH  
|PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                        -- STABLE_SORT 
[$$55(ASC)]  |PARTITIONED|
+                                                        -- ASSIGN  
|PARTITIONED|
                                                           -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STREAM_PROJECT  
|PARTITIONED|
+                                                            -- NESTED_LOOP  
|PARTITIONED|
                                                               -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- 
BTREE_SEARCH  |PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
REPLICATE  |PARTITIONED|
-                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  
|PARTITIONED|
+                                                                    -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                           -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- STREAM_SELECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                  -- BTREE_SEARCH  
|PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                      -- STABLE_SORT 
[$$58(ASC)]  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE 
 |PARTITIONED|
-                                                          -- STREAM_PROJECT  
|PARTITIONED|
-                                                            -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- BTREE_SEARCH  
|PARTITIONED|
-                                                                -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ASSIGN  
|PARTITIONED|
-                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- 
REPLICATE  |PARTITIONED|
-                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- 
ASSIGN  |PARTITIONED|
-                                                                              
-- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- 
BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  
|PARTITIONED|
+                                                                    -- 
STREAM_SELECT  |PARTITIONED|
+                                                                      -- 
ASSIGN  |PARTITIONED|
+                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- 
BTREE_SEARCH  |PARTITIONED|
+                                                                            -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              
-- STABLE_SORT [$$55(ASC)]  |PARTITIONED|
+                                                                               
 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
   -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
       -- BTREE_SEARCH  |PARTITIONED|
+                                                                               
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
           -- ASSIGN  |PARTITIONED|
+                                                                               
             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
             -- BROADCAST_EXCHANGE  |PARTITIONED|
               -- AGGREGATE  |UNPARTITIONED|
                 -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029_2.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029_2.plan
index 71070bd..048b6ff 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029_2.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1029_2.plan
@@ -19,57 +19,44 @@
                           -- STABLE_SORT [$$36(ASC), $$37(ASC)]  |PARTITIONED|
                             -- HASH_PARTITION_EXCHANGE [$$36, $$37]  
|PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- NESTED_LOOP  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- NESTED_LOOP  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- BTREE_SEARCH  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STABLE_SORT [$$64(ASC)]  
|PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                -- DATASOURCE_SCAN  
|PARTITIONED|
+                                                -- STREAM_PROJECT  
|PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
-                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_SELECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                    -- BTREE_SEARCH  
|PARTITIONED|
+                                                    -- RTREE_SEARCH  
|PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                        -- STABLE_SORT 
[$$55(ASC)]  |PARTITIONED|
+                                                        -- ASSIGN  
|PARTITIONED|
                                                           -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STREAM_PROJECT  
|PARTITIONED|
+                                                            -- NESTED_LOOP  
|PARTITIONED|
                                                               -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- 
BTREE_SEARCH  |PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
REPLICATE  |PARTITIONED|
-                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  
|PARTITIONED|
+                                                                    -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                           -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- STREAM_SELECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                  -- BTREE_SEARCH  
|PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                      -- STABLE_SORT 
[$$58(ASC)]  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE 
 |PARTITIONED|
-                                                          -- STREAM_PROJECT  
|PARTITIONED|
-                                                            -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- BTREE_SEARCH  
|PARTITIONED|
-                                                                -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ASSIGN  
|PARTITIONED|
-                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- 
REPLICATE  |PARTITIONED|
-                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- 
ASSIGN  |PARTITIONED|
-                                                                              
-- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- 
BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  
|PARTITIONED|
+                                                                    -- 
STREAM_SELECT  |PARTITIONED|
+                                                                      -- 
ASSIGN  |PARTITIONED|
+                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- 
BTREE_SEARCH  |PARTITIONED|
+                                                                            -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              
-- STABLE_SORT [$$55(ASC)]  |PARTITIONED|
+                                                                               
 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
   -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
       -- BTREE_SEARCH  |PARTITIONED|
+                                                                               
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
           -- ASSIGN  |PARTITIONED|
+                                                                               
             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
             -- BROADCAST_EXCHANGE  |PARTITIONED|
               -- AGGREGATE  |UNPARTITIONED|
                 -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 7c82ca3..420e418 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -113,6 +113,10 @@
     public static final int DATASET_ID_EXHAUSTED = 1040;
     public static final int INDEX_ILLEGAL_ENFORCED_NON_OPTIONAL = 1041;
     public static final int INDEX_ILLEGAL_NON_ENFORCED_TYPED = 1042;
+    public static final int UNKNOWN_TYPE_IN_DATAVERSE = 1043;
+    public static final int METADATA_TYPE_LOOKUP_EXCEPTION = 1044;
+    public static final int DATASOURCE_NOT_FOUND = 1045;
+    public static final int FIELDNAME_OF_ENFORCED_INDEX_CANNOT_BE_NULL = 1046;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index facf1a9..183ed82 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -99,6 +99,10 @@
 1040 = Dataset id space is exhausted
 1041 = Cannot create enforced index on \"%1$s\" field with non-optional type
 1042 = Cannot create non-enforced typed index of this kind: %1$s
+1043 = Type name %1$s is unknown in dataverse %2$s
+1044 = Metadata exception while looking up type %1$s in dataverse %2$s
+1045 = Datasource with the id %1$s was not found
+1046 = A field name of an enforced index cannot be null or empty.
 
 # Feed Errors
 3001 = Illegal state.
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 11645e8..7fe9af1 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -398,9 +398,9 @@
             //concurrent access to UTF8StringPointable comparator in 
ARecordType object.
             //see issue 510
             ARecordType aRecType = (ARecordType) datatype.getDatatype();
-            return new Datatype(
-                    datatype.getDataverseName(), datatype.getDatatypeName(), 
new ARecordType(aRecType.getTypeName(),
-                            aRecType.getFieldNames(), 
aRecType.getFieldTypes(), aRecType.isOpen()),
+            return new Datatype(datatype.getDataverseName(),
+                    datatype.getDatatypeName(), new 
ARecordType(aRecType.getTypeName(), aRecType.getFieldNames(),
+                            aRecType.getFieldTypes(), aRecType.isOpen(), 
aRecType.getEnforcedFieldNameToTypeMap()),
                     datatype.getIsAnonymous());
         }
         try {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index b13f4c2..815062a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -101,7 +101,7 @@
             j++;
         }
         return new ARecordType(recType.getTypeName(), fieldNames.toArray(new 
String[0]),
-                fieldTypes.toArray(new IAType[0]), recType.isOpen());
+                fieldTypes.toArray(new IAType[0]), recType.isOpen(), 
recType.getEnforcedFieldNameToTypeMap());
     }
 
     public List<List<String>> getPartitioningKeys() {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index 49b32c0..c3fe7d7 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -20,8 +20,11 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -56,13 +59,129 @@
         try {
             type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, 
typeName);
         } catch (MetadataException e) {
-            throw new AlgebricksException(
-                    "Metadata exception while looking up type '" + typeName + 
"' in dataverse '" + dataverse + "'", e);
+            throw new 
CompilationException(ErrorCode.METADATA_TYPE_LOOKUP_EXCEPTION, e, typeName, 
dataverse);
         }
         if (type == null) {
-            throw new AlgebricksException("Type name '" + typeName + "' 
unknown in dataverse '" + dataverse + "'");
+            throw 
CompilationException.create(ErrorCode.UNKNOWN_TYPE_IN_DATAVERSE, typeName, 
dataverse);
         }
         return type.getDatatype();
+    }
+
+    /**
+     * Returns the item type of a dataset, augmented with the types of the fields that have enforced indexes.
+     * For each key field of each enforced index, the declared key type is recorded in the enforced-field map
+     * of the record type at the level where the field lives; missing intermediate levels of a nested field
+     * are represented by empty open record types.
+     *
+     * NOTE(review): {@code dataverse} is used for both the type lookup and the index lookup, and the visible
+     * callers pass the item type's dataverse; confirm that it always matches the dataset's dataverse.
+     *
+     * @param mdTxnCtx the metadata transaction context used for the lookups
+     * @param dataverse the dataverse in which the type and the dataset's indexes are looked up
+     * @param dataset the name of the dataset whose indexes are inspected
+     * @param typeName the name of the dataset's item type
+     * @return the item type, carrying the enforced-index field types
+     * @throws AlgebricksException if the type cannot be found or the metadata lookup fails
+     */
+    public static IAType findTypeOfDataSet(MetadataTransactionContext 
mdTxnCtx, String dataverse, String dataset,
+            String typeName) throws AlgebricksException {
+        IAType itemType = findType(mdTxnCtx, dataverse, typeName);
+        if (itemType == null) {
+            throw 
CompilationException.create(ErrorCode.UNKNOWN_TYPE_IN_DATAVERSE, typeName, 
dataverse);
+        }
+        ARecordType currentRecType = (ARecordType) itemType;
+        // Keeps the correct field type of a (nested) field if there is an 
enforced index on it.
+        List<Index> indexes = getDatasetIndexes(mdTxnCtx, dataverse, dataset);
+        for (Index index : indexes) {
+            if (!index.isEnforced()) {
+                continue;
+            }
+            List<List<String>> idxKeyFieldNames = index.getKeyFieldNames();
+            List<IAType> idxKeyFieldTypes = index.getKeyFieldTypes();
+            for (int i = 0; i < idxKeyFieldNames.size(); i++) {
+                List<String> currentFieldNames = idxKeyFieldNames.get(i);
+
+                // A field can be nested. Therefore, for a nested-field case, 
we either get the original record type
+                // at that level or create an empty open record type in case 
the field at that level doesn't exist.
+                // Since each getField(field-name) only deals with that level, 
we need to go through this process.
+                // E.g., a nested field - datasetA.b.c.d field access will be 
seen in the plan as:
+                // $3 <- assign ($2.getField("d"))
+                // $2 <- assign ($1.getField("c"))
+                // $1 <- assign ($REC.getField("b"))
+                // data-scan($PK,$REC) <- datasetA
+
+                // The last level doesn't require a record type since there's 
no sub-field. So, we deduct 1.
+                IAType origRecTypes[] = new IAType[currentFieldNames.size() - 
1];
+                String[] origRecFieldNames = currentRecType.getFieldNames();
+                IAType[] origRecFieldTypes = currentRecType.getFieldTypes();
+                // Declared outside the loop so that, once one level is missing, deeper levels skip the search
+                // instead of matching against the stale field names of the last record type that was found.
+                boolean searchNotNecessaryFromNow = false;
+                for (int j = 0; j < currentFieldNames.size() - 1; j++) {
+                    boolean fieldFoundInRecordType = false;
+                    if (!searchNotNecessaryFromNow) {
+                        for (int k = 0; k < origRecFieldNames.length; k++) {
+                            if 
(origRecFieldNames[k].equals(currentFieldNames.get(j))) {
+                                fieldFoundInRecordType = true;
+                                origRecTypes[j] = origRecFieldTypes[k];
+                                if (origRecTypes[j] instanceof ARecordType) {
+                                    origRecFieldNames = ((ARecordType) 
origRecTypes[j]).getFieldNames();
+                                    origRecFieldTypes = ((ARecordType) 
origRecTypes[j]).getFieldTypes();
+                                }
+                                break;
+                            }
+                        }
+                    }
+                    // An empty open-type from now on since there cannot be a 
defined type after this level.
+                    if (!fieldFoundInRecordType) {
+                        origRecTypes[j] = new ARecordType(null, new String[] 
{}, new IAType[] {}, true);
+                        searchNotNecessaryFromNow = true;
+                    }
+                }
+
+                // Now, appends the actual field information to the record 
types.
+                // This process happens backward since each parent needs to 
keep the information of its child.
+                IAType fieldType = null;
+                ARecordType tempRecType = null;
+                ARecordType childRecType = null;
+
+                fieldType = idxKeyFieldTypes.get(i);
+                // If this is not a nested-field, then we only need to add the 
"field:type" mapping.
+                if (currentFieldNames.size() == 1) {
+                    Map<String, IAType> enforcedFieldNameToTypeMap = 
currentRecType.getEnforcedFieldNameToTypeMap();
+                    
enforcedFieldNameToTypeMap.put(currentFieldNames.get(currentFieldNames.size() - 
1), fieldType);
+                    currentRecType = new ARecordType(currentRecType, 
enforcedFieldNameToTypeMap);
+                }
+                // For the non-leaf levels: we deduct 2 since we begin with 
the parent of the leaf level.
+                for (int j = currentFieldNames.size() - 2; j >= 0; j--) {
+                    Map<String, IAType> enforcedFieldNameToTypeMap =
+                            ((ARecordType) 
origRecTypes[j]).getEnforcedFieldNameToTypeMap();
+                    // leaf-level mapping
+                    if (j + 2 == currentFieldNames.size()) {
+                        
enforcedFieldNameToTypeMap.put(currentFieldNames.get(currentFieldNames.size() - 
1), fieldType);
+                        childRecType = new ARecordType((ARecordType) 
origRecTypes[j], enforcedFieldNameToTypeMap);
+                    } else {
+                        // non leaf-level mapping
+                        
enforcedFieldNameToTypeMap.put(currentFieldNames.get(j), childRecType);
+                        tempRecType = (ARecordType) origRecTypes[j];
+                        String[] recFieldNames = tempRecType.getFieldNames();
+                        IAType[] recFieldTypes = tempRecType.getFieldTypes();
+                        boolean isFieldFound = false;
+                        for (int k = 0; k < recFieldNames.length; k++) {
+                            if 
(recFieldNames[k].equals(currentFieldNames.get(j))) {
+                                isFieldFound = true;
+                                recFieldTypes[k] = childRecType;
+                            }
+                        }
+                        childRecType =
+                                !isFieldFound ? new ARecordType(tempRecType, 
enforcedFieldNameToTypeMap) : tempRecType;
+                    }
+                }
+
+                // Deals with the top level.
+                String[] curRecFieldNames = currentRecType.getFieldNames();
+                IAType[] curRecFieldTypes = currentRecType.getFieldTypes();
+                boolean fieldFound = false;
+                for (int j = 0; j < curRecFieldNames.length; j++) {
+                    if (curRecFieldNames[j].equals(currentFieldNames.get(0))) {
+                        curRecFieldTypes[j] = childRecType;
+                        fieldFound = true;
+                    }
+                }
+                Map<String, IAType> enforcedFieldNameToTypeMap = 
currentRecType.getEnforcedFieldNameToTypeMap();
+                if (!fieldFound && currentFieldNames.size() > 1) {
+                    enforcedFieldNameToTypeMap.put(currentFieldNames.get(0), 
childRecType);
+                }
+                itemType = !fieldFound ? new ARecordType(currentRecType, 
enforcedFieldNameToTypeMap) : currentRecType;
+            }
+            currentRecType = (ARecordType) itemType;
+        }
+        return itemType;
     }
 
     public static ARecordType findOutputRecordType(MetadataTransactionContext 
mdTxnCtx, String dataverse,
@@ -178,9 +297,10 @@
             throws AlgebricksException {
         Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(), 
aqlId.getDatasourceName());
         if (dataset == null) {
-            throw new AlgebricksException("Datasource with id " + aqlId + " 
was not found.");
+            throw CompilationException.create(ErrorCode.DATASOURCE_NOT_FOUND, 
aqlId.toString());
         }
-        IAType itemType = findType(mdTxnCtx, 
dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        IAType itemType = findTypeOfDataSet(mdTxnCtx, 
dataset.getItemTypeDataverseName(), dataset.getDatasetName(),
+                dataset.getItemTypeName());
         IAType metaItemType = findType(mdTxnCtx, 
dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
         INodeDomain domain = findNodeDomain(mdTxnCtx, 
dataset.getNodeGroupName());
         byte datasourceType = 
dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? 
DataSource.Type.EXTERNAL_DATASET
@@ -188,4 +308,5 @@
         return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, 
datasourceType,
                 dataset.getDatasetDetails(), domain);
     }
+
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 3b70ea9..e340899 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -310,7 +310,8 @@
     }
 
     public IAType findType(Dataset dataset) throws AlgebricksException {
-        return findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
+        return MetadataManagerUtil.findTypeOfDataSet(mdTxnCtx, 
dataset.getItemTypeDataverseName(),
+                dataset.getDatasetName(), dataset.getItemTypeName());
     }
 
     public IAType findMetaType(Dataset dataset) throws AlgebricksException {
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index baaed59..21c7fe8 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -56,6 +56,8 @@
     // If allPossibleAdditionalFieldNames is null, that means compiler does 
not know
     // the bounded set of all possible additional field names.
     private final Set<String> allPossibleAdditionalFieldNames;
+    // the following map contains the actual field type of an open-type field 
with an enforced-index on it.
+    private final Map<String, IAType> enforcedFieldNameToTypeMap;
 
     /**
      * @param typeName
@@ -68,7 +70,22 @@
      *            whether the record is open
      */
     public ARecordType(String typeName, String[] fieldNames, IAType[] 
fieldTypes, boolean isOpen) {
-        this(typeName, fieldNames, fieldTypes, isOpen, null);
+        this(typeName, fieldNames, fieldTypes, isOpen, null, null);
+    }
+
+    public ARecordType(String typeName, String[] fieldNames, IAType[] 
fieldTypes, boolean isOpen,
+            Map<String, IAType> addEnforcedfieldNameToTypeMap) {
+        this(typeName, fieldNames, fieldTypes, isOpen, null, 
addEnforcedfieldNameToTypeMap);
+    }
+
+    /**
+     * Creates a record type with the same name, fields, and openness as {@code curRecType}, paired with
+     * the given enforced-index field map.
+     *
+     * @param curRecType
+     *            the record type to copy; the arrays returned by its getters are passed through as-is
+     * @param addEnforcedfieldNameToTypeMap
+     *            field name to type for fields with an enforced index; null is treated as an empty map
+     */
+    public ARecordType(ARecordType curRecType, Map<String, IAType> 
addEnforcedfieldNameToTypeMap) {
+        // Carry over the known additional-field-name set as well, so the copy does not lose it.
+        this(curRecType.getTypeName(), curRecType.getFieldNames(), 
curRecType.getFieldTypes(), curRecType.isOpen(),
+                curRecType.allPossibleAdditionalFieldNames, addEnforcedfieldNameToTypeMap);
+    }
+
+    /**
+     * Creates a record type without enforced-index field information.
+     *
+     * @param allPossibleAdditionalFieldNames
+     *            all possible additional field names; null means the compiler does not know the bounded set
+     */
+    public ARecordType(String typeName, String[] fieldNames, IAType[] 
fieldTypes, boolean isOpen,
+            Set<String> allPossibleAdditionalFieldNames) {
+        // Forward the caller's set; only the enforced-field map is left to default to an empty map.
+        this(typeName, fieldNames, fieldTypes, isOpen, allPossibleAdditionalFieldNames, null);
     }
 
     /**
@@ -82,9 +99,11 @@
      *            whether the record is open
      * @param allPossibleAdditionalFieldNames,
      *            all possible additional field names.
+     * @param addEnforcedfieldNameToTypeMap
+     *            a map from the name of each field with an enforced index to its 
type
      */
     public ARecordType(String typeName, String[] fieldNames, IAType[] 
fieldTypes, boolean isOpen,
-            Set<String> allPossibleAdditionalFieldNames) {
+            Set<String> allPossibleAdditionalFieldNames, Map<String, IAType> 
addEnforcedfieldNameToTypeMap) {
         super(typeName);
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
@@ -95,6 +114,8 @@
             fieldNameToIndexMap.put(fieldNames[index], index);
         }
         this.allPossibleAdditionalFieldNames = allPossibleAdditionalFieldNames;
+        this.enforcedFieldNameToTypeMap =
+                addEnforcedfieldNameToTypeMap == null ? new HashMap<>() : 
addEnforcedfieldNameToTypeMap;
     }
 
     public boolean canContainField(String fieldName) {
@@ -126,6 +147,10 @@
 
     public IAType[] getFieldTypes() {
         return fieldTypes;
+    }
+
+    public Map<String, IAType> getEnforcedFieldNameToTypeMap() {
+        return enforcedFieldNameToTypeMap;
     }
 
     public List<IRecordTypeAnnotation> getAnnotations() {
@@ -218,7 +243,6 @@
                     throw new AsterixException(
                             "Field accessor is not defined for values of type 
" + subRecordType.getTypeTag());
                 }
-
             }
             subRecordType = ((ARecordType) 
subRecordType).getFieldType(subFieldName.get(i));
         }
@@ -236,7 +260,8 @@
     public IAType getFieldType(String fieldName) {
         int fieldPos = getFieldIndex(fieldName);
         if ((fieldPos < 0) || (fieldPos >= fieldTypes.length)) {
-            return null;
+            // Not a declared field: fall back to the enforced-index type map; get() yields null when absent.
+            return enforcedFieldNameToTypeMap.get(fieldName);
         }
         return fieldTypes[fieldPos];
     }
@@ -270,7 +295,7 @@
                 newTypes[i] = type.fieldTypes[i];
             }
         }
-        return new ARecordType(type.typeName, type.fieldNames, newTypes, 
type.isOpen);
+        return new ARecordType(type.typeName, type.fieldNames, newTypes, 
type.isOpen, type.enforcedFieldNameToTypeMap);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java
 
b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java
index 5303870..56ce29a 100644
--- 
a/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java
+++ 
b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java
@@ -44,9 +44,9 @@
     public void testRecordType() {
         // Constructs input types.
         ARecordType leftRecordType = new ARecordType(null, new String[] { "a", 
"b" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null);
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null, null);
         ARecordType rightRecordType = new ARecordType(null, new String[] { 
"b", "c" },
-                new IAType[] { BuiltinType.AINT32, BuiltinType.ABINARY }, 
false, null);
+                new IAType[] { BuiltinType.AINT32, BuiltinType.ABINARY }, 
false, null, null);
 
         // Resolves input types to a generalized type.
         List<IAType> inputTypes = new ArrayList<>();
@@ -71,9 +71,9 @@
     public void testIsmophicRecordType() {
         // Constructs input types.
         ARecordType leftRecordType = new ARecordType(null, new String[] { "a", 
"b" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null);
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null, null);
         ARecordType rightRecordType = new ARecordType(null, new String[] { 
"b", "a" },
-                new IAType[] { BuiltinType.AINT32, BuiltinType.ASTRING }, 
false, null);
+                new IAType[] { BuiltinType.AINT32, BuiltinType.ASTRING }, 
false, null, null);
 
         // Resolves input types to a generalized type.
         List<IAType> inputTypes = new ArrayList<>();
@@ -92,14 +92,14 @@
                 new ARecordType("null", new String[] { "a", "b" },
                         new IAType[] { BuiltinType.ASTRING,
                                 new ARecordType(null, new String[] { "c", "d" 
},
-                                        new IAType[] { BuiltinType.ASTRING, 
BuiltinType.AINT32 }, false, null) },
-                        false, null);
+                                        new IAType[] { BuiltinType.ASTRING, 
BuiltinType.AINT32 }, false, null, null) },
+                        false, null, null);
         ARecordType rightRecordType =
                 new ARecordType("null", new String[] { "a", "b" },
                         new IAType[] { BuiltinType.ASTRING,
                                 new ARecordType(null, new String[] { "d", "e" 
},
-                                        new IAType[] { BuiltinType.AINT32, 
BuiltinType.AINT32 }, false, null) },
-                        false, null);
+                                        new IAType[] { BuiltinType.AINT32, 
BuiltinType.AINT32 }, false, null, null) },
+                        false, null, null);
 
         // Resolves input types to a generalized type.
         List<IAType> inputTypes = new ArrayList<>();
@@ -116,7 +116,7 @@
                 new ARecordType(null, new String[] { "a", "b" },
                         new IAType[] { BuiltinType.ASTRING, new 
ARecordType(null, new String[] { "d" },
                                 new IAType[] { BuiltinType.AINT32 }, true, 
nestedPossibleAdditionalFields) },
-                        false, null);
+                        false, null, null);
 
         // Compares the resolved type with the expected type.
         Assert.assertEquals(expectedType, resolvedType);
@@ -177,7 +177,7 @@
     @Test
     public void testNullType() {
         ARecordType leftRecordType = new ARecordType(null, new String[] { "a", 
"b" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null);
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null, null);
         List<IAType> inputTypes = new ArrayList<>();
         inputTypes.add(leftRecordType);
         inputTypes.add(BuiltinType.ANULL);
@@ -188,7 +188,7 @@
     @Test
     public void testMissingType() {
         ARecordType leftRecordType = new ARecordType(null, new String[] { "a", 
"b" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null);
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, 
false, null, null);
         List<IAType> inputTypes = new ArrayList<>();
         inputTypes.add(leftRecordType);
         inputTypes.add(BuiltinType.AMISSING);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1880
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib353c85bf627d8dd65dba0ea307dee428edb4a25
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <wangs...@gmail.com>

Reply via email to