>From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20523?usp=email )
Change subject: [ASTERIXDB-3667][OTH] Log counts of currently running jobs based on kind ...................................................................... [ASTERIXDB-3667][OTH] Log counts of currently running jobs based on kind - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-69127 Change-Id: I80e1270a514fb9683f3f429e54aeaa2c5ee77a9a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20523 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java 18 files changed, 231 insertions(+), 51 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Ali Alsuliman: Looks good to me, but someone else must approve; Verified Jenkins: Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java index d8775c8..1910103 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java @@ -68,7 +68,9 @@ import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.result.IResultSetReader; import org.apache.hyracks.api.result.ResultSetId; @@ -281,7 +283,7 @@ compiler.optimize(); JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false)); - + jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.SYS_QUERY); JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec, true); IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 01e47b5..347a49e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -33,6 +33,7 @@ import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.commons.lang3.tuple.Pair; @@ -54,7 +55,6 @@ private static final Logger LOGGER = LogManager.getLogger(); private static final Level level = Level.DEBUG; - public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob"; private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners; private final Map<JobId, EntityId> jobId2EntityId; private boolean suspended = false; @@ -97,10 +97,10 @@ @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification, IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException { - Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); + Object property = jobSpecification.getProperty(AsterixJobProperty.ACTIVE_ENTITY); if (!(property instanceof EntityId)) { if (property != null) { - LOGGER.debug("{} is not an ingestion job. job property={}", jobId, property); + LOGGER.debug("{} is not an ingestion job. found entity={}", jobId, property); } return; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index 4674f7e..7f045a3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.app.active; +import static org.apache.asterix.common.utils.AsterixJobProperty.ACTIVE_ENTITY; +import static org.apache.hyracks.api.job.HyracksJobProperty.JOB_KIND; + import java.util.EnumSet; import java.util.List; @@ -45,6 +48,7 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; public class FeedEventsListener extends ActiveEntityEventsListener { @@ -103,7 +107,8 @@ Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc); JobSpecification feedJob = jobInfo.getLeft(); - feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + feedJob.setProperty(ACTIVE_ENTITY, entityId); + feedJob.setProperty(JOB_KIND, JobKind.INGESTION); // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. // We will need to design general exception handling mechanism for feeds. setLocations(jobInfo.getRight()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java index d509ba6..2ae8a0a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.transactions.IGlobalTransactionContext; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext; import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo; @@ -56,7 +57,6 @@ private final Map<JobId, IGlobalTransactionContext> txnContextRepository = new ConcurrentHashMap<>(); private final ICCServiceContext serviceContext; private final IOManager ioManager; - public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty"; public GlobalTxManager(ICCServiceContext serviceContext, IOManager ioManager) { this.serviceContext = serviceContext; @@ -242,7 +242,7 @@ @Override public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) throws HyracksException { - GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME); + GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(AsterixJobProperty.GLOBAL_TX); if (globalTxInfo != null) { beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(), globalTxInfo.getDatasetIds()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java index f32b8ef..bd9b322 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java @@ -42,6 +42,8 @@ import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; @@ -66,6 +68,10 @@ JobSpecification abortJobSpec = createLibraryAbortJobSpec(namespace, libraryName, appCtx, splitsAndConstraint); + prepareJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + commitJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + abortJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return new Triple<>(prepareJobSpec, commitJobSpec, abortJobSpec); } @@ -110,7 +116,7 @@ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, splitsAndConstraint.second); jobSpec.addRoot(opDesc); - + jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); return jobSpec; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 5e4706c..731f411 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -22,6 +22,7 @@ import static org.apache.asterix.common.utils.IdentifierUtil.dataset; import static org.apache.asterix.common.utils.IdentifierUtil.dataverse; import static org.apache.asterix.lang.common.statement.CreateFullTextFilterStatement.FIELD_TYPE_STOPWORDS; +import static org.apache.hyracks.api.job.HyracksJobProperty.JOB_KIND; import static org.apache.hyracks.control.nc.result.ResultState.UNLIMITED_READS; import java.io.FileInputStream; @@ -63,7 +64,6 @@ import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.active.FeedEventsListener; -import org.apache.asterix.app.cc.GlobalTxManager; import org.apache.asterix.app.external.ExternalLibraryJobUtils; import org.apache.asterix.app.result.ExecutionError; import org.apache.asterix.app.result.ResultHandle; @@ -108,6 +108,7 @@ import org.apache.asterix.common.metadata.MetadataConstants; import org.apache.asterix.common.metadata.MetadataUtil; import org.apache.asterix.common.metadata.Namespace; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.common.utils.StorageConstants; @@ -286,6 +287,7 @@ import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.result.IResultSet; @@ -4136,6 +4138,7 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (spec != null && !isCompileOnly()) { + spec.setProperty(JOB_KIND, JobKind.DML); runJob(hcc, spec); } } catch (Exception e) { @@ -4223,13 +4226,14 @@ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(dataset.getDatasetId()); - spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds, + spec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); } String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); - jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest); + jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest, + JobKind.DML); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4357,7 +4361,7 @@ }; deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, true, null, clientRequest); + requestParameters, true, null, clientRequest, JobKind.DML); } public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, @@ -4409,7 +4413,7 @@ ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - reqParams, true, stmt, clientRequest); + reqParams, true, stmt, clientRequest, JobKind.DML); } else { locker.lock(); JobId jobId = null; @@ -4429,10 +4433,11 @@ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); - jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( - participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); + jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions)); } - jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest); + jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest, + JobKind.DML); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4497,14 +4502,14 @@ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); - jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( - participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); + jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions)); } String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), - clientRequest); + clientRequest, JobKind.DML); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4533,8 +4538,9 @@ } private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, - String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception { + String reqId, String clientCtxId, ClientRequest clientRequest, JobKind jobKind) throws Exception { jobSpec.setRequestId(reqId); + jobSpec.setProperty(JOB_KIND, jobKind); JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false); LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId); clientRequest.setJobId(jobId); @@ -5541,19 +5547,19 @@ } }; deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, true, null, clientRequest); + requestParameters, true, null, clientRequest, JobKind.USER_QUERY); } private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler, MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable, - Statement atomicStmt, ClientRequest clientRequest) throws Exception { + Statement atomicStmt, ClientRequest clientRequest, JobKind jobKind) throws Exception { final ResultSetId resultSetId = metadataProvider.getResultSetId(); switch (resultDelivery) { case ASYNC: MutableBoolean printed = new MutableBoolean(false); executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery, - requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt)); + requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt, jobKind)); synchronized (printed) { while (!printed.booleanValue()) { printed.wait(); @@ -5567,7 +5573,7 @@ responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader, metadataProvider.findOutputRecordType(), stats, sessionOutput)); responsePrinter.printResults(); - }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt); + }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind); break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { @@ -5584,7 +5590,7 @@ outMetadata.getResultSets() .add(new ResultSetInfo(id, resultSetId, metadataProvider.findOutputRecordType())); } - }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt); + }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind); break; default: break; @@ -5616,7 +5622,8 @@ private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable, - ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt) { + ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt, + JobKind jobKind) { Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID); final CompletableFuture<JobId> jobIdFuture = new CompletableFuture<>(); Future<?> jobSubmitFuture = executorService.submit(() -> { @@ -5632,7 +5639,7 @@ printed.setTrue(); printed.notify(); } - }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt); + }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind); } catch (Exception e) { jobIdFuture.completeExceptionally(e); throw new RuntimeException(e); @@ -5717,7 +5724,7 @@ private void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx, - MetadataProvider metadataProvider, Statement atomicStatement) throws Exception { + MetadataProvider metadataProvider, Statement atomicStatement, JobKind jobKind) throws Exception { String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); @@ -5750,12 +5757,13 @@ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); - jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( - participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); + jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions)); } } - jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest); + jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest, + jobKind); if (jId != null) { jId.setValue(jobId); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java index d132b6b..fd3e59b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java @@ -23,6 +23,8 @@ import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; @@ -47,6 +49,7 @@ new FileRemoveOperatorDescriptor(jobSpec, pp.getSplitsProvider(), false, pp.getComputeStorageMap()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, pp.getConstraints()); jobSpec.addRoot(frod); + jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); return jobSpec; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index a2cbaa5..ca9f1d4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -63,6 +63,8 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor; @@ -348,6 +350,7 @@ spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0); // Executes the job. + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML); JobUtils.forceRunJob(hcc, spec, true); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index 9ff0514..53b4796 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -43,6 +43,7 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.metadata.MetadataUtil; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -98,7 +99,7 @@ // Mock JobSpecification JobSpecification jobSpec = Mockito.mock(JobSpecification.class); - Mockito.when(jobSpec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)).thenReturn(entityId); + Mockito.when(jobSpec.getProperty(AsterixJobProperty.ACTIVE_ENTITY)).thenReturn(entityId); // Mock MetadataProvider CCExtensionManager extensionManager = (CCExtensionManager) appCtx.getExtensionManager(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java index 883a0cb..9ea8573 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java @@ -23,6 +23,7 @@ import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.EntityId; import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.api.job.JobId; @@ -48,8 +49,7 @@ protected void doExecute(MetadataProvider actorMdProvider) throws Exception { // succeed JobSpecification jobSpecification = Mockito.mock(JobSpecification.class); - Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) - .thenReturn(entityId); + Mockito.when(jobSpecification.getProperty(AsterixJobProperty.ACTIVE_ENTITY)).thenReturn(entityId); handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE); handler.notifyJobStart(jobId, null); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java new file mode 100644 index 0000000..760c5f7 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.utils; + +import org.apache.hyracks.api.job.IJobProperty; + +public enum AsterixJobProperty implements IJobProperty { + ACTIVE_ENTITY, + GLOBAL_TX +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index c39395b..e637f28 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -94,6 +94,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; @@ -322,10 +324,11 @@ public static JobSpecification dropDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider, Set<IndexDropOperatorDescriptor.DropOption> options) throws AlgebricksException, ACIDException { LOGGER.info("DROP DATASET: " + dataset); - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - } JobSpecification specPrimary = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); + specPrimary.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return specPrimary; + } PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), @@ -370,6 +373,7 @@ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitioningProperties.getConstraints()); spec.addRoot(indexCreateOp); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); return spec; } @@ -766,6 +770,7 @@ IOperatorDescriptor truncateOp = new TruncateOperatorDescriptor(job, nc2Resources); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(job, truncateOp, nodeSet); hcc = metadataProvider.getApplicationContext().getHcc(); + job.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML); JobUtils.runJobIfActive(hcc, job, true); } else { // check should have been done by caller diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index dfefb8f..79db1a0 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -75,7 +75,9 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory; import org.apache.hyracks.storage.am.common.impls.NoOpTupleProjectorFactory; @@ -162,21 +164,27 @@ Dataset dataset, SourceLocation sourceLoc) throws AlgebricksException { ISecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); - return secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class)); + JobSpecification spec = secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class)); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return spec; } public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider, Dataset dataset, Set<DropOption> options, SourceLocation sourceLoc) throws AlgebricksException { ISecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); - return secondaryIndexHelper.buildDropJobSpec(options); + JobSpecification spec = secondaryIndexHelper.buildDropJobSpec(options); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return spec; } public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index, MetadataProvider metadataProvider, SourceLocation sourceLoc) throws AlgebricksException { ISecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); - return secondaryIndexHelper.buildCreationJobSpec(); + JobSpecification spec = secondaryIndexHelper.buildCreationJobSpec(); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return spec; } public static JobSpecification buildSecondaryIndexLoadingJobSpec(Dataset dataset, Index index, @@ -195,7 +203,9 @@ secondaryIndexHelper = SecondaryTreeIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); } - return secondaryIndexHelper.buildLoadingJobSpec(); + JobSpecification spec = secondaryIndexHelper.buildLoadingJobSpec(); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML); + return spec; } private static boolean supportsCorrelated(DatasetConfig.IndexType indexType) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java new file mode 100644 index 0000000..d8bf5cb --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +public enum HyracksJobProperty implements IJobProperty { + JOB_KIND +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java new file mode 100644 index 0000000..6d57579 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.io.Serializable; + +public interface IJobProperty extends Serializable { +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java new file mode 100644 index 0000000..4c5e869 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.io.Serializable; + +public enum JobKind implements Serializable { + USER_QUERY, + SYS_QUERY, + DDL, + DML, + INGESTION +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index f644703..7761cec 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -70,7 +70,7 @@ private transient Map<Object, String> logical2PhysicalMap; - private final Map<String, Serializable> properties; + private final Map<IJobProperty, Serializable> properties; private final Set<Constraint> userConstraints; @@ -155,12 +155,12 @@ Pair.of(Pair.of(producerOp, producerPort), Pair.of(consumerOp, consumerPort))); } - public void setProperty(String name, Serializable value) { - properties.put(name, value); + public void setProperty(IJobProperty property, Serializable value) { + properties.put(property, value); } - public Serializable getProperty(String name) { - return properties.get(name); + public Serializable getProperty(IJobProperty property) { + return properties.get(property); } private <T> void extend(List<T> list, int index) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 9ecd165..269478a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -19,6 +19,7 @@ package org.apache.hyracks.control.cc.job; +import java.io.Serializable; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.time.ZoneId; @@ -38,7 +39,9 @@ import org.apache.hyracks.api.exceptions.IError; import org.apache.hyracks.api.exceptions.IFormattedException; import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IClusterCapacity; @@ -75,6 +78,11 @@ private final AtomicLong totalFailedJobs; private final AtomicLong totalCancelledJobs; private final AtomicLong totalRejectedJobs; + private long queryJobs; + private long sysQueryJobs; + private long ddlJobs; + private long dmlJobs; + private long ingestionJobs; private IJobQueue jobQueue; public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) { @@ -244,9 +252,9 @@ JobId jobId = run.getJobId(); Throwable caughtException = null; CCServiceContext serviceCtx = ccs.getContext(); + JobSpecification spec = run.getJobSpecification(); try { - serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(), - run.getPendingExceptions()); + serviceCtx.notifyJobFinish(jobId, spec, run.getPendingStatus(), run.getPendingExceptions()); } catch (Exception e) { LOGGER.error("Exception notifying job finish {}", jobId, e); caughtException = e; @@ -254,6 +262,7 @@ run.setStatus(run.getPendingStatus(), run.getPendingExceptions()); run.setEndTime(System.currentTimeMillis()); if (activeRunMap.remove(jobId) != null) { + updateActiveJobCounts(spec, -1); incrementJobCounters(run, successful); // non-active jobs have zero capacity @@ -283,6 +292,31 @@ } } + private void updateActiveJobCounts(JobSpecification spec, int delta) { + Serializable property = spec.getProperty(HyracksJobProperty.JOB_KIND); + if (property instanceof JobKind) { + switch ((JobKind) property) { + case USER_QUERY: + queryJobs += delta; + break; + case SYS_QUERY: + sysQueryJobs += delta; + break; + case DDL: + ddlJobs += delta; + break; + case DML: + dmlJobs += delta; + break; + case INGESTION: + ingestionJobs += delta; + break; + default: + break; + } + } + } + /** * Increments the job counters depending on the status * @@ -394,6 +428,7 @@ run.setStartTimeZoneId(ZoneId.systemDefault().getId()); JobId jobId = run.getJobId(); logJobCapacity(run, "running", Level.INFO); + updateActiveJobCounts(run.getJobSpecification(), 1); activeRunMap.put(jobId, run); run.setStatus(JobStatus.RUNNING, null); executeJobInternal(run); @@ -452,10 +487,10 @@ } IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity(); LOGGER.log(lvl, - "{} {}, job memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}", + "{} {}, job memory={} & cpu={}, (new) cluster memory={}, cpu={}, queued={}, currently running={}, user queries={}, system queries={}, ddls={}, dmls={}, ingestions={}", jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs, - clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), - getRunningJobsCount(), jobQueue.size()); + clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), jobQueue.size(), + getRunningJobsCount(), queryJobs, sysQueryJobs, ddlJobs, dmlJobs, ingestionJobs); } private void handleException(HyracksException ex) { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20523?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: I80e1270a514fb9683f3f429e54aeaa2c5ee77a9a Gerrit-Change-Number: 20523 Gerrit-PatchSet: 4 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
