This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dc9718b745 [ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition
dc9718b745 is described below

commit dc9718b745f5ea3c7f9e5be78af5b436541c3aac
Author: Ali Alsuliman <ali.al.solai...@gmail.com>
AuthorDate: Sat Jul 13 15:24:56 2024 +0300

    [ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Add internal query-partition() to get all tuples in a partition.
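    
    Call forms, mirroring the rewriter's Javadoc (names are placeholders,
    not part of this patch):
    
        query-partition("dv", "ds", 0)        -- dataverse, dataset, partition number
        query-partition("db", "dv", "ds", 0)  -- database, dataverse, dataset, partition number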
    
    Ext-ref: MB-62720
    Change-Id: I37185d159a38d26c8cc93ddd6500e437891c44f5
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18483
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Tested-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../app/function/QueryPartitionDatasource.java     | 131 ++++++++++++
 .../app/function/QueryPartitionRewriter.java       | 220 +++++++++++++++++++++
 .../asterix/util/MetadataBuiltinFunctions.java     |   6 +
 .../metadata/declared/MetadataProvider.java        |  45 ++++-
 .../hyracks/hyracks-storage-am-btree/pom.xml       |   4 +
 .../BTreePartitionSearchOperatorDescriptor.java    |  72 +++++++
 .../BTreePartitionSearchOperatorNodePushable.java  |  76 +++++++
 .../dataflow/IndexSearchOperatorNodePushable.java  |   2 +-
 8 files changed, 549 insertions(+), 7 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java
new file mode 100644
index 0000000000..4d40ba6913
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+
+public class QueryPartitionDatasource extends FunctionDataSource {
+
+    private final Dataset ds;
+    private final AlgebricksAbsolutePartitionConstraint storageLocations;
+    private final int partitionNum;
+
+    public QueryPartitionDatasource(Dataset ds, INodeDomain domain,
+            AlgebricksAbsolutePartitionConstraint storageLocations, ARecordType recType, int partitionNum)
+            throws AlgebricksException {
+        super(createQueryPartitionDataSourceId(ds), QueryPartitionRewriter.QUERY_PARTITION, domain, recType);
+        if (partitionNum < 0) {
+            throw new IllegalArgumentException("partition must be >= 0");
+        }
+        this.partitionNum = partitionNum;
+        this.ds = ds;
+        this.storageLocations = storageLocations;
+    }
+
+    @Override
+    protected void initSchemaType(IAType iType) {
+        ARecordType type = (ARecordType) iType;
+        IAType[] fieldTypes = type.getFieldTypes();
+        schemaTypes = new IAType[fieldTypes.length];
+        System.arraycopy(fieldTypes, 0, schemaTypes, 0, schemaTypes.length);
+    }
+
+    @Override
+    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+        return storageLocations;
+    }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        // the index scan op is not a leaf op; the ETS op will start the scan of the index, so the ETS op below
+        // the index scan still needs to be generated
+        return false;
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        throw new UnsupportedOperationException("query-partition() does not use record reader adapter");
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
+            MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
+            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+            List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+            ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
+            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
+            IProjectionFiltrationInfo projectionInfo) throws AlgebricksException {
+        return metadataProvider.getBtreePartitionSearchRuntime(jobSpec, opSchema, typeEnv, context, ds,
+                tupleFilterFactory, outputLimit, partitionNum);
+    }
+
+    @Override
+    public IDataSourcePropertiesProvider getPropertiesProvider() {
+        return new IDataSourcePropertiesProvider() {
+            @Override
+            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
+                return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+            }
+
+            @Override
+            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
+                List<ILocalStructuralProperty> propsLocal = new ArrayList<>(1);
+                return new StructuralPropertiesVector(new RandomPartitioningProperty(domain), propsLocal);
+            }
+        };
+    }
+
+    private static DataSourceId createQueryPartitionDataSourceId(Dataset dataset) {
+        return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(),
+                new String[] { dataset.getDatasetName(), QueryPartitionRewriter.QUERY_PARTITION.getName() });
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java
new file mode 100644
index 0000000000..4514a30362
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier.VARARGS;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * query-partition("db", "dv", "ds", 0);
+ * query-partition("dv", "ds", 0);
+ */
+public class QueryPartitionRewriter extends FunctionRewriter implements IResultTypeComputer {
+
+    public static final FunctionIdentifier QUERY_PARTITION = FunctionConstants.newAsterix("query-partition", VARARGS);
+    public static final QueryPartitionRewriter INSTANCE = new QueryPartitionRewriter(QUERY_PARTITION);
+
+    private QueryPartitionRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        return computeRecType((AbstractFunctionCallExpression) expression, (MetadataProvider) mp, null, null, null);
+    }
+
+    @Override
+    public FunctionDataSource toDatasource(IOptimizationContext ctx, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        final SourceLocation loc = f.getSourceLocation();
+        int numArgs = f.getArguments().size();
+        int nextArg = 0;
+        if (numArgs > 3) {
+            nextArg++;
+        }
+        DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++);
+        String dsName = getString(loc, f.getArguments(), nextArg++);
+        Long partitionNum = ConstantExpressionUtil.getLongArgument(f, nextArg);
+        if (partitionNum == null) {
+            throw new IllegalArgumentException("partition number must be an integer");
+        }
+        String dbName;
+        if (numArgs > 3) {
+            dbName = getString(loc, f.getArguments(), 0);
+        } else {
+            dbName = MetadataUtil.databaseFor(dvName);
+        }
+        MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider();
+        final Dataset dataset = validateDataset(mp, dbName, dvName, dsName, loc);
+        return createQueryPartitionDatasource(mp, dataset, partitionNum.intValue(), loc, f);
+    }
+
+    @Override
+    protected void createDataScanOp(Mutable<ILogicalOperator> opRef, UnnestOperator unnest, IOptimizationContext ctx,
+            AbstractFunctionCallExpression f) throws AlgebricksException {
+        FunctionDataSource datasource = toDatasource(ctx, f);
+        List<LogicalVariable> variables = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> closedRecArgs = new ArrayList<>();
+        MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider();
+        computeRecType(f, mp, variables, closedRecArgs, ctx);
+        DataSourceScanOperator scan = new DataSourceScanOperator(variables, datasource);
+        scan.setSourceLocation(unnest.getSourceLocation());
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        ScalarFunctionCallExpression recordCreationFunc = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(BuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR), closedRecArgs);
+        recordCreationFunc.setSourceLocation(unnest.getSourceLocation());
+        AssignOperator assignOp = new AssignOperator(unnest.getVariable(), new MutableObject<>(recordCreationFunc));
+        assignOp.getInputs().add(new MutableObject<>(scan));
+        assignOp.setSourceLocation(unnest.getSourceLocation());
+        ctx.computeAndSetTypeEnvironmentForOperator(scan);
+        ctx.computeAndSetTypeEnvironmentForOperator(assignOp);
+        opRef.setValue(assignOp);
+    }
+
+    @Override
+    protected boolean invalidArgs(List<Mutable<ILogicalExpression>> args) {
+        return args.size() < 3;
+    }
+
+    private FunctionDataSource createQueryPartitionDatasource(MetadataProvider mp, Dataset ds, int partitionNum,
+            SourceLocation loc, AbstractFunctionCallExpression f) throws AlgebricksException {
+        INodeDomain domain = mp.findNodeDomain(ds.getNodeGroupName());
+        PartitioningProperties partitioningProperties = mp.getPartitioningProperties(ds);
+        AlgebricksPartitionConstraint constraints = partitioningProperties.getConstraints();
+        ARecordType recType = computeRecType(f, mp, null, null, null);
+        return new QueryPartitionDatasource(ds, domain, (AlgebricksAbsolutePartitionConstraint) constraints, recType,
+                partitionNum);
+    }
+
+    private ARecordType computeRecType(AbstractFunctionCallExpression f, MetadataProvider metadataProvider,
+            List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs,
+            IOptimizationContext context) throws AlgebricksException {
+        final SourceLocation loc = f.getSourceLocation();
+        int numArgs = f.getArguments().size();
+        int nextArg = 0;
+        if (numArgs > 3) {
+            nextArg++;
+        }
+        DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++);
+        String dsName = getString(loc, f.getArguments(), nextArg++);
+        String dbName;
+        if (numArgs > 3) {
+            dbName = getString(loc, f.getArguments(), 0);
+        } else {
+            dbName = MetadataUtil.databaseFor(dvName);
+        }
+        Dataset dataset = validateDataset(metadataProvider, dbName, dvName, dsName, loc);
+        ARecordType dsType = (ARecordType) metadataProvider.findType(dataset);
+        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        dsType = (ARecordType) metadataProvider.findTypeForDatasetWithoutType(dsType, metaType, dataset);
+
+        List<IAType> dsKeyTypes = KeyFieldTypeUtil.getPartitoningKeyTypes(dataset, dsType, metaType);
+        List<List<String>> primaryKeys = dataset.getPrimaryKeys();
+        int numPrimaryKeys = dsKeyTypes.size();
+        int numPayload = metaType == null ? 1 : 2;
+        String[] fieldNames = new String[numPrimaryKeys + numPayload];
+        IAType[] fieldTypes = new IAType[numPrimaryKeys + numPayload];
+        int keyIdx = 0;
+        for (int k = 0; k < numPrimaryKeys; k++, keyIdx++) {
+            fieldTypes[keyIdx] = dsKeyTypes.get(k);
+            fieldNames[keyIdx] = StringUtils.join(primaryKeys.get(k), ".");
+            setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
+        }
+        fieldTypes[keyIdx] = dsType;
+        fieldNames[keyIdx] = "rec";
+        setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
+        if (metaType != null) {
+            keyIdx++;
+            fieldTypes[keyIdx] = metaType;
+            fieldNames[keyIdx] = "meta";
+            setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
+        }
+        return new ARecordType("", fieldNames, fieldTypes, false);
+    }
+
+    private void setAssignVarsExprs(List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs,
+            IOptimizationContext context, SourceLocation loc, String[] fieldNames, int n) {
+        if (context != null) {
+            LogicalVariable logicalVariable = context.newVar();
+            outVars.add(logicalVariable);
+            ConstantExpression nameExpr = new ConstantExpression(new AsterixConstantValue(new AString(fieldNames[n])));
+            VariableReferenceExpression varRefExpr = new VariableReferenceExpression(logicalVariable);
+            nameExpr.setSourceLocation(loc);
+            varRefExpr.setSourceLocation(loc);
+            closedRecArgs.add(new MutableObject<>(nameExpr));
+            closedRecArgs.add(new MutableObject<>(varRefExpr));
+        }
+    }
+
+    private static Dataset validateDataset(MetadataProvider mp, String dbName, DataverseName dvName, String dsName,
+            SourceLocation loc) throws AlgebricksException {
+        Dataset dataset = mp.findDataset(dbName, dvName, dsName);
+        if (dataset == null) {
+            throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, loc, dsName,
+                    MetadataUtil.dataverseName(dbName, dvName, mp.isUsingDatabase()));
+        }
+        return dataset;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 5a2ef3c768..9d39088534 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -27,6 +27,7 @@ import org.apache.asterix.app.function.FeedRewriter;
 import org.apache.asterix.app.function.JobSummariesRewriter;
 import org.apache.asterix.app.function.PingRewriter;
 import org.apache.asterix.app.function.QueryIndexRewriter;
+import org.apache.asterix.app.function.QueryPartitionRewriter;
 import org.apache.asterix.app.function.StorageComponentsRewriter;
 import org.apache.asterix.app.function.TPCDSAllTablesDataGeneratorRewriter;
 import org.apache.asterix.app.function.TPCDSSingleTableDataGeneratorRewriter;
@@ -100,6 +101,11 @@ public class MetadataBuiltinFunctions {
         BuiltinFunctions.addFunction(QueryIndexRewriter.QUERY_INDEX, QueryIndexRewriter.INSTANCE, true);
         BuiltinFunctions.addUnnestFun(QueryIndexRewriter.QUERY_INDEX, false);
         BuiltinFunctions.addDatasourceFunction(QueryIndexRewriter.QUERY_INDEX, QueryIndexRewriter.INSTANCE);
+        // Query partition function
+        BuiltinFunctions.addPrivateFunction(QueryPartitionRewriter.QUERY_PARTITION, QueryPartitionRewriter.INSTANCE,
+                true);
+        BuiltinFunctions.addUnnestFun(QueryPartitionRewriter.QUERY_PARTITION, false);
+        BuiltinFunctions.addDatasourceFunction(QueryPartitionRewriter.QUERY_PARTITION, QueryPartitionRewriter.INSTANCE);
     }
 
     private MetadataBuiltinFunctions() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 023b01cd53..fe2127cea3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -164,12 +164,14 @@ import org.apache.hyracks.data.std.primitive.ShortPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreePartitionSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeBatchPointSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -596,6 +598,15 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
     }
 
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreePartitionSearchRuntime(
+            JobSpecification jobSpec, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
+            Dataset dataset, ITupleFilterFactory tupleFilterFactory, long outputLimit, int partitionNum)
+            throws AlgebricksException {
+        return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
+                dataset.getDatasetName(), null, null, true, true, false, null, null, null, tupleFilterFactory,
+                outputLimit, false, false, DefaultTupleProjectorFactory.INSTANCE, false, partitionNum);
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec,
             IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
             boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName,
@@ -604,6 +615,21 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
             boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory,
             boolean partitionInputTuples) throws AlgebricksException {
+        return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, retainInput, retainMissing,
+                nonMatchWriterFactory, dataset, indexName, lowKeyFields, highKeyFields, lowKeyInclusive,
+                highKeyInclusive, propagateFilter, nonFilterWriterFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
+                tupleFilterFactory, outputLimit, isIndexOnlyPlan, isPrimaryIndexPointSearch, tupleProjectorFactory,
+                partitionInputTuples, -1);
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec,
+            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
+            boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
+            int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+            boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory,
+            boolean partitionInputTuples, int targetPartition) throws AlgebricksException {
         boolean isSecondary = true;
         Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
                 dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName());
@@ -678,12 +704,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                             retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
                             maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory,
                             tuplePartitionerFactory, partitionsMap)
-                    : new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
-                            lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
-                            nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
-                            propagateFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
-                            proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
-                            tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
+                    : targetPartition < 0 ? new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+                            highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
+                            retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+                            maxFilterFieldIndexes, propagateFilter, nonFilterWriterFactory, tupleFilterFactory,
+                            outputLimit, proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+                            tupleProjectorFactory, tuplePartitionerFactory, partitionsMap)
+                            : new BTreePartitionSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+                                    highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
+                                    retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+                                    maxFilterFieldIndexes, propagateFilter, nonFilterWriterFactory, tupleFilterFactory,
+                                    outputLimit, proceedIndexOnlyPlan, failValueForIndexOnlyPlan,
+                                    successValueForIndexOnlyPlan, tupleProjectorFactory, tuplePartitionerFactory,
+                                    partitionsMap, targetPartition);
         } else {
             btreeSearchOp = null;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 52cd6cf1bb..f3881e2914 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -109,6 +109,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
new file mode 100644
index 0000000000..dfa59b71a8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class BTreePartitionSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final int targetStoragePartition;
+
+    public BTreePartitionSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+            IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
+            IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+            boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
+            byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, int targetStoragePartition) {
+        super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
+                retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+                maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
+                appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+                searchCallbackProceedResultTrueValue, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
+        this.targetStoragePartition = targetStoragePartition;
+    }
+
+    @Override
+    public BTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new BTreePartitionSearchOperatorNodePushable(ctx, partition,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
+                lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
+                retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
+                nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
+                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory,
+                tuplePartitionerFactory, partitionsMap, targetStoragePartition);
+    }
+
+    @Override
+    public String getDisplayName() {
+        return "BTree Partition Search";
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
new file mode 100644
index 0000000000..75187914f4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class BTreePartitionSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
+
+    private final int pIdx;
+
+    public BTreePartitionSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+            RecordDescriptor inputRecDesc, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+            boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
+            IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+            IMissingWriterFactory nonMatchWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+            boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
+            ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean appendOpCallbackProceedResult,
+            byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue,
+            ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap, int targetStoragePartition) throws HyracksDataException {
+        super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
+                nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory,
+                tupleFilterFactory, outputLimit, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+                searchCallbackProceedResultTrueValue, projectorFactory, tuplePartitionerFactory, partitionsMap);
+        pIdx = storagePartitionId2Index.getOrDefault(targetStoragePartition, Integer.MIN_VALUE);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            searchPartition(tupleCount);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void searchPartition(int tupleCount) throws Exception {
+        if (pIdx >= 0 && pIdx < cursors.length) {
+            for (int i = 0; i < tupleCount && !finished; i++) {
+                resetSearchPredicate(i);
+                cursors[pIdx].close();
+                indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+                writeSearchResults(i, cursors[pIdx]);
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 91b87c66a2..da5df23997 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -120,7 +120,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
     protected final ITupleProjector tupleProjector;
     protected final ITuplePartitioner tuplePartitioner;
     protected final int[] partitions;
-    private final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
+    protected final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,

