This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new dc9718b745 [ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition dc9718b745 is described below commit dc9718b745f5ea3c7f9e5be78af5b436541c3aac Author: Ali Alsuliman <ali.al.solai...@gmail.com> AuthorDate: Sat Jul 13 15:24:56 2024 +0300 [ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition - user model changes: no - storage format changes: no - interface changes: no Details: Add internal query-partition() to get all tuples in a partition. Ext-ref: MB-62720 Change-Id: I37185d159a38d26c8cc93ddd6500e437891c44f5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18483 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Tested-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../app/function/QueryPartitionDatasource.java | 131 ++++++++++++ .../app/function/QueryPartitionRewriter.java | 220 +++++++++++++++++++++ .../asterix/util/MetadataBuiltinFunctions.java | 6 + .../metadata/declared/MetadataProvider.java | 45 ++++- .../hyracks/hyracks-storage-am-btree/pom.xml | 4 + .../BTreePartitionSearchOperatorDescriptor.java | 72 +++++++ .../BTreePartitionSearchOperatorNodePushable.java | 76 +++++++ .../dataflow/IndexSearchOperatorNodePushable.java | 2 +- 8 files changed, 549 insertions(+), 7 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java new file mode 100644 index 0000000000..4d40ba6913 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.metadata.api.IDatasourceFunction; +import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; +import 
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; + +public class QueryPartitionDatasource extends FunctionDataSource { + + private final Dataset ds; + private final AlgebricksAbsolutePartitionConstraint storageLocations; + private final int partitionNum; + + public QueryPartitionDatasource(Dataset ds, INodeDomain domain, + AlgebricksAbsolutePartitionConstraint storageLocations, ARecordType recType, int partitionNum) + throws AlgebricksException { + super(createQueryPartitionDataSourceId(ds), QueryPartitionRewriter.QUERY_PARTITION, domain, recType); + if (partitionNum < 0) { + throw new IllegalArgumentException("partition must be >= 0"); + } + this.partitionNum = partitionNum; + this.ds = ds; + this.storageLocations = storageLocations; + } + + @Override + protected void initSchemaType(IAType iType) { + ARecordType type = (ARecordType) iType; + IAType[] fieldTypes = type.getFieldTypes(); + schemaTypes = new IAType[fieldTypes.length]; + System.arraycopy(fieldTypes, 0, schemaTypes, 0, schemaTypes.length); + } + + @Override + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) { + return storageLocations; + } + + @Override + 
public boolean isScanAccessPathALeaf() { + // the index scan op is not a leaf op. the ETS op will start the scan of the index. we need the ETS op below + // the index scan to be still generated + return false; + } + + @Override + protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, + AlgebricksAbsolutePartitionConstraint locations) { + throw new UnsupportedOperationException("query-partition() does not use record reader adapter"); + } + + @Override + public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime( + MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource, + List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed, + List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, + ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema, + IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig, + IProjectionFiltrationInfo projectionInfo) throws AlgebricksException { + return metadataProvider.getBtreePartitionSearchRuntime(jobSpec, opSchema, typeEnv, context, ds, + tupleFilterFactory, outputLimit, partitionNum); + } + + @Override + public IDataSourcePropertiesProvider getPropertiesProvider() { + return new IDataSourcePropertiesProvider() { + @Override + public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables, + IOptimizationContext ctx) { + return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR; + } + + @Override + public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables, + IOptimizationContext ctx) { + List<ILocalStructuralProperty> propsLocal = new ArrayList<>(1); + return new StructuralPropertiesVector(new RandomPartitioningProperty(domain), propsLocal); + } + }; + } + + private static DataSourceId createQueryPartitionDataSourceId(Dataset dataset) { + return new 
DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(), + new String[] { dataset.getDatasetName(), QueryPartitionRewriter.QUERY_PARTITION.getName() }); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java new file mode 100644 index 0000000000..4514a30362 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.function; + +import static org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier.VARARGS; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.common.cluster.PartitioningProperties; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataUtil; +import org.apache.asterix.lang.common.util.FunctionUtil; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.metadata.utils.KeyFieldTypeUtil; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.constants.AsterixConstantValue; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.typecomputer.base.IResultTypeComputer; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.ConstantExpressionUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import 
org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; +import org.apache.hyracks.api.exceptions.SourceLocation; + +/** + * query-partition("db", "dv", "ds", 0); + * query-partition("dv", "ds", 0); + */ +public class QueryPartitionRewriter extends FunctionRewriter implements IResultTypeComputer { + + public static final FunctionIdentifier QUERY_PARTITION = FunctionConstants.newAsterix("query-partition", VARARGS); + public static final QueryPartitionRewriter INSTANCE = new QueryPartitionRewriter(QUERY_PARTITION); + + private QueryPartitionRewriter(FunctionIdentifier functionId) { + super(functionId); + } + + @Override + public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp) + throws AlgebricksException { + return computeRecType((AbstractFunctionCallExpression) expression, (MetadataProvider) mp, null, null, null); + } + + @Override + public FunctionDataSource toDatasource(IOptimizationContext ctx, AbstractFunctionCallExpression f) + throws AlgebricksException { + 
final SourceLocation loc = f.getSourceLocation(); + int numArgs = f.getArguments().size(); + int nextArg = 0; + if (numArgs > 3) { + nextArg++; + } + DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++); + String dsName = getString(loc, f.getArguments(), nextArg++); + Long partitionNum = ConstantExpressionUtil.getLongArgument(f, nextArg); + if (partitionNum == null) { + throw new IllegalArgumentException("partition number should be a number"); + } + String dbName; + if (numArgs > 3) { + dbName = getString(loc, f.getArguments(), 0); + } else { + dbName = MetadataUtil.databaseFor(dvName); + } + MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider(); + final Dataset dataset = validateDataset(mp, dbName, dvName, dsName, loc); + return createQueryPartitionDatasource(mp, dataset, partitionNum.intValue(), loc, f); + } + + @Override + protected void createDataScanOp(Mutable<ILogicalOperator> opRef, UnnestOperator unnest, IOptimizationContext ctx, + AbstractFunctionCallExpression f) throws AlgebricksException { + FunctionDataSource datasource = toDatasource(ctx, f); + List<LogicalVariable> variables = new ArrayList<>(); + List<Mutable<ILogicalExpression>> closedRecArgs = new ArrayList<>(); + MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider(); + computeRecType(f, mp, variables, closedRecArgs, ctx); + DataSourceScanOperator scan = new DataSourceScanOperator(variables, datasource); + scan.setSourceLocation(unnest.getSourceLocation()); + List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs(); + scanInpList.addAll(unnest.getInputs()); + ScalarFunctionCallExpression recordCreationFunc = new ScalarFunctionCallExpression( + FunctionUtil.getFunctionInfo(BuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR), closedRecArgs); + recordCreationFunc.setSourceLocation(unnest.getSourceLocation()); + AssignOperator assignOp = new AssignOperator(unnest.getVariable(), new MutableObject<>(recordCreationFunc)); + assignOp.getInputs().add(new 
MutableObject<>(scan)); + assignOp.setSourceLocation(unnest.getSourceLocation()); + ctx.computeAndSetTypeEnvironmentForOperator(scan); + ctx.computeAndSetTypeEnvironmentForOperator(assignOp); + opRef.setValue(assignOp); + } + + @Override + protected boolean invalidArgs(List<Mutable<ILogicalExpression>> args) { + return args.size() < 3; + } + + private FunctionDataSource createQueryPartitionDatasource(MetadataProvider mp, Dataset ds, int partitionNum, + SourceLocation loc, AbstractFunctionCallExpression f) throws AlgebricksException { + INodeDomain domain = mp.findNodeDomain(ds.getNodeGroupName()); + PartitioningProperties partitioningProperties = mp.getPartitioningProperties(ds); + AlgebricksPartitionConstraint constraints = partitioningProperties.getConstraints(); + ARecordType recType = computeRecType(f, mp, null, null, null); + return new QueryPartitionDatasource(ds, domain, (AlgebricksAbsolutePartitionConstraint) constraints, recType, + partitionNum); + } + + private ARecordType computeRecType(AbstractFunctionCallExpression f, MetadataProvider metadataProvider, + List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs, + IOptimizationContext context) throws AlgebricksException { + final SourceLocation loc = f.getSourceLocation(); + int numArgs = f.getArguments().size(); + int nextArg = 0; + if (numArgs > 3) { + nextArg++; + } + DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++); + String dsName = getString(loc, f.getArguments(), nextArg++); + String dbName; + if (numArgs > 3) { + dbName = getString(loc, f.getArguments(), 0); + } else { + dbName = MetadataUtil.databaseFor(dvName); + } + Dataset dataset = validateDataset(metadataProvider, dbName, dvName, dsName, loc); + ARecordType dsType = (ARecordType) metadataProvider.findType(dataset); + ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset); + dsType = (ARecordType) metadataProvider.findTypeForDatasetWithoutType(dsType, metaType, dataset); + + 
List<IAType> dsKeyTypes = KeyFieldTypeUtil.getPartitoningKeyTypes(dataset, dsType, metaType); + List<List<String>> primaryKeys = dataset.getPrimaryKeys(); + int numPrimaryKeys = dsKeyTypes.size(); + int numPayload = metaType == null ? 1 : 2; + String[] fieldNames = new String[numPrimaryKeys + numPayload]; + IAType[] fieldTypes = new IAType[numPrimaryKeys + numPayload]; + int keyIdx = 0; + for (int k = 0; k < numPrimaryKeys; k++, keyIdx++) { + fieldTypes[keyIdx] = dsKeyTypes.get(k); + fieldNames[keyIdx] = StringUtils.join(primaryKeys.get(k), "."); + setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx); + } + fieldTypes[keyIdx] = dsType; + fieldNames[keyIdx] = "rec"; + setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx); + if (metaType != null) { + keyIdx++; + fieldTypes[keyIdx] = metaType; + fieldNames[keyIdx] = "meta"; + setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx); + } + return new ARecordType("", fieldNames, fieldTypes, false); + } + + private void setAssignVarsExprs(List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs, + IOptimizationContext context, SourceLocation loc, String[] fieldNames, int n) { + if (context != null) { + LogicalVariable logicalVariable = context.newVar(); + outVars.add(logicalVariable); + ConstantExpression nameExpr = new ConstantExpression(new AsterixConstantValue(new AString(fieldNames[n]))); + VariableReferenceExpression varRefExpr = new VariableReferenceExpression(logicalVariable); + nameExpr.setSourceLocation(loc); + varRefExpr.setSourceLocation(loc); + closedRecArgs.add(new MutableObject<>(nameExpr)); + closedRecArgs.add(new MutableObject<>(varRefExpr)); + } + } + + private static Dataset validateDataset(MetadataProvider mp, String dbName, DataverseName dvName, String dsName, + SourceLocation loc) throws AlgebricksException { + Dataset dataset = mp.findDataset(dbName, dvName, dsName); + if (dataset == null) { + throw new 
CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, loc, dsName, + MetadataUtil.dataverseName(dbName, dvName, mp.isUsingDatabase())); + } + return dataset; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java index 5a2ef3c768..9d39088534 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java @@ -27,6 +27,7 @@ import org.apache.asterix.app.function.FeedRewriter; import org.apache.asterix.app.function.JobSummariesRewriter; import org.apache.asterix.app.function.PingRewriter; import org.apache.asterix.app.function.QueryIndexRewriter; +import org.apache.asterix.app.function.QueryPartitionRewriter; import org.apache.asterix.app.function.StorageComponentsRewriter; import org.apache.asterix.app.function.TPCDSAllTablesDataGeneratorRewriter; import org.apache.asterix.app.function.TPCDSSingleTableDataGeneratorRewriter; @@ -100,6 +101,11 @@ public class MetadataBuiltinFunctions { BuiltinFunctions.addFunction(QueryIndexRewriter.QUERY_INDEX, QueryIndexRewriter.INSTANCE, true); BuiltinFunctions.addUnnestFun(QueryIndexRewriter.QUERY_INDEX, false); BuiltinFunctions.addDatasourceFunction(QueryIndexRewriter.QUERY_INDEX, QueryIndexRewriter.INSTANCE); + // Query index partition function + BuiltinFunctions.addPrivateFunction(QueryPartitionRewriter.QUERY_PARTITION, QueryPartitionRewriter.INSTANCE, + true); + BuiltinFunctions.addUnnestFun(QueryPartitionRewriter.QUERY_PARTITION, false); + BuiltinFunctions.addDatasourceFunction(QueryPartitionRewriter.QUERY_PARTITION, QueryPartitionRewriter.INSTANCE); } private MetadataBuiltinFunctions() { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 023b01cd53..fe2127cea3 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -164,12 +164,14 @@ import org.apache.hyracks.data.std.primitive.ShortPointable; import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; +import org.apache.hyracks.storage.am.btree.dataflow.BTreePartitionSearchOperatorDescriptor; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeBatchPointSearchOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; @@ -596,6 +598,15 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return new Triple<>(feedIngestor, partitionConstraint, adapterFactory); } + public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreePartitionSearchRuntime( + JobSpecification jobSpec, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, + 
Dataset dataset, ITupleFilterFactory tupleFilterFactory, long outputLimit, int partitionNum) + throws AlgebricksException { + return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset, + dataset.getDatasetName(), null, null, true, true, false, null, null, null, tupleFilterFactory, + outputLimit, false, false, DefaultTupleProjectorFactory.INSTANCE, false, partitionNum); + } + public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput, boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName, @@ -604,6 +615,21 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory, boolean partitionInputTuples) throws AlgebricksException { + return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, retainInput, retainMissing, + nonMatchWriterFactory, dataset, indexName, lowKeyFields, highKeyFields, lowKeyInclusive, + highKeyInclusive, propagateFilter, nonFilterWriterFactory, minFilterFieldIndexes, maxFilterFieldIndexes, + tupleFilterFactory, outputLimit, isIndexOnlyPlan, isPrimaryIndexPointSearch, tupleProjectorFactory, + partitionInputTuples, -1); + } + + private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec, + IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput, + boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName, + int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, + boolean propagateFilter, IMissingWriterFactory 
nonFilterWriterFactory, int[] minFilterFieldIndexes, + int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit, + boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory, + boolean partitionInputTuples, int targetPartition) throws AlgebricksException { boolean isSecondary = true; Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); @@ -678,12 +704,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap) - : new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields, - lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing, - nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, - propagateFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit, - proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan, - tupleProjectorFactory, tuplePartitionerFactory, partitionsMap); + : targetPartition < 0 ? 
new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, + highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, + retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, propagateFilter, nonFilterWriterFactory, tupleFilterFactory, + outputLimit, proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan, + tupleProjectorFactory, tuplePartitionerFactory, partitionsMap) + : new BTreePartitionSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, + highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, + retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, propagateFilter, nonFilterWriterFactory, tupleFilterFactory, + outputLimit, proceedIndexOnlyPlan, failValueForIndexOnlyPlan, + successValueForIndexOnlyPlan, tupleProjectorFactory, tuplePartitionerFactory, + partitionsMap, targetPartition); } else { btreeSearchOp = null; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml index 52cd6cf1bb..f3881e2914 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml @@ -109,6 +109,10 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil-core</artifactId> + </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java new file mode 
100644 index 0000000000..dfa59b71a8 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.storage.am.btree.dataflow; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory; + +public class BTreePartitionSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor { + + private static final long serialVersionUID = 1L; + private final int targetStoragePartition; + + public BTreePartitionSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, + IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, + IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, + int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter, + IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit, + boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue, + byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory, + ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, int targetStoragePartition) { + super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, 
highKeyInclusive, indexHelperFactory, + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit, + appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue, + searchCallbackProceedResultTrueValue, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap); + this.targetStoragePartition = targetStoragePartition; + } + + @Override + public BTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + return new BTreePartitionSearchOperatorNodePushable(ctx, partition, + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields, + lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, + nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult, + searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory, + tuplePartitionerFactory, partitionsMap, targetStoragePartition); + } + + @Override + public String getDisplayName() { + return "BTree Partition Search"; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java new file mode 100644 index 0000000000..75187914f4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java @@ -0,0 +1,76 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.btree.dataflow; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory; + /** BTree search runtime that searches only the index of one target storage partition for every input tuple. It overrides nextFrame instead of using the frame handling of BTreeSearchOperatorNodePushable. If the target partition is not found in {@code storagePartitionId2Index} (presumably because this task does not hold it; confirm against the superclass), no search is performed. */ +public class BTreePartitionSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable { + /** Slot of the target storage partition in {@code cursors} and {@code indexAccessors}, or {@code Integer.MIN_VALUE} when the partition is not in {@code storagePartitionId2Index}. */ + private final int pIdx; + /** Creates the runtime. All parameters except {@code targetStoragePartition} are passed unchanged to the BTreeSearchOperatorNodePushable constructor. @param targetStoragePartition storage partition id to search; resolved to {@code pIdx} through {@code storagePartitionId2Index} @throws HyracksDataException propagated from the superclass constructor */ + public BTreePartitionSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, + RecordDescriptor inputRecDesc, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, + boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, + IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, 
boolean retainMissing, + IMissingWriterFactory nonMatchWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, + boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory, + ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean appendOpCallbackProceedResult, + byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue, + ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory tuplePartitionerFactory, + int[][] partitionsMap, int targetStoragePartition) throws HyracksDataException { + super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, + minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, + nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory, + tupleFilterFactory, outputLimit, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue, + searchCallbackProceedResultTrueValue, projectorFactory, tuplePartitionerFactory, partitionsMap); + /* Integer.MIN_VALUE marks an unmapped partition; the bounds check in searchPartition then skips the search. */ pIdx = storagePartitionId2Index.getOrDefault(targetStoragePartition, Integer.MIN_VALUE); + } + /** Processes one input frame by searching only the target partition for each tuple in it. Overrides the superclass frame handling without calling it. Any exception thrown by the search is rethrown through {@code HyracksDataException.create}. @param buffer the input frame */ + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + accessor.reset(buffer); + int tupleCount = accessor.getTupleCount(); + try { + searchPartition(tupleCount); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + /** Runs the search for each tuple of the current frame against the target partition only, stopping early once {@code finished} is set. For each tuple it calls resetSearchPredicate, closes the partition's cursor, searches it again with {@code searchPred}, and passes the cursor to writeSearchResults. Does nothing when {@code pIdx} is negative or not less than {@code cursors.length}. @param tupleCount number of tuples in the frame currently held by {@code accessor} */ + private void searchPartition(int tupleCount) throws Exception { + if (pIdx >= 0 && pIdx < cursors.length) { + for (int i = 0; i < tupleCount && !finished; i++) { + resetSearchPredicate(i); + cursors[pIdx].close(); + indexAccessors[pIdx].search(cursors[pIdx], searchPred); + writeSearchResults(i, cursors[pIdx]); + } + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index 91b87c66a2..da5df23997 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -120,7 +120,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput protected final ITupleProjector tupleProjector; protected final ITuplePartitioner tuplePartitioner; protected final int[] partitions; - private final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap(); + protected final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap(); public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,