Till Westmann has submitted this change and it was merged. Change subject: [NO ISSUE] Readability improvements ......................................................................
[NO ISSUE] Readability improvements Change-Id: I8b27805be1668fe6591c442fcd44020a418c2931 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2333 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 3 files changed, 86 insertions(+), 85 deletions(-) Approvals: Anon. E. Moose #1000171: Murtadha Hubail: Looks good to me, approved; Verified; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 0f91275..a18277e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.PrintWriter; -import java.rmi.RemoteException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -95,6 +94,7 @@ import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext; import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider; import org.apache.hyracks.api.client.IClusterInfoCollector; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; @@ -106,6 +106,7 @@ import org.apache.hyracks.control.common.config.OptionTypes; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.collect.ImmutableSet; /** @@ -201,63 +202,48 @@ } public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, - Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement) - throws AlgebricksException, RemoteException, ACIDException { + Query query, int varCounter, String outputDatasetName, SessionOutput output, + ICompiledDmlStatement statement) throws AlgebricksException, ACIDException { + + // establish facts + final boolean isQuery = query != null; + final boolean isLoad = statement != null && statement.getKind() == Statement.Kind.LOAD; SessionConfig conf = output.config(); if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) { output.out().println(); printPlanPrefix(output, "Rewritten expression tree"); - if (rwQ != null) { - rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0); + if (isQuery) { + query.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0); } printPlanPostfix(output); } - TxnId txnId = TxnIdFactory.create(); + final TxnId txnId = TxnIdFactory.create(); metadataProvider.setTxnId(txnId); ILangExpressionToPlanTranslator t = translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter); - ILogicalPlan plan; - // statement = null when it's a query - if (statement == null || statement.getKind() != Statement.Kind.LOAD) { - plan = t.translate(rwQ, outputDatasetName, statement); - } else { - plan = t.translateLoad(statement); - } + ILogicalPlan plan = isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement); if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) { output.out().println(); printPlanPrefix(output, "Logical plan"); - if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) { + if (isQuery || isLoad) { PlanPrettyPrinter.printPlan(plan, getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0); } printPlanPostfix(output); } CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties(); - int frameSize = compilerProperties.getFrameSize(); - Map<String, String> querySpecificConfig = metadataProvider.getConfig(); - validateConfig(querySpecificConfig); // Validates the user-overridden query parameters. - int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY), - compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT); - int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY), - compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY); - int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY), - compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN); - OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize); - OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit); - OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit); - OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit); + Map<String, String> querySpecificConfig = validateConfig(metadataProvider.getConfig()); + final PhysicalOptimizationConfig physOptConf = + getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig); HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE); - builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig()); + builder.setPhysicalOptimizationConfig(physOptConf); builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites(metadataProvider.getApplicationContext())); builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites(metadataProvider.getApplicationContext())); IDataFormat format = metadataProvider.getDataFormat(); @@ -285,7 +271,7 @@ PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0); } else { printPlanPrefix(output, "Optimized logical plan"); - if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) { + if (isQuery || isLoad) { PlanPrettyPrinter.printPlan(plan, getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0); } @@ -293,7 +279,7 @@ } } } - if (rwQ != null && rwQ.isExplain()) { + if (isQuery && query.isExplain()) { try { LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(); PlanPrettyPrinter.printPlan(plan, pvisitor, 0); @@ -318,25 +304,7 @@ builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider()); builder.setMissingWriterFactory(format.getMissingWriterFactory()); builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider()); - - final SessionConfig.OutputFormat outputFormat = conf.fmt(); - switch (outputFormat) { - case LOSSLESS_JSON: - builder.setPrinterProvider(format.getLosslessJSONPrinterFactoryProvider()); - break; - case CSV: - builder.setPrinterProvider(format.getCSVPrinterFactoryProvider()); - break; - case ADM: - builder.setPrinterProvider(format.getADMPrinterFactoryProvider()); - break; - case CLEAN_JSON: - builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider()); - break; - default: - throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat); - } - + builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt())); builder.setSerializerDeserializerProvider(format.getSerdeProvider()); builder.setTypeTraitProvider(format.getTypeTraitProvider()); builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider()); @@ -345,24 +313,66 @@ new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction()); JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory); - // When the top-level statement is a query, the statement parameter is null. - if (statement == null) { + if (isQuery) { // Sets a required capacity, only for read-only queries. // DDLs and DMLs are considered not that frequent. // limit the computation locations to the locations that will be used in the query - final AlgebricksAbsolutePartitionConstraint jobLocations = getJobLocations(spec, - metadataProvider.getApplicationContext().getNodeJobTracker(), computationLocations); - final IClusterCapacity jobRequiredCapacity = ResourceUtils.getRequiredCapacity(plan, jobLocations, - sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize); + final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker(); + final AlgebricksAbsolutePartitionConstraint jobLocations = + getJobLocations(spec, nodeJobTracker, computationLocations); + final IClusterCapacity jobRequiredCapacity = + ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf); spec.setRequiredClusterCapacity(jobRequiredCapacity); } + printJobSpec(query, spec, conf, output); + return spec; + } + + protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(CompilerProperties compilerProperties, + Map<String, String> querySpecificConfig) throws AlgebricksException { + int frameSize = compilerProperties.getFrameSize(); + int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY, + querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY), + compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT); + int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY, + querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY), + compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY); + int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY, + querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY), + compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN); + final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.getPhysicalOptimizationConfig(); + physOptConf.setFrameSize(frameSize); + physOptConf.setMaxFramesExternalSort(sortFrameLimit); + physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit); + physOptConf.setMaxFramesForJoin(joinFrameLimit); + return physOptConf; + } + + protected IPrinterFactoryProvider getPrinterFactoryProvider(IDataFormat format, + SessionConfig.OutputFormat outputFormat) throws AlgebricksException { + switch (outputFormat) { + case LOSSLESS_JSON: + return format.getLosslessJSONPrinterFactoryProvider(); + case CSV: + return format.getCSVPrinterFactoryProvider(); + case ADM: + return format.getADMPrinterFactoryProvider(); + case CLEAN_JSON: + return format.getCleanJSONPrinterFactoryProvider(); + default: + throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat); + } + } + + protected void printJobSpec(Query rwQ, JobSpecification spec, SessionConfig conf, SessionOutput output) + throws AlgebricksException { if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) { printPlanPrefix(output, "Hyracks job"); if (rwQ != null) { try { - output.out().println( - new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON())); + final ObjectWriter objectWriter = new ObjectMapper().writerWithDefaultPrettyPrinter(); + output.out().println(objectWriter.writeValueAsString(spec.toJSON())); } catch (IOException e) { throw new AlgebricksException(e); } @@ -370,7 +380,6 @@ } printPlanPostfix(output); } - return spec; } private AbstractLogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor(SessionConfig.PlanFormat planFormat, @@ -390,7 +399,6 @@ double duration = (endTime - startTime) / 1000.00; out.println("<pre>Duration: " + duration + " sec</pre>"); } - } public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception { @@ -499,12 +507,13 @@ } // Validates if the query contains unsupported query parameters. - private static void validateConfig(Map<String, String> config) throws AlgebricksException { + private static Map<String, String> validateConfig(Map<String, String> config) throws AlgebricksException { for (String parameterName : config.keySet()) { if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) { throw AsterixException.create(ErrorCode.COMPILATION_UNSUPPORTED_QUERY_PARAMETER, parameterName); } } + return config; } public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java index 89c4c76..ccda1e7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java @@ -24,6 +24,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.job.resource.ClusterCapacity; import org.apache.hyracks.api.job.resource.IClusterCapacity; @@ -40,21 +41,20 @@ * a given query plan. * @param computationLocations, * the partitions for computation. - * @param sortFrameLimit, - * the frame limit for one sorter partition. - * @param groupFrameLimit, - * the frame limit for one group-by partition. - * @param joinFrameLimit - * the frame limit for one joiner partition. - * @param frameSize - * the frame size used in query execution. + * @param physicalOptimizationConfig, + * a PhysicalOptimizationConfig. * @return the required cluster capacity for executing the query. * @throws AlgebricksException * if the query plan is malformed. */ public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan, - AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit, - int joinFrameLimit, int frameSize) throws AlgebricksException { + AlgebricksAbsolutePartitionConstraint computationLocations, + PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException { + final int frameSize = physicalOptimizationConfig.getFrameSize(); + final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort(); + final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy(); + final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin(); + // Creates a cluster capacity visitor. IClusterCapacity clusterCapacity = new ClusterCapacity(); RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 62337ad..f740d09 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -355,8 +355,7 @@ throws AlgebricksException { DataSource source = findDataSource(dataSourceId); Dataset dataset = ((DatasetDataSource) source).getDataset(); - String indexName = indexId; - Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName); + Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexId); return (secondaryIndex != null) ? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this) : null; @@ -381,26 +380,19 @@ List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException { - try { - return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, - projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec, - implConfig); - } catch (AsterixException e) { - throw new AlgebricksException(e); - } + return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, projectVariables, + projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec, implConfig); } protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan( JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc) throws AlgebricksException { ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory); - AlgebricksPartitionConstraint constraint; try { - constraint = adapterFactory.getPartitionConstraint(); + return new Pair<>(dataScanner, adapterFactory.getPartitionConstraint()); } catch (Exception e) { throw new AlgebricksException(e); } - return new Pair<>(dataScanner, constraint); } public Dataverse findDataverse(String dataverseName) throws AlgebricksException { -- To view, visit https://asterix-gerrit.ics.uci.edu/2333 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I8b27805be1668fe6591c442fcd44020a418c2931 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
