Xikui Wang has posted comments on this change.

Change subject: [ASTERIXDB-1911][HYR,RT,CLUS] Fixes and Improvements for 
PreDistributed Jobs
......................................................................


Patch Set 14:

(30 comments)

I added a bunch of comments. Hope that's not too much. Also, I'm not very clear 
about the listener interface change and its uses. Maybe @Till or others should 
look at it too.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/PredistributedJobService.java
File 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/PredistributedJobService.java:

Line 1: /*
This class is not used by anyone. Don't you need to start this in 
CCApplication? If this is a BAD only service, maybe move it to BAD? (I prefer 
the first option, since this could be useful for others too)


Line 43:             IHyracksClientConnection hcc, long duration, Map<byte[], 
byte[]> jobParameters, EntityId entityId) {
What is this duration for? Execution interval?


Line 44:         ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(1);
Change the pool size to a private static int constant and give it a proper name.


Line 78:         Date checkStartTime = new Date();
See the comment on checkEndTime (line 87).


Line 82:         byte[] jobIdParameter = JOB_ID_PARAMETER_NAME.getBytes();
Instead of calling getBytes() every time, maybe use JOB_ID_... = 
"fooname".getBytes() in the definition.
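
A minimal sketch of what I mean, assuming the name is a plain string constant. 
The class name, the accessor and the value "jobIdParameter" are hypothetical 
placeholders, not the names used in this change.

```java
import java.nio.charset.StandardCharsets;

final class JobIdParameterSketch {
    // Hypothetical value; the real parameter name is not shown in this review.
    private static final String JOB_ID_PARAMETER_NAME_STRING = "jobIdParameter";

    // Encoded once when the class is loaded, with an explicit charset,
    // instead of calling getBytes() on every invocation.
    private static final byte[] JOB_ID_PARAMETER_NAME =
            JOB_ID_PARAMETER_NAME_STRING.getBytes(StandardCharsets.UTF_8);

    // Returns the shared array; callers must treat it as read-only.
    static byte[] jobIdParameterName() {
        return JOB_ID_PARAMETER_NAME;
    }
}
```

Passing the charset explicitly also avoids depending on the platform default.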


Line 86:         hcc.waitForCompletion(jobId);
Are we certain that all predistributed job calls are synchronized? Maybe add 
this to the function signature?


Line 87:         Date checkEndTime = new Date();
You don't really need to create two Date objects here. Use Instant.now() and 
get the diff.
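
Something along these lines, as a sketch only. The class and method names are 
made up for illustration, and the Runnable stands in for the 
hcc.waitForCompletion(jobId) call.

```java
import java.time.Duration;
import java.time.Instant;

final class ElapsedTimeSketch {
    // Runs the task and returns how long it took, using java.time
    // instead of two java.util.Date objects.
    static Duration timeIt(Runnable task) {
        Instant start = Instant.now();
        task.run();
        return Duration.between(start, Instant.now());
    }

    public static void main(String[] args) {
        Duration elapsed = timeIt(() -> {
            try {
                Thread.sleep(20); // stand-in for waiting on the job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("Elapsed ms: " + elapsed.toMillis());
    }
}
```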


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
File 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java:

Line 743:     public void handleCreateIndexStatement(MetadataProvider 
metadataProvider, Statement stmt,
> MAJOR SonarQube violation:
Used by BAD? Maybe we need a better solution here rather than simply changing 
its signature. @Till


Line 1660:         String functionName = cfs.getSignature().getName();
change to getFunctionSignature() in function


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
File 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java:

Line 25:  * an interface for JobEventListenerFactories to add Asterix 
transaction id API
to add Asterix transaction id API?


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
File 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java:

Line 51:     protected final DataOutput dataOutput = 
resultStorage.getDataOutput();
reason?


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
File 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java:

Line 36: public class GetJobParameterDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
get_job_parameter by parameter name?


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
File 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java:

Line 53:         return new JobEventListenerFactory(jobId, transactionalWrite);
Do you need to create a new jobId here? It's passing reference to the new 
jobEventListenerFactory whose content can be changed...
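
To illustrate the concern, here is a sketch of a copy that does not share the 
id object. MutableJobId and ListenerFactory are hypothetical stand-ins, not the 
real AsterixDB classes, and I'm assuming the id is mutable.

```java
final class CopyFactorySketch {
    // Hypothetical stand-in for a mutable job id.
    static final class MutableJobId {
        private int id;

        MutableJobId(int id) {
            this.id = id;
        }

        int getId() {
            return id;
        }

        void setId(int id) {
            this.id = id;
        }
    }

    // Hypothetical stand-in for the listener factory.
    static final class ListenerFactory {
        private final MutableJobId jobId;
        private final boolean transactionalWrite;

        ListenerFactory(MutableJobId jobId, boolean transactionalWrite) {
            this.jobId = jobId;
            this.transactionalWrite = transactionalWrite;
        }

        MutableJobId getJobId() {
            return jobId;
        }

        // Copies the id as well, so changing the copy's id
        // cannot affect the original factory.
        ListenerFactory copyFactory() {
            return new ListenerFactory(new MutableJobId(jobId.getId()), transactionalWrite);
        }
    }
}
```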


Line 58:         byte[] jobIdParameter = jobIdParameterName.getBytes();
jobIdParameter or jobParameterName?


Line 61:         if (jobIdString.length() > 0) {
I'm a bit confused here. The method seems to return the parameter value, but 
here we cast it to string JobId?


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
File 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java:

Line 52:         return new MultiTransactionJobletEventListenerFactory(jobIds, 
transactionalWrite);
Same comment as for JobEventListenerFactory.


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
File 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java:

Line 52:                     fact instanceof IJobEventListenerFactory ? 
((IJobEventListenerFactory) fact).getJobId() : jobId;
I'm not sure this is the right way to get JobId. @Till needs to look at this, 
together with the two interface changes.


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
File 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java:

Line 62:                 fact instanceof IJobEventListenerFactory ? 
((IJobEventListenerFactory) fact).getJobId() : jobId;
same comments for all of them...


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
File 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java:

Line 38:     IJobletEventListenerFactory getEventListenerFactory();
getJobletEventListenerFactory


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
File 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java:

Line 64:         return "ACID:" + ":" + id;
remove this extra colon


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
File 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java:

Line 65:     public synchronized byte[] getNonpureSingletonValue(String 
functionName) {
What's this method used for? It's not used by others...


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java:

Line 367:     public JobParameterByteStore 
createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
Minor suggestion: can we merge this into PreDistributedJobStore too? This can 
be organized better if we have future things to add to predistributed jobs.


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java:

Line 34:     private final Map<Long, PreDistributedJobDescriptor> 
preDistributedJobDescriptorMap;
PredistributedId?


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java:

Line 517:                     byte[] jagBytes = changed ? acgBytes : null;
This line can be combined with line 506: if it's changed or it's not a 
predistributed job, we send the serialized ACG.


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java:

Line 55:             
ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(preDistributedId);
change the method to return boolean and throw the exception here.
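
Roughly like this, as a sketch. Store, contains and distribute are hypothetical 
names, and IllegalStateException stands in for whatever exception the real 
code throws.

```java
import java.util.HashMap;
import java.util.Map;

final class DuplicateCheckSketch {
    // Hypothetical stand-in for the job store.
    static final class Store {
        private final Map<Long, String> descriptors = new HashMap<>();

        // Pure query: reports whether the id is already registered.
        boolean contains(long preDistributedId) {
            return descriptors.containsKey(preDistributedId);
        }

        void add(long preDistributedId, String descriptor) {
            descriptors.put(preDistributedId, descriptor);
        }
    }

    // The caller decides that a duplicate is an error and throws here.
    static void distribute(Store store, long preDistributedId, String descriptor) {
        if (store.contains(preDistributedId)) {
            throw new IllegalStateException("Duplicate predistributed job: " + preDistributedId);
        }
        store.add(preDistributedId, descriptor);
    }
}
```

This keeps the store method reusable for callers that only want to ask the 
question.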


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java:

Line 136:     private final Map<JobId, JobParameterByteStore> 
jobParameterByteStoreMap = new HashMap<>();
Should this be a map from predistributedJobId to JobParameterByteStore too?
If the intention here is to cache the parameter, a map from JobId to parameter 
probably won't help either, since the jobId changes with every invocation. 
No?


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java:

Line 54:         ncs.removeJobParameterByteStore(jobId);
If we map predistributedJobId to jobParameter, this remove won't be necessary.


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java:

Line 44:             ncs.removeActivityClusterGraph(preDistributedId);
So we will remove the parameter store here


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
File 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java:

Line 48:             ncs.checkForDuplicateDistributedJob(preDistributedId);
throw the exception here.


https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
File 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java:

Line 196: 
Add one more test. It would be better to have an insert jobspec: distribute it 
once, invoke it 100 times (at the same time), and check the result. If that's 
not feasible, try to distribute the union test and issue it 100 times.
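
A sketch of the "invoke at the same time" part, independent of Hyracks. The 
Runnable is a placeholder for starting the predistributed job and waiting for 
it; invokeConcurrently is a made-up helper, not an existing test utility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ConcurrentInvocationSketch {
    // Runs the invocation `times` times, releasing all threads together,
    // and returns how many invocations completed without an exception.
    static int invokeConcurrently(Runnable invocation, int times) {
        ExecutorService pool = Executors.newFixedThreadPool(times);
        CountDownLatch startGate = new CountDownLatch(1);
        List<Future<Void>> futures = new ArrayList<>();
        try {
            Callable<Void> task = () -> {
                startGate.await(); // block until every task has been submitted
                invocation.run();
                return null;
            };
            for (int i = 0; i < times; i++) {
                futures.add(pool.submit(task));
            }
            startGate.countDown(); // release all waiting threads
            int completed = 0;
            for (Future<Void> future : futures) {
                future.get(); // rethrows any failure from the invocation
                completed++;
            }
            return completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("An invocation failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The test would then check the inserted data after all 100 invocations return.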


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2045
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Gerrit-PatchSet: 14
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>
Gerrit-Reviewer: Dmitry Lychagin <[email protected]>
Gerrit-Reviewer: Ildar Absalyamov <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Steven Jacobs <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Xikui Wang <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
Gerrit-HasComments: Yes
