This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new c20bf9a [ASTERIXDB-2532][RT] per-operator profiling c20bf9a is described below commit c20bf9a74fcb35ee8dae9c355657235ef19297cc Author: Ian Maxon <ima...@apache.org> AuthorDate: Thu Sep 12 12:12:04 2019 -0700 [ASTERIXDB-2532][RT] per-operator profiling Enables profiling in queries at the operator-level when the analyze variable is set in a query. Change-Id: Ie16f3901ae5b32920d8552d5fd1ec8bb6e2ec8ae Reviewed-on: https://asterix-gerrit.ics.uci.edu/3226 Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> --- .../asterix/translator/IStatementExecutor.java | 58 ++++++++- .../apache/asterix/api/common/ResultMetadata.java | 11 ++ .../api/http/server/NCQueryServiceServlet.java | 2 +- .../api/http/server/QueryResultApiServlet.java | 10 +- .../http/server/QueryServiceRequestParameters.java | 10 ++ .../api/http/server/QueryServiceServlet.java | 33 ++++-- .../message/ExecuteStatementRequestMessage.java | 10 +- .../asterix/app/result/JobResultCallback.java | 17 +++ .../{ResponseMertics.java => ResponseMetrics.java} | 26 ++--- .../asterix/app/result/fields/MetricsPrinter.java | 30 ++--- .../asterix/app/result/fields/NcResultPrinter.java | 8 +- .../asterix/app/result/fields/ProfilePrinter.java | 73 ++++++++++++ .../asterix/app/translator/QueryTranslator.java | 6 + .../asterix/test/common/ResultExtractor.java | 6 + .../apache/asterix/test/common/TestExecutor.java | 5 + .../org/apache/asterix/test/common/TestHelper.java | 12 ++ .../test/runtime/ProfiledExecutionTest.java | 65 +++++++++++ .../asterix-app/src/test/resources/cc-single.conf | 50 ++++++++ .../src/test/resources/runtimets/profiled.xml | 30 +++++ .../profile/full-scan/full-scan.1.ddl.sqlpp | 41 ++++--- .../profile/full-scan/full-scan.2.update.sqlpp | 24 ++-- 
.../profile/full-scan/full-scan.3.profile.sqlpp | 23 ++-- .../profile/full-scan/full-scan.4.ddl.sqlpp | 21 +--- .../profile/full-scan/full-scan.3.regexjson | 64 ++++++++++ .../apache/asterix/builders/IARecordBuilder.java | 7 +- .../OpenRecordConstructorDescriptor.java | 5 + .../NestedPlansRunningAggregatorFactory.java | 15 ++- .../meta/AlgebricksMetaOperatorDescriptor.java | 5 + .../runtime/operators/meta/PipelineAssembler.java | 7 +- .../operators/std/AssignRuntimeFactory.java | 2 +- .../api/com}/job/profiling/counters/Counter.java | 3 +- .../hyracks/api/dataflow/IPassableTimer.java} | 24 ++-- .../hyracks/api/dataflow/TimedFrameWriter.java | 130 +++++++++++++++++++++ .../api/dataflow/TimedOperatorNodePushable.java | 123 +++++++++++++++++++ .../hyracks/api/job/profiling/IStatsCollector.java | 27 ++++- .../hyracks/api}/job/profiling/OperatorStats.java | 11 +- .../runtime/SuperActivityOperatorNodePushable.java | 28 ++++- .../hyracks/client/stats/AggregateCounter.java | 2 +- .../client/stats/impl/ClientCounterContext.java | 2 +- .../common/job/profiling/StatsCollector.java | 44 +++++-- .../common/job/profiling/om/TaskProfile.java | 18 ++- .../java/org/apache/hyracks/control/nc/Joblet.java | 2 +- .../java/org/apache/hyracks/control/nc/Task.java | 2 +- .../hyracks/control/nc/work/StartTasksWork.java | 3 +- .../group/sort/SortGroupByOperatorDescriptor.java | 22 +++- .../OptimizedHybridHashJoinOperatorDescriptor.java | 10 ++ .../std/result/ResultWriterOperatorDescriptor.java | 5 + .../std/sort/AbstractSorterOperatorDescriptor.java | 15 ++- .../std/sort/ExternalSortOperatorDescriptor.java | 9 +- .../hyracks/dataflow/std/sort/IRunGenerator.java | 2 + .../dataflow/std/sort/TimedRunGenerator.java | 51 ++++++++ .../std/sort/TopKSorterOperatorDescriptor.java | 15 ++- .../tests/integration/AbstractIntegrationTest.java | 4 +- .../dataflow/BTreeSearchOperatorDescriptor.java | 5 + .../storage/am/btree/test/FramewriterTest.java | 3 + .../hyracks/hyracks-storage-am-common/pom.xml | 5 
- .../dataflow/IndexSearchOperatorNodePushable.java | 16 ++- .../hyracks/test/support/CounterContext.java | 2 +- 58 files changed, 1069 insertions(+), 190 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 3718340..606abd2 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.translator; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import java.rmi.RemoteException; import java.util.ArrayList; @@ -43,6 +46,10 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.result.ResultSetId; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + /** * An interface that takes care of executing a list of statements that are submitted through an Asterix API */ @@ -78,11 +85,18 @@ public interface IStatementExecutor { } class Stats implements Serializable { - private static final long serialVersionUID = 5885273238208454610L; + private static final long serialVersionUID = 5885273238208454611L; + + public enum ProfileType { + COUNTS, + FULL + } private long count; private long size; private long processedObjects; + private Profile profile; + private ProfileType type; private long diskIoCount; private long totalWarningsCount; @@ -125,6 +139,48 @@ public interface IStatementExecutor { public void setTotalWarningsCount(long totalWarningsCount) { this.totalWarningsCount = totalWarningsCount; } + + public void setJobProfile(ObjectNode profile) { + this.profile = new 
Profile(profile); + } + + public ObjectNode getJobProfile() { + return profile != null ? profile.getProfile() : null; + } + + public ProfileType getType() { + return type; + } + + public void setType(ProfileType type) { + this.type = type; + } + } + + class Profile implements Serializable { + private transient ObjectNode profile; + + public Profile(ObjectNode profile) { + this.profile = profile; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + ObjectMapper om = new ObjectMapper(); + out.writeUTF(om.writeValueAsString(profile)); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + ObjectMapper om = new ObjectMapper(); + JsonNode inNode = om.readTree(in.readUTF()); + if (!inNode.isObject()) { + throw new IOException("Deserialization error"); + } + profile = (ObjectNode) inNode; + } + + public ObjectNode getProfile() { + return profile; + } } /** diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java index a036696..9cf8e6e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java @@ -24,11 +24,14 @@ import org.apache.asterix.translator.SessionConfig; import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.api.result.IResultMetadata; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class ResultMetadata implements IResultMetadata { private final SessionConfig.OutputFormat format; private long jobDuration; private long processedObjects; + private ObjectNode profile; private long diskIoCount; private Set<Warning> warnings; private long totalWarningsCount; @@ -68,6 +71,14 @@ public class ResultMetadata implements IResultMetadata { return jobDuration; } + public void setJobProfile(ObjectNode profile) 
{ + this.profile = profile; + } + + public ObjectNode getJobProfile() { + return profile; + } + /** * @return The reported warnings. */ diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 27e685c..2c499c7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -92,7 +92,7 @@ public class NCQueryServiceServlet extends QueryServiceServlet { new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), resultProperties.getNcToCcResultProperties(), param.getClientContextID(), handleUrl, optionalParameters, statementParameters, - param.isMultiStatement(), stmtCategoryRestrictionMask, requestReference); + param.isMultiStatement(), param.isProfile(), stmtCategoryRestrictionMask, requestReference); execution.start(); ncMb.sendMessageToPrimaryCC(requestMsg); try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 52c62af..4980f5e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -22,11 +22,12 @@ import java.io.IOException; import java.util.concurrent.ConcurrentMap; import org.apache.asterix.api.common.ResultMetadata; -import org.apache.asterix.app.result.ResponseMertics; +import org.apache.asterix.app.result.ResponseMetrics; import org.apache.asterix.app.result.ResponsePrinter; import org.apache.asterix.app.result.ResultHandle; import 
org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.result.fields.MetricsPrinter; +import org.apache.asterix.app.result.fields.ProfilePrinter; import org.apache.asterix.app.result.fields.ResultsPrinter; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.translator.IStatementExecutor.Stats; @@ -98,10 +99,13 @@ public class QueryResultApiServlet extends AbstractQueryApiServlet { printer.begin(); printer.addResultPrinter(new ResultsPrinter(appCtx, resultReader, null, stats, sessionOutput)); printer.printResults(); - ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart, + ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart, metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0, metadata.getTotalWarningsCount(), metadata.getDiskIoCount()); - printer.addFooterPrinter(new MetricsPrinter(mertics, HttpUtil.getPreferredCharset(request))); + printer.addFooterPrinter(new MetricsPrinter(metrics, HttpUtil.getPreferredCharset(request))); + if (metadata.getJobProfile() != null) { + printer.addFooterPrinter(new ProfilePrinter(metadata.getJobProfile())); + } printer.printFooters(); printer.end(); } else { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java index 566133d..ff6dcbd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java @@ -50,6 +50,7 @@ public class QueryServiceRequestParameters { private boolean logicalPlan; private boolean optimizedLogicalPlan; private boolean job; + private boolean profile; private boolean signature; private boolean multiStatement; @@ -197,6 +198,14 @@ public 
class QueryServiceRequestParameters { this.job = job; } + public void setProfile(boolean profile) { + this.profile = profile; + } + + public boolean isProfile() { + return profile; + } + public boolean isSignature() { return signature; } @@ -230,6 +239,7 @@ public class QueryServiceRequestParameters { object.put("logicalPlan", logicalPlan); object.put("optimizedLogicalPlan", optimizedLogicalPlan); object.put("job", job); + object.put("profile", profile); object.put("signature", signature); object.put("multiStatement", multiStatement); object.put("parseOnly", parseOnly); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 1e8f111..d193b13 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -45,13 +45,14 @@ import java.util.function.Function; import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.app.result.ExecutionError; import org.apache.asterix.app.result.ExecutionWarning; -import org.apache.asterix.app.result.ResponseMertics; +import org.apache.asterix.app.result.ResponseMetrics; import org.apache.asterix.app.result.ResponsePrinter; import org.apache.asterix.app.result.fields.ClientContextIdPrinter; import org.apache.asterix.app.result.fields.ErrorsPrinter; import org.apache.asterix.app.result.fields.MetricsPrinter; import org.apache.asterix.app.result.fields.ParseOnlyResultPrinter; import org.apache.asterix.app.result.fields.PlansPrinter; +import org.apache.asterix.app.result.fields.ProfilePrinter; import org.apache.asterix.app.result.fields.RequestIdPrinter; import org.apache.asterix.app.result.fields.SignaturePrinter; import org.apache.asterix.app.result.fields.StatusPrinter; @@ -68,7 +69,6 @@ import 
org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.aql.parser.TokenMgrError; @@ -171,6 +171,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { PARSE_ONLY("parse-only"), READ_ONLY("readonly"), JOB("job"), + PROFILE("profile"), SIGNATURE("signature"), MULTI_STATEMENT("multi-statement"); @@ -383,6 +384,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { param.setReadOnly(getOptBoolean(jsonRequest, Parameter.READ_ONLY, false)); param.setOptimizedLogicalPlan(getOptBoolean(jsonRequest, Parameter.OPTIMIZED_LOGICAL_PLAN, false)); param.setJob(getOptBoolean(jsonRequest, Parameter.JOB, false)); + param.setProfile(getOptBoolean(jsonRequest, Parameter.PROFILE, false)); param.setSignature(getOptBoolean(jsonRequest, Parameter.SIGNATURE, true)); param.setStatementParams( getOptStatementParameters(jsonRequest, jsonRequest.fieldNames(), JsonNode::get, v -> v)); @@ -405,8 +407,15 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { param.setTimeout(getParameter(request, Parameter.TIMEOUT)); param.setMaxResultReads(getParameter(request, Parameter.MAX_RESULT_READS)); param.setPlanFormat(getParameter(request, Parameter.PLAN_FORMAT)); + param.setExpressionTree(getOptBoolean(request, Parameter.EXPRESSION_TREE, false)); + param.setRewrittenExpressionTree(getOptBoolean(request, Parameter.REWRITTEN_EXPRESSION_TREE, false)); + param.setLogicalPlan(getOptBoolean(request, Parameter.LOGICAL_PLAN, false)); param.setParseOnly(getOptBoolean(request, Parameter.PARSE_ONLY, false)); param.setReadOnly(getOptBoolean(request, 
Parameter.READ_ONLY, false)); + param.setOptimizedLogicalPlan(getOptBoolean(request, Parameter.OPTIMIZED_LOGICAL_PLAN, false)); + param.setJob(getOptBoolean(request, Parameter.JOB, false)); + param.setProfile(getOptBoolean(request, Parameter.PROFILE, false)); + param.setSignature(getOptBoolean(request, Parameter.SIGNATURE, true)); param.setMultiStatement(getOptBoolean(request, Parameter.MULTI_STATEMENT, true)); try { param.setStatementParams(getOptStatementParameters(request, request.getParameterNames().iterator(), @@ -509,6 +518,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { .serializeParameterValues(param.getStatementParams()); setAccessControlHeaders(request, response); response.setStatus(execution.getHttpStatus()); + stats.setType(param.isProfile() ? Stats.ProfileType.FULL : Stats.ProfileType.COUNTS); executeStatement(requestRef, statementsText, sessionOutput, resultProperties, stats, param, execution, optionalParams, statementParams, responsePrinter, warnings); } @@ -561,15 +571,18 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { // in case of ASYNC delivery, the status is printed by query translator responsePrinter.addFooterPrinter(new StatusPrinter(execution.getResultStatus())); } - final ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart, execution.duration(), + final ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart, execution.duration(), stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount, stats.getTotalWarningsCount(), stats.getDiskIoCount()); - responsePrinter.addFooterPrinter(new MetricsPrinter(mertics, resultCharset)); + responsePrinter.addFooterPrinter(new MetricsPrinter(metrics, resultCharset)); + if (stats.getType() == Stats.ProfileType.FULL) { + responsePrinter.addFooterPrinter(new ProfilePrinter(stats.getJobProfile())); + } } protected void validateStatement(String statement) throws RuntimeDataException { if (statement == 
null || statement.isEmpty()) { - throw new RuntimeDataException(ErrorCode.NO_STATEMENT_PROVIDED); + throw new RuntimeDataException(NO_STATEMENT_PROVIDED); } } @@ -581,8 +594,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { Query query = (Query) stmts.get(stmts.size() - 1); Set<VariableExpr> extVars = compilationProvider.getRewriterFactory().createQueryRewriter().getExternalVariables(query.getBody()); - ResultUtil.ParseOnlyResult parseOnlyResult = new ResultUtil.ParseOnlyResult(extVars); - return parseOnlyResult; + return new ResultUtil.ParseOnlyResult(extVars); } protected void executeStatement(IRequestReference requestReference, String statementsText, @@ -697,10 +709,9 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { public static String extractStatementParameterName(String name) { int ln = name.length(); - if (ln > 1 && name.charAt(0) == '$' && Character.isLetter(name.charAt(1))) { - if (ln == 2 || isStatementParameterNameRest(name, 2)) { - return name.substring(1); - } + if ((ln == 2 || isStatementParameterNameRest(name, 2)) && name.charAt(0) == '$' + && Character.isLetter(name.charAt(1))) { + return name.substring(1); } return null; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index b666085..4e2fea0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -19,6 +19,9 @@ package org.apache.asterix.app.message; +import static org.apache.asterix.translator.IStatementExecutor.Stats.ProfileType.COUNTS; +import static org.apache.asterix.translator.IStatementExecutor.Stats.ProfileType.FULL; + import java.io.PrintWriter; import java.io.StringWriter; import java.util.HashSet; @@ 
-85,13 +88,14 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage private final Map<String, byte[]> statementParameters; private final boolean multiStatement; private final int statementCategoryRestrictionMask; + private final boolean profile; private final IRequestReference requestReference; public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties, String clientContextID, String handleUrl, Map<String, String> optionalParameters, - Map<String, byte[]> statementParameters, boolean multiStatement, int statementCategoryRestrictionMask, - IRequestReference requestReference) { + Map<String, byte[]> statementParameters, boolean multiStatement, boolean profile, + int statementCategoryRestrictionMask, IRequestReference requestReference) { this.requestNodeId = requestNodeId; this.requestMessageId = requestMessageId; this.lang = lang; @@ -104,6 +108,7 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage this.statementParameters = statementParameters; this.multiStatement = multiStatement; this.statementCategoryRestrictionMask = statementCategoryRestrictionMask; + this.profile = profile; this.requestReference = requestReference; } @@ -141,6 +146,7 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, compilationProvider, storageComponentProvider, new ResponsePrinter(sessionOutput)); final IStatementExecutor.Stats stats = new IStatementExecutor.Stats(); + stats.setType(profile ? 
FULL : COUNTS); Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters); final IRequestParameters requestParameters = new RequestParameters(requestReference, statementsText, null, resultProperties, stats, outMetadata, clientContextID, optionalParameters, stmtParams, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java index 55c1f8e..43d7cd9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.asterix.api.common.ResultMetadata; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.result.IJobResultCallback; import org.apache.hyracks.api.result.ResultJobRecord; @@ -39,6 +40,8 @@ import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class JobResultCallback implements IJobResultCallback { private static final Logger LOGGER = LogManager.getLogger(); @@ -67,6 +70,16 @@ public class JobResultCallback implements IJobResultCallback { aggregateJobStats(jobId, metadata); } + ObjectNode getProfile(JobId jobId) { + IJobManager jobManager = + ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager(); + final JobRun run = jobManager.get(jobId); + if (run != null) { + return run.getJobProfile().toJSON(); + } + return null; + } + private void aggregateJobStats(JobId jobId, ResultMetadata metadata) { long processedObjects = 0; long 
diskIoCount = 0; @@ -99,5 +112,9 @@ public class JobResultCallback implements IJobResultCallback { metadata.setWarnings(AggregateWarnings); metadata.setDiskIoCount(diskIoCount); metadata.setTotalWarningsCount(aggregateTotalWarningsCount); + if (run.getFlags() != null && run.getFlags().contains(JobFlag.PROFILE_RUNTIME)) { + metadata.setJobProfile(getProfile(jobId)); + } } + } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java similarity index 76% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java index 03a20a5..bc91d54 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.app.result; -public class ResponseMertics { +public class ResponseMetrics { private long elapsedTime; private long executionTime; @@ -29,21 +29,21 @@ public class ResponseMertics { private long warnCount; private long diskIoCount; - private ResponseMertics() { + private ResponseMetrics() { } - public static ResponseMertics of(long elapsedTime, long executionTime, long resultCount, long resultSize, + public static ResponseMetrics of(long elapsedTime, long executionTime, long resultCount, long resultSize, long processedObjects, long errorCount, long warnCount, long diskIoCount) { - ResponseMertics mertics = new ResponseMertics(); - mertics.elapsedTime = elapsedTime; - mertics.executionTime = executionTime; - mertics.resultCount = resultCount; - mertics.resultSize = resultSize; - mertics.processedObjects = processedObjects; - mertics.errorCount = errorCount; - mertics.warnCount = warnCount; - mertics.diskIoCount = diskIoCount; - return 
mertics; + ResponseMetrics metrics = new ResponseMetrics(); + metrics.elapsedTime = elapsedTime; + metrics.executionTime = executionTime; + metrics.resultCount = resultCount; + metrics.resultSize = resultSize; + metrics.processedObjects = processedObjects; + metrics.errorCount = errorCount; + metrics.warnCount = warnCount; + metrics.diskIoCount = diskIoCount; + return metrics; } public long getElapsedTime() { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java index 117441e..21ffe0b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java @@ -23,7 +23,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import org.apache.asterix.api.http.server.ResultUtil; -import org.apache.asterix.app.result.ResponseMertics; +import org.apache.asterix.app.result.ResponseMetrics; import org.apache.asterix.common.api.Duration; import org.apache.asterix.common.api.IResponseFieldPrinter; @@ -51,11 +51,11 @@ public class MetricsPrinter implements IResponseFieldPrinter { } public static final String FIELD_NAME = "metrics"; - private final ResponseMertics mertics; + private final ResponseMetrics metrics; private final Charset resultCharset; - public MetricsPrinter(ResponseMertics mertics, Charset resultCharset) { - this.mertics = mertics; + public MetricsPrinter(ResponseMetrics metrics, Charset resultCharset) { + this.metrics = metrics; this.resultCharset = resultCharset; } @@ -67,35 +67,35 @@ public class MetricsPrinter implements IResponseFieldPrinter { pw.print(FIELD_NAME); pw.print("\": {\n"); pw.print("\t"); - ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(mertics.getElapsedTime(), useAscii)); + ResultUtil.printField(pw, 
Metrics.ELAPSED_TIME.str(), Duration.formatNanos(metrics.getElapsedTime(), useAscii)); pw.print("\n\t"); ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), - Duration.formatNanos(mertics.getExecutionTime(), useAscii)); + Duration.formatNanos(metrics.getExecutionTime(), useAscii)); pw.print("\n\t"); - ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), mertics.getResultCount(), true); + ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), metrics.getResultCount(), true); pw.print("\n\t"); - ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), mertics.getResultSize(), true); + ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), metrics.getResultSize(), true); pw.print("\n\t"); - final boolean hasErrors = mertics.getErrorCount() > 0; - final boolean hasWarnings = mertics.getWarnCount() > 0; - final boolean hasDiskIoStats = mertics.getDiskIoCount() > 0; - ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), mertics.getProcessedObjects(), + final boolean hasErrors = metrics.getErrorCount() > 0; + final boolean hasWarnings = metrics.getWarnCount() > 0; + final boolean hasDiskIoStats = metrics.getDiskIoCount() > 0; + ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), metrics.getProcessedObjects(), hasWarnings || hasErrors || hasDiskIoStats); pw.print("\n"); //TODO move diskIoCount to the profile printer when it is introduced if (hasDiskIoStats) { pw.print("\t"); - ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), mertics.getDiskIoCount(), hasWarnings || hasErrors); + ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), metrics.getDiskIoCount(), hasWarnings || hasErrors); pw.print("\n"); } if (hasWarnings) { pw.print("\t"); - ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), mertics.getWarnCount(), hasErrors); + ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), metrics.getWarnCount(), hasErrors); pw.print("\n"); } if (hasErrors) { pw.print("\t"); - ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), 
mertics.getErrorCount(), false); + ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), metrics.getErrorCount(), false); pw.print("\n"); } pw.print("\t}"); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java index 90b0656..c37db07 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java @@ -61,9 +61,11 @@ public class NcResultPrinter implements IResponseFieldPrinter { IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata(); List<Triple<JobId, ResultSetId, ARecordType>> resultSets = resultMetadata.getResultSets(); if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultSets.isEmpty()) { - stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects()); - stats.setDiskIoCount(responseMsg.getStats().getDiskIoCount()); - stats.setTotalWarningsCount(responseMsg.getStats().getTotalWarningsCount()); + IStatementExecutor.Stats responseStats = responseMsg.getStats(); + stats.setJobProfile(responseStats.getJobProfile()); + stats.setProcessedObjects(responseStats.getProcessedObjects()); + stats.setDiskIoCount(responseStats.getDiskIoCount()); + stats.setTotalWarningsCount(responseStats.getTotalWarningsCount()); for (int i = 0; i < resultSets.size(); i++) { Triple<JobId, ResultSetId, ARecordType> rsmd = resultSets.get(i); ResultReader resultReader = new ResultReader(resultSet, rsmd.getLeft(), rsmd.getMiddle()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java new file mode 100644 index 0000000..d74d25c --- /dev/null +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.result.fields; + +import java.io.IOException; +import java.io.PrintWriter; + +import org.apache.asterix.common.api.IResponseFieldPrinter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.util.DefaultIndenter; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class ProfilePrinter implements IResponseFieldPrinter { + + public static final String FIELD_NAME = "profile"; + private final ObjectNode profile; + private final Logger LOGGER = LogManager.getLogger(); + + public ProfilePrinter(ObjectNode profile) { + this.profile = profile; + } + + @Override + public void print(PrintWriter pw) { + boolean hasProfile = profile != null; + if (hasProfile) { + try { + pw.print("\t\"" + FIELD_NAME + "\" : "); + ObjectMapper om = new ObjectMapper(); + 
om.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + DefaultIndenter ind = new DefaultIndenter("\t", DefaultIndenter.SYS_LF) { + @Override + public void writeIndentation(JsonGenerator jg, int level) throws IOException { + super.writeIndentation(jg, level + 1); + } + }; + DefaultPrettyPrinter pp = new DefaultPrettyPrinter(); + pp = pp.withArrayIndenter(ind); + pp = pp.withObjectIndenter(ind); + om.writer(pp).writeValue(pw, profile); + } catch (IOException e) { + LOGGER.error("Unable to print job profile", e); + + } + } + } + + @Override + public String getName() { + return FIELD_NAME; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 1838c4c..a53ca88 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -391,6 +391,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider.setResultAsyncMode( resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); metadataProvider.setMaxResultReads(maxResultReads); + if (stats.getType() == Stats.ProfileType.FULL) { + this.jobFlags.add(JobFlag.PROFILE_RUNTIME); + } handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats, requestParameters, stmtParams, stmtRewriter); break; @@ -2549,6 +2552,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen (org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService() .getResultMetadata(jobId, rsId); stats.setProcessedObjects(resultMetadata.getProcessedObjects()); + if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) { + stats.setJobProfile(resultMetadata.getJobProfile()); + } 
stats.setDiskIoCount(resultMetadata.getDiskIoCount()); stats.setTotalWarningsCount(resultMetadata.getTotalWarningsCount()); warningCollector.warn(resultMetadata.getWarnings()); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java index eb708ce..60d44a1 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java @@ -51,6 +51,7 @@ public class ResultExtractor { RESULTS("results"), REQUEST_ID("requestID"), METRICS("metrics"), + PROFILE("profile"), CLIENT_CONTEXT_ID("clientContextID"), SIGNATURE("signature"), STATUS("status"), @@ -93,6 +94,10 @@ public class ResultExtractor { return extract(resultStream, EnumSet.of(ResultField.METRICS), resultCharset).getResult(); } + public static InputStream extractProfile(InputStream resultStream, Charset resultCharset) throws Exception { + return extract(resultStream, EnumSet.of(ResultField.PROFILE), resultCharset).getResult(); + } + public static InputStream extractPlans(InputStream resultStream, Charset resultCharset) throws Exception { return extract(resultStream, EnumSet.of(ResultField.PLANS), resultCharset).getResult(); } @@ -167,6 +172,7 @@ public class ResultExtractor { break; case REQUEST_ID: case METRICS: + case PROFILE: case CLIENT_CONTEXT_ID: case SIGNATURE: case STATUS: diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 55b8146..7319ab5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -164,6 +164,7 @@ public class TestExecutor { public static final String DELIVERY_IMMEDIATE 
= "immediate"; public static final String DIAGNOSE = "diagnose"; private static final String METRICS_QUERY_TYPE = "metrics"; + private static final String PROFILE_QUERY_TYPE = "profile"; private static final String PLANS_QUERY_TYPE = "plans"; private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>(); @@ -951,6 +952,7 @@ public class TestExecutor { case "parse": case "deferred": case "metrics": + case "profile": case "plans": // isDmlRecoveryTest: insert Crash and Recovery if (isDmlRecoveryTest) { @@ -1298,6 +1300,9 @@ public class TestExecutor { case METRICS_QUERY_TYPE: resultStream = ResultExtractor.extractMetrics(resultStream, responseCharset); break; + case PROFILE_QUERY_TYPE: + resultStream = ResultExtractor.extractProfile(resultStream, responseCharset); + break; case PLANS_QUERY_TYPE: resultStream = ResultExtractor.extractPlans(resultStream, responseCharset); break; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java index 0cfeee7..7695698 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java @@ -218,4 +218,16 @@ public final class TestHelper { objectMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); return objectMapper; } + + public static void main(String[] args) throws Exception { + ObjectMapper om = createObjectMapper(); + String patternFile = args[0]; + String instanceFile = args[1]; + if (equalJson(om.readTree(new File(patternFile)), om.readTree(new File(instanceFile)))) { + System.out.println(instanceFile + " matches " + patternFile); + } else { + System.out.println(instanceFile + " does not match " + patternFile); + System.exit(1); + } + } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java new file mode 100644 index 0000000..7c945fc --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.test.runtime; + +import java.util.Collection; + +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Runs the runtime tests listed in profiled.xml against the single-NC cluster configured by cc-single.conf. 
+ */ +@RunWith(Parameterized.class) +public class ProfiledExecutionTest { + protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf"; + + @BeforeClass + public static void setUp() throws Exception { + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor()); + } + + @AfterClass + public static void tearDown() throws Exception { + LangExecutionUtil.tearDown(); + } + + @Parameters(name = "ProfiledExecutionTest {index}: {0}") + public static Collection<Object[]> tests() throws Exception { + return LangExecutionUtil.buildTestsInXml("profiled.xml"); + } + + protected TestCaseContext tcCtx; + + public ProfiledExecutionTest(TestCaseContext tcCtx) { + this.tcCtx = tcCtx; + } + + @Test + public void test() throws Exception { + LangExecutionUtil.test(tcCtx); + } +} diff --git a/asterixdb/asterix-app/src/test/resources/cc-single.conf b/asterixdb/asterix-app/src/test/resources/cc-single.conf new file mode 100644 index 0000000..419abf7 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/cc-single.conf @@ -0,0 +1,50 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. 
+ +[nc/asterix_nc1] +txn.log.dir=target/tmp/asterix_nc1/txnlog +core.dump.dir=target/tmp/asterix_nc1/coredump +iodevices=target/tmp/asterix_nc1/iodevice1 +nc.api.port=19004 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006 + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.buffercache.pagesize=32KB +storage.buffercache.size=48MB +storage.memorycomponent.globalbudget=512MB + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 +heartbeat.max.misses=25 + +[common] +log.dir = logs/ +log.level = INFO +compiler.framesize=32KB +compiler.sortmemory=320KB +compiler.groupmemory=160KB +compiler.joinmemory=256KB +compiler.textsearchmemory=160KB +compiler.windowmemory=192KB +messaging.frame.size=4096 +messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml new file mode 100644 index 0000000..496927e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! 
KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. + !--> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" + QueryFileExtension=".sqlpp"> + <test-group name="profile"> + <test-case FilePath="profile"> + <compilation-unit name="full-scan"> + <parameter name="profile" value="true" /> + <output-dir compare="Text">full-scan</output-dir> + </compilation-unit> + </test-case> + </test-group> +</test-suite> \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp similarity index 61% copy from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp index 02880dd..441bee4 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp @@ -16,21 +16,34 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.dataflow.std.sort; +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ -import java.util.List; +drop dataverse test if exists; +create dataverse test; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +use test; -/** - * @author pouria - * Interface for the Run Generator - */ -public interface IRunGenerator extends IFrameWriter { +create type test.AddressType as +{ + number : bigint, + street : string, + city : string +}; + +create type test.CustomerType as + closed { + cid : bigint, + name : string, + age : bigint?, + address : AddressType?, + lastorder : { + oid : bigint, + total : float + } +}; - /** - * @return the list of generated (sorted) runs - */ - List<GeneratedRunFileReader> getRuns(); -} +create dataset Customers(CustomerType) primary key cid; \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp similarity index 65% copy from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp index 02880dd..2c5d5d9 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp @@ -16,21 +16,15 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.dataflow.std.sort; - -import java.util.List; +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +use test; -/** - * @author pouria - * Interface for the Run Generator - */ -public interface IRunGenerator extends IFrameWriter { +load dataset Customers using localfs + ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`), + (`format`=`adm`)); - /** - * @return the list of generated (sorted) runs - */ - List<GeneratedRunFileReader> getRuns(); -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp similarity index 65% copy from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp index 02880dd..9db1d12 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp @@ -16,21 +16,12 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.dataflow.std.sort; - -import java.util.List; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; - -/** - * @author pouria - * Interface for the Run Generator +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 */ -public interface IRunGenerator extends IFrameWriter { - /** - * @return the list of generated (sorted) runs - */ - List<GeneratedRunFileReader> getRuns(); -} +use test; + +select count(*) from Customers; \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp similarity index 65% copy from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp index 02880dd..4cccd9f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp @@ -16,21 +16,10 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.dataflow.std.sort; - -import java.util.List; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; - -/** - * @author pouria - * Interface for the Run Generator +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 */ -public interface IRunGenerator extends IFrameWriter { - /** - * @return the list of generated (sorted) runs - */ - List<GeneratedRunFileReader> getRuns(); -} +drop dataverse test; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson new file mode 100644 index 0000000..82e7128 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson @@ -0,0 +1,64 @@ +{ + "job-id": "R{[A-Z0-9.:]+}", + "counters": [], + "joblets": [ + { + "node-id": "R{.+}", + "counters": [], + "tasks": [ + { + "activity-id": "R{[A-Z0-9.:]+}", + "partition": "R{[0-9]+}", + "attempt": "R{[0-9]+}", + "partition-send-profile": [ + { + "partition-id": { + "job-id": "R{[A-Z0-9.:]+}", + "connector-id": "R{[A-Z0-9.:]+}", + "sender-index": "R{[0-9]+}", + "receiver-index": "R{[0-9]+}" + }, + "open-time": "R{[0-9]+}", + "close-time": "R{[0-9]+}", + "offset": "R{[0-9]+}", + "frame-times": [ + 0 + ], + "resolution": 1 + } + ], + "counters": [ + { + "name": "Empty Tuple Source", + "time": "R{[0-9.]+}" + }, + { + "name": "Index Search", + "time": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "time": "R{[0-9.]+}" + } + ] + }, + { + "activity-id": "R{[A-Z0-9.:]+}", + "partition": "R{[0-9]+}", + "attempt": "R{[0-9]+}", + "partition-send-profile": [], + "counters": [ + { + "name": "R{.+}", + "time": "R{[0-9.]+}" + }, + { + "name": "Result Writer", + "time": "R{[0-9.]+}" + } + ] + } + ] + } + ] +} \ No newline 
at end of file diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java index 6d648b1..ea29643 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java @@ -20,9 +20,7 @@ package org.apache.asterix.builders; import java.io.DataOutput; -import java.io.IOException; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; @@ -55,7 +53,7 @@ public interface IARecordBuilder { * The field name. * @param value * The field value. - * @throws AsterixException + * @throws HyracksDataException * if the field name conflicts with a closed field name */ public void addField(IValueReference name, IValueReference value) throws HyracksDataException; @@ -66,8 +64,7 @@ public interface IARecordBuilder { * @param writeTypeTag * Whether to write a typetag as part of the record's serialized * representation. 
- * @throws IOException - * @throws AsterixException + * @throws HyracksDataException * if any open field names conflict with each other */ public void write(DataOutput out, boolean writeTypeTag) throws HyracksDataException; diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java index 2977470..a98e2dd 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java @@ -118,4 +118,9 @@ public class OpenRecordConstructorDescriptor extends AbstractScalarFunctionDynam } }; } + + @Override + public String toString() { + return "Open Record Constructor"; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index 7887dde..07d4e94 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -28,6 +28,7 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.EnforceFrameWriter; +import org.apache.hyracks.api.dataflow.TimedFrameWriter; import 
org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; @@ -60,15 +61,19 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, final IFrameWriter writer, long memoryBudget) throws HyracksDataException { + final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); + final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length + decorFieldIdx.length, writer); - // should enforce protocol - boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); - IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter; + IFrameWriter fw = outputWriter; + if (profile) { + fw = TimedFrameWriter.time(outputWriter, ctx, "Aggregate Writer"); + } else if (enforce) { + fw = EnforceFrameWriter.enforce(outputWriter); + } final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { - pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter, - ctx, null); + pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], fw, ctx, null); } final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder(); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java 
index 07365db..b3fef7f 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -121,6 +121,11 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper throw exception; } } + + @Override + public String getDisplayName() { + return "Empty Tuple Source"; + } } private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx, diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index f5477ec..81f5d08 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -56,13 +56,13 @@ public class PipelineAssembler { public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { // should enforce protocol boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); + boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); // plug the operators IFrameWriter start = writer;// this.writer; IPushRuntimeFactory[] runtimeFactories = pipeline.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors(); for (int i = runtimeFactories.length - 1; i >= 0; i--) { - start = enforce ? EnforceFrameWriter.enforce(start) : start; - + start = (enforce && !profile) ? 
EnforceFrameWriter.enforce(start) : start; IPushRuntimeFactory runtimeFactory = runtimeFactories[i]; IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx); for (int j = 0; j < newRuntimes.length; j++) { @@ -99,12 +99,13 @@ public class PipelineAssembler { IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException { // should enforce protocol boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); + boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); // plug the operators IFrameWriter start = writer; IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); for (int i = runtimeFactories.length - 1; i >= 0; i--) { - start = enforce ? EnforceFrameWriter.enforce(start) : start; + start = (enforce && !profile) ? EnforceFrameWriter.enforce(start) : start; IPushRuntimeFactory runtimeFactory = runtimeFactories[i]; IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx); IPushRuntime newRuntime = enforce ? 
EnforcePushRuntime.enforce(newRuntimes[0]) : newRuntimes[0]; diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java index 3cdc5ae..5343069 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java @@ -79,7 +79,7 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor if (i > 0) { sb.append(", "); } - sb.append(evalFactories[i]); + sb.append(evalFactories[i].toString()); } sb.append("]"); return sb.toString(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java similarity index 94% rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java index 0bcb246..0490576 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.control.common.job.profiling.counters; +package org.apache.hyracks.api.com.job.profiling.counters; import java.util.concurrent.atomic.AtomicLong; import org.apache.hyracks.api.job.profiling.counters.ICounter; +@SuppressWarnings("squid:S1700") public class Counter implements ICounter { private static final long serialVersionUID = -3935601595055562080L; diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java similarity index 65% copy from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java index 02880dd..6afbccb 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java @@ -16,21 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.dataflow.std.sort; +package org.apache.hyracks.api.dataflow; -import java.util.List; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +public interface IPassableTimer { + /* + A timer intended to be used for timing the individual components of a + pipelined process. An instance of IPassableTimer is held by each method + in the pipeline, and is pause()d when that method passes off control to + a component above it, and is resume()d when the component above it returns. 
+ */ -/** - * @author pouria - * Interface for the Run Generator - */ -public interface IRunGenerator extends IFrameWriter { + void pause(); - /** - * @return the list of generated (sorted) runs - */ - List<GeneratedRunFileReader> getRuns(); + void resume(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java new file mode 100644 index 0000000..83a4b34 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.dataflow; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IStatsCollector; +import org.apache.hyracks.api.job.profiling.counters.ICounter; + +public class TimedFrameWriter implements IFrameWriter, IPassableTimer { + + // The downstream data consumer of this writer. 
+ private final IFrameWriter writer; + private long frameStart = 0; + final ICounter counter; + final IStatsCollector collector; + final String name; + + public TimedFrameWriter(IFrameWriter writer, IStatsCollector collector, String name, ICounter counter) { + this.writer = writer; + this.collector = collector; + this.name = name; + this.counter = counter; + } + + protected TimedFrameWriter(IFrameWriter writer, IStatsCollector collector, String name) { + this(writer, collector, name, collector.getOrAddOperatorStats(name).getTimeCounter()); + } + + @Override + public final void open() throws HyracksDataException { + try { + startClock(); + writer.open(); + } finally { + stopClock(); + } + } + + @Override + public final void nextFrame(ByteBuffer buffer) throws HyracksDataException { + try { + startClock(); + writer.nextFrame(buffer); + } finally { + stopClock(); + } + } + + @Override + public final void flush() throws HyracksDataException { + try { + startClock(); + writer.flush(); + } finally { + stopClock(); + } + } + + @Override + public final void fail() throws HyracksDataException { + writer.fail(); + } + + @Override + public void close() throws HyracksDataException { + try { + startClock(); + writer.close(); + } finally { + stopClock(); + } + } + + private void stopClock() { + pause(); + collector.giveClock(this); + } + + private void startClock() { + if (frameStart > 0) { + return; + } + frameStart = collector.takeClock(this); + } + + @Override + public void resume() { + if (frameStart > 0) { + return; + } + long nt = System.nanoTime(); + frameStart = nt; + } + + @Override + public void pause() { + if (frameStart > 0) { // running only while positive: 0 = never started, -1 = paused + long nt = System.nanoTime(); + long delta = nt - frameStart; + counter.update(delta); + frameStart = -1; + } + } + + public static IFrameWriter time(IFrameWriter writer, IHyracksTaskContext ctx, String name) + throws HyracksDataException { + return writer instanceof TimedFrameWriter ?
writer + : new TimedFrameWriter(writer, ctx.getStatsCollector(), name); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java new file mode 100644 index 0000000..2d46bea --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.api.dataflow; + +import java.util.HashMap; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IStatsCollector; +import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable; + +public class TimedOperatorNodePushable extends TimedFrameWriter implements IOperatorNodePushable, IPassableTimer { + + IOperatorNodePushable op; + HashMap<Integer, IFrameWriter> inputs; + long frameStart; + + TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) throws HyracksDataException { + super(null, collector, op.getDisplayName()); + this.op = op; + inputs = new HashMap<>(); + } + + @Override + public void initialize() throws HyracksDataException { + synchronized (collector) { + startClock(); + op.initialize(); + stopClock(); + } + } + + @Override + public void deinitialize() throws HyracksDataException { + synchronized (collector) { + startClock(); + op.deinitialize(); + stopClock(); + } + } + + @Override + public int getInputArity() { + return op.getInputArity(); + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) + throws HyracksDataException { + op.setOutputFrameWriter(index, writer, recordDesc); + } + + @Override + public IFrameWriter getInputFrameWriter(int index) { + IFrameWriter ifw = op.getInputFrameWriter(index); + if (!(op instanceof TimedFrameWriter) && ifw.equals(op)) { + return new TimedFrameWriter(op.getInputFrameWriter(index), collector, op.getDisplayName(), counter); + } + return op.getInputFrameWriter(index); + } + + @Override + public String getDisplayName() { + return op.getDisplayName(); + } + + private void stopClock() { + pause(); + collector.giveClock(this); + } + + private void startClock() { + 
if (frameStart > 0) { + return; + } + frameStart = collector.takeClock(this); + } + + @Override + public void resume() { + if (frameStart > 0) { + return; + } + long nt = System.nanoTime(); + frameStart = nt; + } + + @Override + public void pause() { + if (frameStart > 0) { + long nt = System.nanoTime(); + long delta = nt - frameStart; + counter.update(delta); + frameStart = -1; + } + } + + public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx) + throws HyracksDataException { + if (!(op instanceof TimedOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) { + return new TimedOperatorNodePushable(op, ctx.getStatsCollector()); + } + return op; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java index 1e73146..8930d34 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java @@ -19,7 +19,9 @@ package org.apache.hyracks.api.job.profiling; import java.io.Serializable; +import java.util.Map; +import org.apache.hyracks.api.dataflow.IPassableTimer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IWritable; @@ -36,13 +38,34 @@ public interface IStatsCollector extends IWritable, Serializable { /** * @param operatorName * @return {@link IOperatorStats} for the operator with name <code>operatorName</code> - * if one exists or else null. + * if it already exists, and adds it if it does not. 
*/ - IOperatorStats getOperatorStats(String operatorName); + IOperatorStats getOrAddOperatorStats(String operatorName); + + /** + * Get every registered operator stats object + * @return All registered operators, and their collected stats, with the names as keys and stats as values + */ + Map<String, IOperatorStats> getAllOperatorStats(); /** * @return A special {@link IOperatorStats} that has the aggregated stats * from all operators in the collection. */ IOperatorStats getAggregatedStats(); + + /** + * Pause an operator's timer, to pass it to another operator + * @param newHolder the timer that is starting execution + * @return the current nanoTime when the clock was taken from the other operator + */ + long takeClock(IPassableTimer newHolder); + + /** + * Resume an operator's timer, when a downstream operator has finished execution of + * the method the upstream operator called + * @param currHolder the timer that needs to be paused + */ + void giveClock(IPassableTimer currHolder); + } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java similarity index 87% rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java index 66fb797..08c1adc 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java @@ -16,15 +16,14 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.control.common.job.profiling; +package org.apache.hyracks.api.job.profiling; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.hyracks.api.job.profiling.IOperatorStats; +import org.apache.hyracks.api.com.job.profiling.counters.Counter; import org.apache.hyracks.api.job.profiling.counters.ICounter; -import org.apache.hyracks.control.common.job.profiling.counters.Counter; public class OperatorStats implements IOperatorStats { private static final long serialVersionUID = 6401830963367567167L; @@ -85,4 +84,10 @@ public class OperatorStats implements IOperatorStats { timeCounter.set(input.readLong()); diskIoCounter.set(input.readLong()); } + + @Override + public String toString() { + return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": " + + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + " }"; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 12d696e..168d7dd 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -22,11 +22,13 @@ package org.apache.hyracks.api.rewriter.runtime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.Future; @@ -41,6 +43,7 @@ import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.TimedOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -94,15 +97,22 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable } private void init() throws HyracksDataException { - Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>(); + LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>(); List<IConnectorDescriptor> outputConnectors; final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); + final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); /* * Set up the source operators */ + Set<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> sources = new HashSet<>(); for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) { - IOperatorNodePushable opPushable = - entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); + IOperatorNodePushable opPushable = null; + if (profile) { + opPushable = TimedOperatorNodePushable + .time(entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions), ctx); + } else { + opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); + } operatorNodePushablesBFSOrder.add(opPushable); operatorNodePushables.put(entry.getKey(), opPushable); inputArity += opPushable.getInputArity(); @@ -110,6 +120,7 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable 
MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList()); for (IConnectorDescriptor conn : outputConnectors) { childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId())); + sources.add(childQueue.peekLast()); } } @@ -128,8 +139,13 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId); IOperatorNodePushable destOp = operatorNodePushables.get(destId); if (destOp == null) { - destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition, - nPartitions); + if (profile) { + destOp = TimedOperatorNodePushable.time(channel.getRight().getLeft().createPushRuntime(ctx, + recordDescProvider, partition, nPartitions), ctx); + } else { + destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition, + nPartitions); + } operatorNodePushablesBFSOrder.add(destOp); operatorNodePushables.put(destId, destOp); } @@ -138,7 +154,7 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable * construct the dataflow connection from a producer to a consumer */ IFrameWriter writer = destOp.getInputFrameWriter(inputChannel); - writer = enforce ? EnforceFrameWriter.enforce(writer) : writer; + writer = (enforce && !profile) ? 
EnforceFrameWriter.enforce(writer) : writer; sourceOp.setOutputFrameWriter(outputChannel, writer, recordDescProvider.getInputRecordDescriptor(destId, inputChannel)); diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java index 6cb90e3..1a878a0 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java @@ -19,7 +19,7 @@ package org.apache.hyracks.client.stats; -import org.apache.hyracks.control.common.job.profiling.counters.Counter; +import org.apache.hyracks.api.com.job.profiling.counters.Counter; public class AggregateCounter extends Counter { private static final long serialVersionUID = 9140555872026977436L; diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java index 0d3affc..84831e2 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java @@ -30,11 +30,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hyracks.api.com.job.profiling.counters.Counter; import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.client.stats.AggregateCounter; import org.apache.hyracks.client.stats.Counters; import org.apache.hyracks.client.stats.IClusterCounterContext; -import org.apache.hyracks.control.common.job.profiling.counters.Counter; import 
com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java index 8da19ac..878d7c3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java @@ -21,20 +21,25 @@ package org.apache.hyracks.control.common.job.profiling; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.LinkedHashMap; import java.util.Map; -import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.dataflow.IPassableTimer; import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.job.profiling.IStatsCollector; +import org.apache.hyracks.api.job.profiling.OperatorStats; public class StatsCollector implements IStatsCollector { - private static final long serialVersionUID = 6858817639895434577L; + private static final long serialVersionUID = 6858817639895434578L; - private final Map<String, IOperatorStats> operatorStatsMap = new HashMap<>(); + private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>(); + private transient Deque<IPassableTimer> clockHolder = new ArrayDeque<>(); @Override - public void add(IOperatorStats operatorStats) throws HyracksDataException { + public void add(IOperatorStats operatorStats) { if (operatorStatsMap.containsKey(operatorStats.getName())) 
{ throw new IllegalArgumentException("Operator with the same name already exists"); } @@ -42,8 +47,13 @@ public class StatsCollector implements IStatsCollector { } @Override - public IOperatorStats getOperatorStats(String operatorName) { - return operatorStatsMap.get(operatorName); + public IOperatorStats getOrAddOperatorStats(String operatorName) { + return operatorStatsMap.computeIfAbsent(operatorName, OperatorStats::new); + } + + @Override + public Map<String, IOperatorStats> getAllOperatorStats() { + return Collections.unmodifiableMap(operatorStatsMap); } public static StatsCollector create(DataInput input) throws IOException { @@ -79,4 +89,24 @@ public class StatsCollector implements IStatsCollector { operatorStatsMap.put(opStats.getName(), opStats); } } + + @Override + public long takeClock(IPassableTimer newHolder) { + if (newHolder != null) { + if (clockHolder.peek() != null) { + clockHolder.peek().pause(); + } + clockHolder.push(newHolder); + } + return System.nanoTime(); + } + + @Override + public void giveClock(IPassableTimer currHolder) { + clockHolder.removeLastOccurrence(currHolder); + if (clockHolder.peek() != null) { + clockHolder.peek().resume(); + } + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java index 6dfb71d..1e7a3b7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java @@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.job.profiling.om; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; 
+import java.text.DecimalFormat; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -29,6 +30,7 @@ import java.util.Set; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.job.profiling.IStatsCollector; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.control.common.job.profiling.StatsCollector; @@ -116,10 +118,24 @@ public class TaskProfile extends AbstractProfile { json.set("partition-send-profile", pspArray); } populateCounters(json); - return json; } + @Override + protected void populateCounters(ObjectNode json) { + ObjectMapper om = new ObjectMapper(); + Map<String, IOperatorStats> opTimes = statsCollector.getAllOperatorStats(); + ArrayNode countersObj = om.createArrayNode(); + opTimes.forEach((key, value) -> { + ObjectNode jpe = om.createObjectNode(); + jpe.put("name", key); + jpe.put("time", Double + .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000))); + countersObj.add(jpe); + }); + json.set("counters", countersObj); + } + public IStatsCollector getStatsCollector() { return statsCollector; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index 8afdc9e..d9b3961 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.hyracks.api.application.INCServiceContext; +import 
org.apache.hyracks.api.com.job.profiling.counters.Counter; import org.apache.hyracks.api.comm.IPartitionCollector; import org.apache.hyracks.api.comm.PartitionChannel; import org.apache.hyracks.api.context.IHyracksJobletContext; @@ -53,7 +54,6 @@ import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.job.PartitionRequest; import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.profiling.StatsCollector; -import org.apache.hyracks.control.common.job.profiling.counters.Counter; import org.apache.hyracks.control.common.job.profiling.om.JobletProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 81b9d5e..00de038 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hyracks.api.com.job.profiling.counters.Counter; import org.apache.hyracks.api.comm.IFrameReader; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.IPartitionCollector; @@ -66,7 +67,6 @@ import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.profiling.StatsCollector; -import 
org.apache.hyracks.control.common.job.profiling.counters.Counter; import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index 09a4c18..69f1113 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -169,6 +169,7 @@ public class StartTasksWork extends AbstractWork { List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid); if (outputs != null) { final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT); + final boolean profile = flags.contains(JobFlag.PROFILE_RUNTIME); for (int i = 0; i < outputs.size(); ++i) { final IConnectorDescriptor conn = outputs.get(i); RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); @@ -179,7 +180,7 @@ public class StartTasksWork extends AbstractWork { LOGGER.trace("input: {}: {}", i, conn.getConnectorId()); IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td.getPartitionCount(), td.getOutputPartitionCounts()[i]); - writer = enforce ? EnforceFrameWriter.enforce(writer) : writer; + writer = (enforce && !profile) ? 
EnforceFrameWriter.enforce(writer) : writer; operator.setOutputFrameWriter(i, writer, recordDesc); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java index 7ca01a5..c545e7d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java @@ -30,12 +30,14 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger; -import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator; import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.Algorithm; +import org.apache.hyracks.dataflow.std.sort.IRunGenerator; +import org.apache.hyracks.dataflow.std.sort.TimedRunGenerator; /** * This Operator pushes group-by aggregation into the external sort. 
@@ -122,7 +124,7 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip RecordDescriptor outRecordDesc, boolean finalStage) { super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, outRecordDesc); if (framesLimit <= 1) { - throw new IllegalStateException();// minimum of 2 fames (1 in,1 out) + throw new IllegalStateException(); // minimum of 2 frames (1 in,1 out) } this.groupFields = groupFields; @@ -139,12 +141,14 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip private static final long serialVersionUID = 1L; @Override - protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx, + protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException { - return new ExternalSortGroupByRunGenerator(ctx, sortFields, + final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); + IRunGenerator runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields, recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit, groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory, partialAggRecordDesc, ALG); + return profile ? 
TimedRunGenerator.time(runGen, ctx, "GroupBy (Sort Runs)") : runGen; } }; } @@ -165,4 +169,14 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip } }; } + + @Override + public String getDisplayName() { + return "GroupBy (Sort)"; + } + + @Override + public String toString() { + return getDisplayName(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index 2fd17da..6d2b485 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -320,6 +320,11 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD isFailed = true; } + @Override + public String getDisplayName() { + return "Hybrid Hash Join: Build"; + } + }; } } @@ -800,6 +805,11 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD outerReader.close(); } } + + @Override + public String getDisplayName() { + return "Hybrid Hash Join: Probe & Join"; + } }; return op; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java index 8006fc6..a4cfa13 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java @@ -146,6 +146,11 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat sb.append("\"maxReads\": ").append(maxReads).append(" }"); return sb.toString(); } + + @Override + public String getDisplayName() { + return "Result Writer"; + } }; } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java index 6abc064..6ff79b7 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java @@ -102,14 +102,14 @@ public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorD super(id); } - protected abstract AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx, + protected abstract IRunGenerator getRunGenerator(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider) throws HyracksDataException; @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { return new AbstractUnaryInputSinkOperatorNodePushable() { - private AbstractSortRunGenerator runGen; + private IRunGenerator runGen; @Override public void open() throws HyracksDataException { @@ -139,6 +139,11 @@ public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorD public void fail() throws HyracksDataException { runGen.fail(); } + + @Override + public String getDisplayName() { + return "Sort (Run Generation)"; + } }; } } 
@@ -155,6 +160,7 @@ public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorD IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames); @Override + @SuppressWarnings("squid:S1188") public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { return new AbstractUnaryOutputSourceOperatorNodePushable() { @@ -203,6 +209,11 @@ public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorD } } } + + @Override + public String getDisplayName() { + return "Sort (Run Merge)"; + } }; } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java index 8b80a26..654f3a3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java @@ -30,6 +30,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy; @@ -76,10 +77,12 @@ public class ExternalSortOperatorDescriptor extends AbstractSorterOperatorDescri private static final long serialVersionUID = 1L; @Override - protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx, + protected IRunGenerator 
getRunGenerator(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider) throws HyracksDataException { - return new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, comparatorFactories, - outRecDescs[0], alg, policy, framesLimit, outputLimit); + final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); + IRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, + comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit); + return profile ? TimedRunGenerator.time(runGen, ctx, "ExternalSort(Sort)") : runGen; } }; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java index 02880dd..d43d02d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java @@ -33,4 +33,6 @@ public interface IRunGenerator extends IFrameWriter { * @return the list of generated (sorted) runs */ List<GeneratedRunFileReader> getRuns(); + + ISorter getSorter(); } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java new file mode 100644 index 0000000..b3a4aee --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.sort; + +import java.util.List; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.TimedFrameWriter; +import org.apache.hyracks.api.job.profiling.IStatsCollector; +import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; + +public class TimedRunGenerator extends TimedFrameWriter implements IRunGenerator { + + private final IRunGenerator runGenerator; + + private TimedRunGenerator(IRunGenerator runGenerator, IStatsCollector collector, String name) { + super(runGenerator, collector, name); + this.runGenerator = runGenerator; + } + + @Override + public List<GeneratedRunFileReader> getRuns() { + return runGenerator.getRuns(); + } + + @Override + public ISorter getSorter() { + return runGenerator.getSorter(); + } + + public static IRunGenerator time(IRunGenerator runGenerator, IHyracksTaskContext ctx, String name) { + return runGenerator instanceof TimedRunGenerator ? 
runGenerator + : new TimedRunGenerator(runGenerator, ctx.getStatsCollector(), name); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java index b29057f..b7ff530 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java @@ -30,6 +30,7 @@ import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor { @@ -59,10 +60,12 @@ public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescript private static final long serialVersionUID = 1L; @Override - protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx, + protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider) { - return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, keyNormalizerFactories, - comparatorFactories, outRecDescs[0]); + final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); + IRunGenerator runGen = new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, + keyNormalizerFactories, comparatorFactories, outRecDescs[0]); + return profile ? 
TimedRunGenerator.time(runGen, ctx, "TopKSort (Sort)") : runGen; } }; @@ -82,4 +85,10 @@ public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescript } }; } + + @Override + public String getDisplayName() { + return "Top K Sort"; + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java index fd985db..6f59ed8 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java @@ -27,7 +27,6 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; -import java.util.EnumSet; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -38,7 +37,6 @@ import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; -import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.result.IResultSet; @@ -138,7 +136,7 @@ public abstract class AbstractIntegrationTest { if (LOGGER.isInfoEnabled()) { LOGGER.info(spec.toJSON().asText()); } - JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); + JobId jobId = hcc.startJob(spec); if (LOGGER.isInfoEnabled()) { LOGGER.info(jobId.toString()); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java index 566d8e2..001e250 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java @@ -101,4 +101,9 @@ public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato searchCallbackProceedResultTrueValue); } + @Override + public String getDisplayName() { + return "BTree Search"; + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java index 12dc310..44af086 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java @@ -36,6 +36,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IStatsCollector; import org.apache.hyracks.api.test.CountAndThrowError; import org.apache.hyracks.api.test.CountAndThrowException; import org.apache.hyracks.api.test.CountAnswer; @@ -307,6 +308,7 @@ public class FramewriterTest { IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class); IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class); INCServiceContext serviceCtx 
= Mockito.mock(INCServiceContext.class); + IStatsCollector collector = Mockito.mock(IStatsCollector.class); Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer()); Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer()); Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE); @@ -314,6 +316,7 @@ public class FramewriterTest { .thenReturn(mockByteBuffer()); Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx); Mockito.when(jobletCtx.getServiceContext()).thenReturn(serviceCtx); + Mockito.when(ctx.getStatsCollector()).thenReturn(collector); return new IHyracksTaskContext[] { ctx }; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml index 262570a..31463b7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml @@ -92,11 +92,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-control-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index d504a96..142e879 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -32,7 +32,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.api.util.ExceptionUtils; -import org.apache.hyracks.control.common.job.profiling.OperatorStats; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -93,7 +92,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput protected ArrayTupleBuilder nonFilterTupleBuild; protected final ISearchOperationCallbackFactory searchCallbackFactory; protected boolean failed = false; - private final IOperatorStats stats; + private IOperatorStats stats; // Used when the result of the search operation callback needs to be passed. protected boolean appendSearchCallbackProceedResult; @@ -150,13 +149,13 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput this.appendSearchCallbackProceedResult = appendSearchCallbackProceedResult; this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue; this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue; - stats = new OperatorStats(getDisplayName()); - if (ctx.getStatsCollector() != null) { - ctx.getStatsCollector().add(stats); - } this.tupleFilterFactory = tupleFactoryFactory; this.outputLimit = outputLimit; + if (ctx != null && ctx.getStatsCollector() != null) { + stats = ctx.getStatsCollector().getOrAddOperatorStats(getDisplayName()); + } + if (this.tupleFilterFactory != null && this.retainMissing) { throw new IllegalStateException("RetainMissing with tuple filter is not supported"); } @@ -435,4 +434,9 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput } + @Override + public String getDisplayName() { + return "Index Search"; + } + } diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java index 7cbf948..267c69e 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java @@ -21,9 +21,9 @@ package org.apache.hyracks.test.support; import java.util.HashMap; import java.util.Map; +import org.apache.hyracks.api.com.job.profiling.counters.Counter; import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; -import org.apache.hyracks.control.common.job.profiling.counters.Counter; public class CounterContext implements ICounterContext { private final String contextName;