>From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18143 )
Change subject: [ASTERIXDB-3340][EXT] Use proper cluster locations based on partitioning scheme ...................................................................... [ASTERIXDB-3340][EXT] Use proper cluster locations based on partitioning scheme - user model changes: no - storage format changes: no - interface changes: yes Details: When specifying the constraints (cluster locations) of an external data source: Non-cloud deployment (Dynamic partitioning): use constraints that are sorted on the node names (sorted cluster locations) so that they match the constraints used when creating an internal dataset. Cloud deployment (Static partitioning): use constraints that are based on the partitioning map. Change-Id: Ia830326f86712d5a0868979ed54afda00df53b78 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18143 Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 13 files changed, 68 insertions(+), 16 deletions(-) Approvals: Wail Alkowaileet: Looks good to me, approved Ali Alsuliman: Looks good to me, but someone else must approve Jenkins: Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java index bd72a66..1eac011 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java @@ -54,7 +54,7 @@ } @Override - protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) { return constraint; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java index b3889fd..52054d6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java @@ -80,7 +80,7 @@ } @Override - protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) { return storageLocations; } diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java index 430ba36..b4d1b03 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java @@ -63,7 +63,7 @@ } @Override - protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { - return csm.getSortedClusterLocations(); + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) { + return md.getDataPartitioningProvider().getClusterLocations(); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java index 67319e3..3bbaee2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java @@ -67,7 +67,7 @@ } @Override - protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { - return csm.getSortedClusterLocations(); + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) { + return md.getDataPartitioningProvider().getClusterLocations(); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index ffa6c87..a5d503b 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -173,7 +173,7 @@ /** * @return the constraint representing all the partitions of the cluster sorted by node name */ - AlgebricksAbsolutePartitionConstraint getSortedClusterLocations(); + AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations(); /** * @param excludePendingRemoval true, if the desired set shouldn't have pending removal nodes diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java index e59d4e7..828715d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java @@ -18,5 +18,10 @@ */ package org.apache.asterix.common.dataflow; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; + public interface IDataPartitioningProvider { + + AlgebricksAbsolutePartitionConstraint getClusterLocations(); + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java index ac8859b..06662e5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java @@ -70,8 +70,8 @@ public void 
configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException { this.configuration = configuration; - this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager() - .getSortedClusterLocations(); + this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getDataPartitioningProvider() + .getClusterLocations(); this.filterEvaluatorFactory = filterEvaluatorFactory; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index e92448c..f7638b4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -252,7 +252,7 @@ public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx, AlgebricksAbsolutePartitionConstraint clusterLocations) { if (clusterLocations == null) { - return ((ICcApplicationContext) appCtx).getClusterStateManager().getSortedClusterLocations(); + return ((ICcApplicationContext) appCtx).getDataPartitioningProvider().getClusterLocations(); } return clusterLocations; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java index f6aa347..91f7615 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java @@ -110,7 +110,7 @@ adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE); 
IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager(); FunctionDataSourceFactory factory = - new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm))); + new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm, metadataProvider))); IDataParserFactory dataParserFactory = createDataParserFactory(); dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE); dataParserFactory.configure(Collections.emptyMap()); @@ -126,8 +126,8 @@ protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider, AlgebricksAbsolutePartitionConstraint locations); - protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { - String[] sortedLocations = csm.getSortedClusterLocations().getLocations(); + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) { + String[] sortedLocations = md.getDataPartitioningProvider().getClusterLocations().getLocations(); return new AlgebricksAbsolutePartitionConstraint( Arrays.stream(sortedLocations).distinct().toArray(String[]::new)); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 107cfb2..8d8ca80 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -1010,9 +1010,14 @@ } public AlgebricksAbsolutePartitionConstraint getClusterLocations() { + //TODO(partitioning): should this be removed and getSortedClusterLocations() is used instead? 
return appCtx.getClusterStateManager().getClusterLocations(); } + public DataPartitioningProvider getDataPartitioningProvider() { + return dataPartitioningProvider; + } + public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput, IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java index 57392db..1a1c8ac 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Dataset; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -64,4 +65,9 @@ int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second)); return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); } + + @Override + public AlgebricksAbsolutePartitionConstraint getClusterLocations() { + return clusterStateManager.getNodeSortedClusterLocations(); + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java index 7206a50..44141cb 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java @@ -30,6 +30,7 @@ import org.apache.asterix.common.cluster.StorageComputePartitionsMap; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataConstants; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; @@ -50,10 +51,10 @@ @Override public PartitioningProperties getPartitioningProperties(String databaseName) { - SplitComputeLocations dataverseSplits = getSplits(databaseName); + SplitComputeLocations databaseSplits = getSplits(databaseName); StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap(); int[][] partitionsMap = partitionMap.getComputeToStorageMap(false); - return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(), + return PartitioningProperties.of(databaseSplits.getSplitsProvider(), databaseSplits.getConstraints(), partitionsMap); } @@ -127,4 +128,10 @@ new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0])); return new SplitComputeLocations(splitProvider, constraints); } + + @Override + public AlgebricksAbsolutePartitionConstraint getClusterLocations() { + SplitComputeLocations locations = getSplits(MetadataConstants.DEFAULT_DATABASE); + return (AlgebricksAbsolutePartitionConstraint) locations.getConstraints(); + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 0a10204..444d91b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -302,7 +302,7 @@ } @Override - public synchronized AlgebricksAbsolutePartitionConstraint getSortedClusterLocations() { + public synchronized AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations() { String[] clone = getClusterLocations().getLocations().clone(); Arrays.sort(clone); return new AlgebricksAbsolutePartitionConstraint(clone); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18143 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ia830326f86712d5a0868979ed54afda00df53b78 Gerrit-Change-Number: 18143 Gerrit-PatchSet: 2 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
