Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2333
Change subject: [NO ISSUE] (Subjective) readability improvements
......................................................................
[NO ISSUE] (Subjective) readability improvements
Change-Id: I8b27805be1668fe6591c442fcd44020a418c2931
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
3 files changed, 85 insertions(+), 87 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/33/2333/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b0edb3e..bad6e63 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.PrintWriter;
-import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -95,6 +94,7 @@
import
org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
import
org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -106,6 +106,7 @@
import org.apache.hyracks.control.common.config.OptionTypes;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.ImmutableSet;
/**
@@ -201,63 +202,48 @@
}
public JobSpecification compileQuery(IClusterInfoCollector
clusterInfoCollector, MetadataProvider metadataProvider,
- Query rwQ, int varCounter, String outputDatasetName, SessionOutput
output, ICompiledDmlStatement statement)
- throws AlgebricksException, RemoteException, ACIDException {
+ Query query, int varCounter, String outputDatasetName,
SessionOutput output, ICompiledDmlStatement statement)
+ throws AlgebricksException, ACIDException {
+
+ // establish facts
+ final boolean isQuery = query != null;
+ final boolean isLoad = statement != null && statement.getKind() ==
Statement.Kind.LOAD;
SessionConfig conf = output.config();
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) &&
conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
output.out().println();
printPlanPrefix(output, "Rewritten expression tree");
- if (rwQ != null) {
-
rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
+ if (isQuery) {
+
query.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
}
printPlanPostfix(output);
}
- TxnId txnId = TxnIdFactory.create();
+ final TxnId txnId = TxnIdFactory.create();
metadataProvider.setTxnId(txnId);
ILangExpressionToPlanTranslator t =
translatorFactory.createExpressionToPlanTranslator(metadataProvider,
varCounter);
- ILogicalPlan plan;
- // statement = null when it's a query
- if (statement == null || statement.getKind() != Statement.Kind.LOAD) {
- plan = t.translate(rwQ, outputDatasetName, statement);
- } else {
- plan = t.translateLoad(statement);
- }
+ ILogicalPlan plan = isLoad ? t.translateLoad(statement) :
t.translate(query, outputDatasetName, statement);
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) &&
conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
output.out().println();
printPlanPrefix(output, "Logical plan");
- if (rwQ != null || (statement != null && statement.getKind() ==
Statement.Kind.LOAD)) {
+ if (isQuery || isLoad) {
PlanPrettyPrinter.printPlan(plan,
getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
}
printPlanPostfix(output);
}
CompilerProperties compilerProperties =
metadataProvider.getApplicationContext().getCompilerProperties();
- int frameSize = compilerProperties.getFrameSize();
- Map<String, String> querySpecificConfig = metadataProvider.getConfig();
- validateConfig(querySpecificConfig); // Validates the user-overridden
query parameters.
- int sortFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
-
querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
- compilerProperties.getSortMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_SORT);
- int groupFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-
querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
- compilerProperties.getGroupMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_GROUP_BY);
- int joinFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
-
querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
- compilerProperties.getJoinMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_JOIN);
-
OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
-
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
-
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
-
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
+ Map<String, String> querySpecificConfig =
validateConfig(metadataProvider.getConfig());
+ final PhysicalOptimizationConfig physOptConf =
getPhysicalOptimizationConfig(compilerProperties,
+ querySpecificConfig);
HeuristicCompilerFactoryBuilder builder =
new
HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE);
-
builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
+ builder.setPhysicalOptimizationConfig(physOptConf);
builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites(metadataProvider.getApplicationContext()));
builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites(metadataProvider.getApplicationContext()));
IDataFormat format = metadataProvider.getDataFormat();
@@ -285,7 +271,7 @@
PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
} else {
printPlanPrefix(output, "Optimized logical plan");
- if (rwQ != null || (statement != null &&
statement.getKind() == Statement.Kind.LOAD)) {
+ if (isQuery || isLoad) {
PlanPrettyPrinter.printPlan(plan,
getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
}
@@ -293,7 +279,7 @@
}
}
}
- if (rwQ != null && rwQ.isExplain()) {
+ if (isQuery && query.isExplain()) {
try {
LogicalOperatorPrettyPrintVisitor pvisitor = new
LogicalOperatorPrettyPrintVisitor();
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
@@ -318,25 +304,7 @@
builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
builder.setMissingWriterFactory(format.getMissingWriterFactory());
builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
-
- final SessionConfig.OutputFormat outputFormat = conf.fmt();
- switch (outputFormat) {
- case LOSSLESS_JSON:
-
builder.setPrinterProvider(format.getLosslessJSONPrinterFactoryProvider());
- break;
- case CSV:
-
builder.setPrinterProvider(format.getCSVPrinterFactoryProvider());
- break;
- case ADM:
-
builder.setPrinterProvider(format.getADMPrinterFactoryProvider());
- break;
- case CLEAN_JSON:
-
builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider());
- break;
- default:
- throw new AlgebricksException("Unexpected OutputFormat: " +
outputFormat);
- }
-
+ builder.setPrinterProvider(getPrinterFactoryProvider(format,
conf.fmt()));
builder.setSerializerDeserializerProvider(format.getSerdeProvider());
builder.setTypeTraitProvider(format.getTypeTraitProvider());
builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
@@ -345,26 +313,66 @@
new JobEventListenerFactory(txnId,
metadataProvider.isWriteTransaction());
JobSpecification spec =
compiler.createJob(metadataProvider.getApplicationContext(),
jobEventListenerFactory);
- // When the top-level statement is a query, the statement parameter is
null.
- if (statement == null) {
+ if (isQuery) {
// Sets a required capacity, only for read-only queries.
// DDLs and DMLs are considered not that frequent.
// limit the computation locations to the locations that will be
used in the query
+ final INodeJobTracker nodeJobTracker =
metadataProvider.getApplicationContext().getNodeJobTracker();
final AlgebricksAbsolutePartitionConstraint jobLocations =
- getJobLocations(spec,
metadataProvider.getApplicationContext().getNodeJobTracker(),
- computationLocations);
- final IClusterCapacity jobRequiredCapacity = ResourceUtils
- .getRequiredCapacity(plan, jobLocations, sortFrameLimit,
groupFrameLimit, joinFrameLimit,
- frameSize);
+ getJobLocations(spec, nodeJobTracker,
computationLocations);
+ final IClusterCapacity jobRequiredCapacity =
+ ResourceUtils.getRequiredCapacity(plan, jobLocations,
physOptConf);
spec.setRequiredClusterCapacity(jobRequiredCapacity);
}
+ printJobSpec(query, spec, conf, output);
+ return spec;
+ }
+
+ protected PhysicalOptimizationConfig
getPhysicalOptimizationConfig(CompilerProperties compilerProperties,
+ Map<String, String> querySpecificConfig) throws
AlgebricksException {
+ int frameSize = compilerProperties.getFrameSize();
+ int sortFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
+
querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
+ compilerProperties.getSortMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_SORT);
+ int groupFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
+
querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
+ compilerProperties.getGroupMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_GROUP_BY);
+ int joinFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
+
querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
+ compilerProperties.getJoinMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_JOIN);
+ final PhysicalOptimizationConfig physOptConf =
OptimizationConfUtil.getPhysicalOptimizationConfig();
+ physOptConf.setFrameSize(frameSize);
+ physOptConf.setMaxFramesExternalSort(sortFrameLimit);
+ physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit);
+ physOptConf.setMaxFramesForJoin(joinFrameLimit);
+ return physOptConf;
+ }
+
+ protected IPrinterFactoryProvider getPrinterFactoryProvider(IDataFormat
format,
+ SessionConfig.OutputFormat outputFormat) throws
AlgebricksException {
+ switch (outputFormat) {
+ case LOSSLESS_JSON:
+ return format.getLosslessJSONPrinterFactoryProvider();
+ case CSV:
+ return format.getCSVPrinterFactoryProvider();
+ case ADM:
+ return format.getADMPrinterFactoryProvider();
+ case CLEAN_JSON:
+ return format.getCleanJSONPrinterFactoryProvider();
+ default:
+ throw new AlgebricksException("Unexpected OutputFormat: " +
outputFormat);
+ }
+ }
+
+ protected void printJobSpec(Query rwQ, JobSpecification spec,
SessionConfig conf, SessionOutput output)
+ throws AlgebricksException {
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
printPlanPrefix(output, "Hyracks job");
if (rwQ != null) {
try {
- output.out().println(
- new
ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
+ final ObjectWriter objectWriter = new
ObjectMapper().writerWithDefaultPrettyPrinter();
+
output.out().println(objectWriter.writeValueAsString(spec.toJSON()));
} catch (IOException e) {
throw new AlgebricksException(e);
}
@@ -372,7 +380,6 @@
}
printPlanPostfix(output);
}
- return spec;
}
private AbstractLogicalOperatorPrettyPrintVisitor
getPrettyPrintVisitor(SessionConfig.PlanFormat planFormat,
@@ -392,7 +399,6 @@
double duration = (endTime - startTime) / 1000.00;
out.println("<pre>Duration: " + duration + " sec</pre>");
}
-
}
public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs,
PrintWriter out) throws Exception {
@@ -501,12 +507,13 @@
}
// Validates if the query contains unsupported query parameters.
- private static void validateConfig(Map<String, String> config) throws
AlgebricksException {
+ private static Map<String, String> validateConfig(Map<String, String>
config) throws AlgebricksException {
for (String parameterName : config.keySet()) {
if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) {
throw
AsterixException.create(ErrorCode.COMPILATION_UNSUPPORTED_QUERY_PARAMETER,
parameterName);
}
}
+ return config;
}
public static AlgebricksAbsolutePartitionConstraint
getJobLocations(JobSpecification spec,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 1763a98..ccda1e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -40,22 +41,20 @@
* a given query plan.
* @param computationLocations,
* the partitions for computation.
- * @param sortFrameLimit,
- * the frame limit for one sorter partition.
- * @param groupFrameLimit,
- * the frame limit for one group-by partition.
- * @param joinFrameLimit
- * the frame limit for one joiner partition.
- * @param frameSize
- * the frame size used in query execution.
+ * @param physicalOptimizationConfig
+ * the physical optimization config supplying the frame size and the sort, group-by and join frame limits.
* @return the required cluster capacity for executing the query.
* @throws AlgebricksException
* if the query plan is malformed.
*/
public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
- AlgebricksAbsolutePartitionConstraint computationLocations, int
sortFrameLimit, int groupFrameLimit,
- int joinFrameLimit, int frameSize)
- throws AlgebricksException {
+ AlgebricksAbsolutePartitionConstraint computationLocations,
+ PhysicalOptimizationConfig physicalOptimizationConfig) throws
AlgebricksException {
+ final int frameSize = physicalOptimizationConfig.getFrameSize();
+ final int sortFrameLimit =
physicalOptimizationConfig.getMaxFramesExternalSort();
+ final int groupFrameLimit =
physicalOptimizationConfig.getMaxFramesForGroupBy();
+ final int joinFrameLimit =
physicalOptimizationConfig.getMaxFramesForJoin();
+
// Creates a cluster capacity visitor.
IClusterCapacity clusterCapacity = new ClusterCapacity();
RequiredCapacityVisitor visitor = new
RequiredCapacityVisitor(computationLocations.getLocations().length,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 6f58b0a..38d13f6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -355,8 +355,7 @@
throws AlgebricksException {
DataSource source = findDataSource(dataSourceId);
Dataset dataset = ((DatasetDataSource) source).getDataset();
- String indexName = indexId;
- Index secondaryIndex = getIndex(dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
+ Index secondaryIndex = getIndex(dataset.getDataverseName(),
dataset.getDatasetName(), indexId);
return (secondaryIndex != null)
? new DataSourceIndex(secondaryIndex,
dataset.getDataverseName(), dataset.getDatasetName(), this)
: null;
@@ -381,26 +380,19 @@
List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars,
List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv,
JobGenContext context, JobSpecification jobSpec, Object
implConfig) throws AlgebricksException {
- try {
- return ((DataSource) dataSource).buildDatasourceScanRuntime(this,
dataSource, scanVariables,
- projectVariables, projectPushed, minFilterVars,
maxFilterVars, opSchema, typeEnv, context, jobSpec,
- implConfig);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
+ return ((DataSource) dataSource).buildDatasourceScanRuntime(this,
dataSource, scanVariables, projectVariables,
+ projectPushed, minFilterVars, maxFilterVars, opSchema,
typeEnv, context, jobSpec, implConfig);
}
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildLoadableDatasetScan(
JobSpecification jobSpec, IAdapterFactory adapterFactory,
RecordDescriptor rDesc)
throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner = new
ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
- AlgebricksPartitionConstraint constraint;
try {
- constraint = adapterFactory.getPartitionConstraint();
+ return new Pair<>(dataScanner,
adapterFactory.getPartitionConstraint());
} catch (Exception e) {
throw new AlgebricksException(e);
}
- return new Pair<>(dataScanner, constraint);
}
public Dataverse findDataverse(String dataverseName) throws
AlgebricksException {
--
To view, visit https://asterix-gerrit.ics.uci.edu/2333
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8b27805be1668fe6591c442fcd44020a418c2931
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>