From Ali Alsuliman <[email protected]>:

Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20523?usp=email )

Change subject: [ASTERIXDB-3667][OTH] Log counts of currently running jobs based on kind
......................................................................

[ASTERIXDB-3667][OTH] Log counts of currently running jobs based on kind

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-69127

Change-Id: I80e1270a514fb9683f3f429e54aeaa2c5ee77a9a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20523
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Ali Alsuliman <[email protected]>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
18 files changed, 231 insertions(+), 51 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Ali Alsuliman: Looks good to me, but someone else must approve; Verified
  Jenkins: Verified




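Reviewer note: a minimal sketch of how the new job-kind tagging fits together, using only the API introduced in this change (HyracksJobProperty, JobKind, and the typed JobSpecification.setProperty/getProperty). The helper class below is hypothetical and for illustration only; it is not part of the patch.

    import org.apache.hyracks.api.job.HyracksJobProperty;
    import org.apache.hyracks.api.job.JobKind;
    import org.apache.hyracks.api.job.JobSpecification;

    // Hypothetical helper mirroring the pattern applied across DatasetUtil, IndexUtil, etc.
    public final class JobKindTaggingSketch {

        private JobKindTaggingSketch() {
        }

        // Producers tag the spec before submitting it, e.g. DDL job builders.
        public static void tagAsDdl(JobSpecification spec) {
            spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
        }

        // The CC reads the kind back the same way JobManager.updateActiveJobCounts does,
        // tolerating specs that were never tagged.
        public static JobKind kindOf(JobSpecification spec) {
            Object property = spec.getProperty(HyracksJobProperty.JOB_KIND);
            return property instanceof JobKind ? (JobKind) property : null;
        }
    }
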
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index d8775c8..1910103 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -68,7 +68,9 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.job.HyracksJobProperty;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultSetReader;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -281,7 +283,7 @@
             compiler.optimize();

             JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false));
-
+            jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.SYS_QUERY);
             JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec, true);

             IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 01e47b5..347a49e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.utils.AsterixJobProperty;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.commons.lang3.tuple.Pair;
@@ -54,7 +55,6 @@

     private static final Logger LOGGER = LogManager.getLogger();
     private static final Level level = Level.DEBUG;
-    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
     private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
     private final Map<JobId, EntityId> jobId2EntityId;
     private boolean suspended = false;
@@ -97,10 +97,10 @@
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification,
             IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException {
-        Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
+        Object property = jobSpecification.getProperty(AsterixJobProperty.ACTIVE_ENTITY);
         if (!(property instanceof EntityId)) {
             if (property != null) {
-                LOGGER.debug("{} is not an ingestion job. job property={}", jobId, property);
+                LOGGER.debug("{} is not an ingestion job. found entity={}", jobId, property);
             }
             return;
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 4674f7e..7f045a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.app.active;

+import static org.apache.asterix.common.utils.AsterixJobProperty.ACTIVE_ENTITY;
+import static org.apache.hyracks.api.job.HyracksJobProperty.JOB_KIND;
+
 import java.util.EnumSet;
 import java.util.List;

@@ -45,6 +48,7 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;

 public class FeedEventsListener extends ActiveEntityEventsListener {
@@ -103,7 +107,8 @@
             Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
                     FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc);
             JobSpecification feedJob = jobInfo.getLeft();
-            feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
+            feedJob.setProperty(ACTIVE_ENTITY, entityId);
+            feedJob.setProperty(JOB_KIND, JobKind.INGESTION);
             // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
             // We will need to design general exception handling mechanism for feeds.
             setLocations(jobInfo.getRight());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index d509ba6..2ae8a0a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.AsterixJobProperty;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext;
 import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
@@ -56,7 +57,6 @@
     private final Map<JobId, IGlobalTransactionContext> txnContextRepository = new ConcurrentHashMap<>();
     private final ICCServiceContext serviceContext;
     private final IOManager ioManager;
-    public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty";

     public GlobalTxManager(ICCServiceContext serviceContext, IOManager ioManager) {
         this.serviceContext = serviceContext;
@@ -242,7 +242,7 @@
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
             throws HyracksException {
-        GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME);
+        GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(AsterixJobProperty.GLOBAL_TX);
         if (globalTxInfo != null) {
             beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(),
                     globalTxInfo.getDatasetIds());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java
index f32b8ef..bd9b322 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java
@@ -42,6 +42,8 @@
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.HyracksJobProperty;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;

@@ -66,6 +68,10 @@

         JobSpecification abortJobSpec = createLibraryAbortJobSpec(namespace, libraryName, appCtx, splitsAndConstraint);

+        prepareJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
+        commitJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
+        abortJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
+
         return new Triple<>(prepareJobSpec, commitJobSpec, abortJobSpec);
     }

@@ -110,7 +116,7 @@
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
                 splitsAndConstraint.second);
         jobSpec.addRoot(opDesc);
-
+        jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
         return jobSpec;
     }

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 5e4706c..731f411 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -22,6 +22,7 @@
 import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
 import static org.apache.asterix.common.utils.IdentifierUtil.dataverse;
 import static org.apache.asterix.lang.common.statement.CreateFullTextFilterStatement.FIELD_TYPE_STOPWORDS;
+import static org.apache.hyracks.api.job.HyracksJobProperty.JOB_KIND;
 import static org.apache.hyracks.control.nc.result.ResultState.UNLIMITED_READS;

 import java.io.FileInputStream;
@@ -63,7 +64,6 @@
 import org.apache.asterix.app.active.ActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.active.FeedEventsListener;
-import org.apache.asterix.app.cc.GlobalTxManager;
 import org.apache.asterix.app.external.ExternalLibraryJobUtils;
 import org.apache.asterix.app.result.ExecutionError;
 import org.apache.asterix.app.result.ResultHandle;
@@ -108,6 +108,7 @@
 import org.apache.asterix.common.metadata.MetadataConstants;
 import org.apache.asterix.common.metadata.MetadataUtil;
 import org.apache.asterix.common.metadata.Namespace;
+import org.apache.asterix.common.utils.AsterixJobProperty;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.common.utils.StorageConstants;
@@ -286,6 +287,7 @@
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.result.IResultSet;
@@ -4136,6 +4138,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null && !isCompileOnly()) {
+                spec.setProperty(JOB_KIND, JobKind.DML);
                 runJob(hcc, spec);
             }
         } catch (Exception e) {
@@ -4223,13 +4226,14 @@
                             LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(dataset.getDatasetId());
-                    spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
+                    spec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds,
                             numParticipatingNodes, numParticipatingPartitions));
                 }
                 String reqId = requestParameters.getRequestReference().getUuid();
                 final IRequestTracker requestTracker = appCtx.getRequestTracker();
                 final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
-                jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest);
+                jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest,
+                        JobKind.DML);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4357,7 +4361,7 @@
         };

         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                requestParameters, true, null, clientRequest);
+                requestParameters, true, null, clientRequest, JobKind.DML);
     }

     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
@@ -4409,7 +4413,7 @@
         ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                    reqParams, true, stmt, clientRequest);
+                    reqParams, true, stmt, clientRequest, JobKind.DML);
         } else {
             locker.lock();
             JobId jobId = null;
@@ -4429,10 +4433,11 @@
                             LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(ds.getDatasetId());
-                    jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
-                            participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+                    jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds,
+                            numParticipatingNodes, numParticipatingPartitions));
                 }
-                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest);
+                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest,
+                        JobKind.DML);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4497,14 +4502,14 @@
                             LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(ds.getDatasetId());
-                    jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
-                            participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+                    jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds,
+                            numParticipatingNodes, numParticipatingPartitions));
                 }
                 String reqId = requestParameters.getRequestReference().getUuid();
                 final IRequestTracker requestTracker = appCtx.getRequestTracker();
                 final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
                 jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(),
-                        clientRequest);
+                        clientRequest, JobKind.DML);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4533,8 +4538,9 @@
     }

     private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
-            String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception {
+            String reqId, String clientCtxId, ClientRequest clientRequest, JobKind jobKind) throws Exception {
         jobSpec.setRequestId(reqId);
+        jobSpec.setProperty(JOB_KIND, jobKind);
         JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
         LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId);
         clientRequest.setJobId(jobId);
@@ -5541,19 +5547,19 @@
             }
         };
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                requestParameters, true, null, clientRequest);
+                requestParameters, true, null, clientRequest, JobKind.USER_QUERY);
     }

     private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
             MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
             ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
-            Statement atomicStmt, ClientRequest clientRequest) throws Exception {
+            Statement atomicStmt, ClientRequest clientRequest, JobKind jobKind) throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
                 executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
-                        requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt));
+                        requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt, jobKind));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -5567,7 +5573,7 @@
                     responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
                             metadataProvider.findOutputRecordType(), stats, sessionOutput));
                     responsePrinter.printResults();
-                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
+                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -5584,7 +5590,7 @@
                         outMetadata.getResultSets()
                                 .add(new ResultSetInfo(id, resultSetId, metadataProvider.findOutputRecordType()));
                     }
-                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
+                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind);
                 break;
             default:
                 break;
@@ -5616,7 +5622,8 @@

     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
             ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
-            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt) {
+            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt,
+            JobKind jobKind) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         final CompletableFuture<JobId> jobIdFuture = new CompletableFuture<>();
         Future<?> jobSubmitFuture = executorService.submit(() -> {
@@ -5632,7 +5639,7 @@
                         printed.setTrue();
                         printed.notify();
                     }
-                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
+                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind);
             } catch (Exception e) {
                 jobIdFuture.completeExceptionally(e);
                 throw new RuntimeException(e);
@@ -5717,7 +5724,7 @@
     private void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
             IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
-            MetadataProvider metadataProvider, Statement atomicStatement) throws Exception {
+            MetadataProvider metadataProvider, Statement atomicStatement, JobKind jobKind) throws Exception {
         String reqId = requestParameters.getRequestReference().getUuid();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
@@ -5750,12 +5757,13 @@
                             LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(ds.getDatasetId());
-                    jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
-                            participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+                    jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds,
+                            numParticipatingNodes, numParticipatingPartitions));
                 }
             }

-            jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest);
+            jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest,
+                    jobKind);
             if (jId != null) {
                 jId.setValue(jobId);
             }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index d132b6b..fd3e59b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -23,6 +23,8 @@
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.api.job.HyracksJobProperty;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;

@@ -47,6 +49,7 @@
                 new FileRemoveOperatorDescriptor(jobSpec, pp.getSplitsProvider(), false, pp.getComputeStorageMap());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, pp.getConstraints());
         jobSpec.addRoot(frod);
+        jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
         return jobSpec;
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index a2cbaa5..ca9f1d4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -63,6 +63,8 @@
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.HyracksJobProperty;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -348,6 +350,7 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0);

         // Executes the job.
+        spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML);
         JobUtils.forceRunJob(hcc, spec, true);
     }

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 9ff0514..53b4796 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -43,6 +43,7 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.MetadataUtil;
+import org.apache.asterix.common.utils.AsterixJobProperty;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -98,7 +99,7 @@

         // Mock JobSpecification
         JobSpecification jobSpec = Mockito.mock(JobSpecification.class);
-        Mockito.when(jobSpec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)).thenReturn(entityId);
+        Mockito.when(jobSpec.getProperty(AsterixJobProperty.ACTIVE_ENTITY)).thenReturn(entityId);

         // Mock MetadataProvider
         CCExtensionManager extensionManager = (CCExtensionManager) appCtx.getExtensionManager();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
index 883a0cb..9ea8573 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.common.utils.AsterixJobProperty;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.api.job.JobId;
@@ -48,8 +49,7 @@
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
                 // succeed
                 JobSpecification jobSpecification = Mockito.mock(JobSpecification.class);
-                Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
-                        .thenReturn(entityId);
+                Mockito.when(jobSpecification.getProperty(AsterixJobProperty.ACTIVE_ENTITY)).thenReturn(entityId);
                 handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE);
                 handler.notifyJobStart(jobId, null);
             }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java
new file mode 100644
index 0000000..760c5f7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import org.apache.hyracks.api.job.IJobProperty;
+
+public enum AsterixJobProperty implements IJobProperty {
+    ACTIVE_ENTITY,
+    GLOBAL_TX
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index c39395b..e637f28 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -94,6 +94,8 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.HyracksJobProperty;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -322,10 +324,11 @@
     public static JobSpecification dropDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider,
             Set<IndexDropOperatorDescriptor.DropOption> options) throws AlgebricksException, ACIDException {
         LOGGER.info("DROP DATASET: " + dataset);
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        }
         JobSpecification specPrimary = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
+        specPrimary.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return specPrimary;
+        }
         PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
         IIndexDataflowHelperFactory indexHelperFactory =
                 new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
@@ -370,6 +373,7 @@
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 partitioningProperties.getConstraints());
         spec.addRoot(indexCreateOp);
+        spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
         return spec;
     }

@@ -766,6 +770,7 @@
             IOperatorDescriptor truncateOp = new TruncateOperatorDescriptor(job, nc2Resources);
             AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(job, truncateOp, nodeSet);
             hcc = metadataProvider.getApplicationContext().getHcc();
+            job.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML);
             JobUtils.runJobIfActive(hcc, job, true);
         } else {
             // check should have been done by caller
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index dfefb8f..79db1a0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -75,7 +75,9 @@
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.HyracksJobProperty;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpTupleProjectorFactory;
@@ -162,21 +164,27 @@
             Dataset dataset, SourceLocation sourceLoc) throws AlgebricksException {
         ISecondaryIndexOperationsHelper secondaryIndexHelper =
                 SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc);
-        return secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class));
+        JobSpecification spec = secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class));
+        spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
+        return spec;
     }

     public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider,
             Dataset dataset, Set<DropOption> options, SourceLocation sourceLoc) throws AlgebricksException {
         ISecondaryIndexOperationsHelper secondaryIndexHelper =
                 SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc);
-        return secondaryIndexHelper.buildDropJobSpec(options);
+        JobSpecification spec = secondaryIndexHelper.buildDropJobSpec(options);
+        spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
+        return spec;
     }

     public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index,
             MetadataProvider metadataProvider, SourceLocation sourceLoc) throws AlgebricksException {
         ISecondaryIndexOperationsHelper secondaryIndexHelper =
                 SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc);
-        return secondaryIndexHelper.buildCreationJobSpec();
+        JobSpecification spec = secondaryIndexHelper.buildCreationJobSpec();
+        spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL);
+        return spec;
     }

     public static JobSpecification buildSecondaryIndexLoadingJobSpec(Dataset dataset, Index index,
@@ -195,7 +203,9 @@
             secondaryIndexHelper = SecondaryTreeIndexOperationsHelper.createIndexOperationsHelper(dataset, index,
                     metadataProvider, sourceLoc);
         }
-        return secondaryIndexHelper.buildLoadingJobSpec();
+        JobSpecification spec = secondaryIndexHelper.buildLoadingJobSpec();
+        spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML);
+        return spec;
     }

     private static boolean supportsCorrelated(DatasetConfig.IndexType indexType) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java
new file mode 100644
index 0000000..d8bf5cb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+public enum HyracksJobProperty implements IJobProperty {
+    JOB_KIND
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java
new file mode 100644
index 0000000..6d57579
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+
+public interface IJobProperty extends Serializable {
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java
new file mode 100644
index 0000000..4c5e869
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+
+public enum JobKind implements Serializable {
+    USER_QUERY,
+    SYS_QUERY,
+    DDL,
+    DML,
+    INGESTION
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index f644703..7761cec 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -70,7 +70,7 @@

     private transient Map<Object, String> logical2PhysicalMap;

-    private final Map<String, Serializable> properties;
+    private final Map<IJobProperty, Serializable> properties;

     private final Set<Constraint> userConstraints;

@@ -155,12 +155,12 @@
                 Pair.of(Pair.of(producerOp, producerPort), Pair.of(consumerOp, consumerPort)));
     }

-    public void setProperty(String name, Serializable value) {
-        properties.put(name, value);
+    public void setProperty(IJobProperty property, Serializable value) {
+        properties.put(property, value);
     }

-    public Serializable getProperty(String name) {
-        return properties.get(name);
+    public Serializable getProperty(IJobProperty property) {
+        return properties.get(property);
     }

     private <T> void extend(List<T> list, int index) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 9ecd165..269478a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -19,6 +19,7 @@

 package org.apache.hyracks.control.cc.job;

+import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.time.ZoneId;
@@ -38,7 +39,9 @@
 import org.apache.hyracks.api.exceptions.IError;
 import org.apache.hyracks.api.exceptions.IFormattedException;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.HyracksJobProperty;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobKind;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -75,6 +78,11 @@
     private final AtomicLong totalFailedJobs;
     private final AtomicLong totalCancelledJobs;
     private final AtomicLong totalRejectedJobs;
+    private long queryJobs;
+    private long sysQueryJobs;
+    private long ddlJobs;
+    private long dmlJobs;
+    private long ingestionJobs;
     private IJobQueue jobQueue;

     public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) {
@@ -244,9 +252,9 @@
         JobId jobId = run.getJobId();
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
+        JobSpecification spec = run.getJobSpecification();
         try {
-            serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(),
-                    run.getPendingExceptions());
+            serviceCtx.notifyJobFinish(jobId, spec, run.getPendingStatus(), run.getPendingExceptions());
         } catch (Exception e) {
             LOGGER.error("Exception notifying job finish {}", jobId, e);
             caughtException = e;
@@ -254,6 +262,7 @@
         run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
         run.setEndTime(System.currentTimeMillis());
         if (activeRunMap.remove(jobId) != null) {
+            updateActiveJobCounts(spec, -1);
             incrementJobCounters(run, successful);

             // non-active jobs have zero capacity
@@ -283,6 +292,31 @@
         }
     }

+    private void updateActiveJobCounts(JobSpecification spec, int delta) {
+        Serializable property = spec.getProperty(HyracksJobProperty.JOB_KIND);
+        if (property instanceof JobKind) {
+            switch ((JobKind) property) {
+                case USER_QUERY:
+                    queryJobs += delta;
+                    break;
+                case SYS_QUERY:
+                    sysQueryJobs += delta;
+                    break;
+                case DDL:
+                    ddlJobs += delta;
+                    break;
+                case DML:
+                    dmlJobs += delta;
+                    break;
+                case INGESTION:
+                    ingestionJobs += delta;
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
     /**
      * Increments the job counters depending on the status
      *
@@ -394,6 +428,7 @@
         run.setStartTimeZoneId(ZoneId.systemDefault().getId());
         JobId jobId = run.getJobId();
         logJobCapacity(run, "running", Level.INFO);
+        updateActiveJobCounts(run.getJobSpecification(), 1);
         activeRunMap.put(jobId, run);
         run.setStatus(JobStatus.RUNNING, null);
         executeJobInternal(run);
@@ -452,10 +487,10 @@
         }
         IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity();
         LOGGER.log(lvl,
-                "{} {}, job memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}",
+                "{} {}, job memory={} & cpu={}, (new) cluster memory={}, cpu={}, queued={}, currently running={}, user queries={}, system queries={}, ddls={}, dmls={}, ingestions={}",
                 jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs,
-                clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(),
-                getRunningJobsCount(), jobQueue.size());
+                clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), jobQueue.size(),
+                getRunningJobsCount(), queryJobs, sysQueryJobs, ddlJobs, dmlJobs, ingestionJobs);
     }

     private void handleException(HyracksException ex) {

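For context, the per-kind accounting above keeps one long field per kind inside JobManager, incremented when a job starts running and decremented when it completes, keyed by the JOB_KIND property. A compact, hypothetical restatement of that technique (not the actual implementation, which uses plain long fields updated on the cluster controller's job lifecycle path):

    import java.util.EnumMap;
    import java.util.Map;

    import org.apache.hyracks.api.job.HyracksJobProperty;
    import org.apache.hyracks.api.job.JobKind;
    import org.apache.hyracks.api.job.JobSpecification;

    // Illustrative only; not thread-safe and not part of the patch.
    final class ActiveJobKindCounter {

        private final Map<JobKind, Long> counts = new EnumMap<>(JobKind.class);

        // delta is +1 when a job starts running and -1 when it finishes.
        void update(JobSpecification spec, int delta) {
            Object property = spec.getProperty(HyracksJobProperty.JOB_KIND);
            if (property instanceof JobKind) {
                counts.merge((JobKind) property, (long) delta, Long::sum);
            }
        }

        long count(JobKind kind) {
            return counts.getOrDefault(kind, 0L);
        }
    }
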
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20523?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I80e1270a514fb9683f3f429e54aeaa2c5ee77a9a
Gerrit-Change-Number: 20523
Gerrit-PatchSet: 4
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
