Xikui Wang has posted comments on this change.

Change subject: [ASTERIXDB-1911][HYR,RT,CLUS] Fixes and Improvements for PreDistributed Jobs
......................................................................

Patch Set 14:

(30 comments)

I added a bunch of comments. Hope that's not too much. Also, I'm not very clear about the listener interface change and its uses. Maybe @Till or others should look at it too.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/PredistributedJobService.java
File asterixdb/asterix-active/src/main/java/org/apache/asterix/active/PredistributedJobService.java:

Line 1: /*
This class is not used by anyone. Don't you need to start this in CCApplication? If this is a BAD-only service, maybe move it to BAD? (I prefer the first option, since this could be useful for others too.)

Line 43: IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId) {
What is this duration for? Execution interval?

Line 44: ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
Move the pool size into a private static int and give it a proper name.

Line 78: Date checkStartTime = new Date();
See the comment on the end time (line 87).

Line 82: byte[] jobIdParameter = JOB_ID_PARAMETER_NAME.getBytes();
Instead of calling getBytes() every time, maybe use JOB_ID_... = "fooname".getBytes() in the definition.

Line 86: hcc.waitForCompletion(jobId);
Are we certain that all predistributed job calls are synchronized? Maybe add this to the function signature?

Line 87: Date checkEndTime = new Date();
You don't really need to create two Date objects here. Use Instant.now() and compute the difference.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
File asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java:

Line 743: public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
> MAJOR SonarQube violation:

Used by BAD? Maybe we need a better solution here rather than simply changing its signature.
@Till

Line 1660: String functionName = cfs.getSignature().getName();
Change to getFunctionSignature() in function.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
File asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java:

Line 25: * an interface for JobEventListenerFactories to add Asterix transaction id API
"to add Asterix transaction id API"?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java:

Line 51: protected final DataOutput dataOutput = resultStorage.getDataOutput();
What is the reason for this change?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java:

Line 36: public class GetJobParameterDescriptor extends AbstractScalarFunctionDynamicDescriptor {
get_job_parameter by parameter name?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java:

Line 53: return new JobEventListenerFactory(jobId, transactionalWrite);
Do you need to create a new jobId here? This passes a reference to the new JobEventListenerFactory, and its content can be changed...

Line 58: byte[] jobIdParameter = jobIdParameterName.getBytes();
jobIdParameter or jobParameterName?

Line 61: if (jobIdString.length() > 0) {
I'm a bit confused here.
The method seems to return the parameter value, but here we cast it to a JobId string?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java:

Line 52: return new MultiTransactionJobletEventListenerFactory(jobIds, transactionalWrite);
Same comment as for JobEventListenerFactory.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
File asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java:

Line 52: fact instanceof IJobEventListenerFactory ? ((IJobEventListenerFactory) fact).getJobId() : jobId;
I'm not sure this is the right way to get the JobId. @Till needs to look at this, together with the two interface changes.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
File asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java:

Line 62: fact instanceof IJobEventListenerFactory ? ((IJobEventListenerFactory) fact).getJobId() : jobId;
Same comment for all of them...
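
Since the same ternary is repeated in several factories, one option while the design question is still open would be to keep it in a single helper, so there is only one place to change later. A minimal sketch, under loud assumptions: `JobIdSource` is a stand-in for IJobEventListenerFactory, the job id is modeled as a plain long rather than a JobId, and `resolve` is a hypothetical helper that does not exist in the patch.

```java
public class JobIdResolution {
    /** Stand-in (assumption) for IJobEventListenerFactory; only the getter matters here. */
    public interface JobIdSource {
        long getJobId();
    }

    /**
     * Hypothetical helper: returns the factory's job id when the factory
     * exposes one, otherwise the fallback id supplied by the caller.
     */
    public static long resolve(Object factory, long fallbackJobId) {
        if (factory instanceof JobIdSource) {
            return ((JobIdSource) factory).getJobId();
        }
        return fallbackJobId;
    }

    public static void main(String[] args) {
        JobIdSource withId = () -> 42L;     // factory that carries its own id
        Object withoutId = new Object();    // any other listener factory
        System.out.println(resolve(withId, 7L));
        System.out.println(resolve(withoutId, 7L));
    }
}
```

This does not answer whether instanceof is the right way to get the JobId; it only avoids duplicating the check across the operation callback and commit runtime factories.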

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java:

Line 38: IJobletEventListenerFactory getEventListenerFactory();
getJobletEventListenerFactory

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java:

Line 64: return "ACID:" + ":" + id;
Remove the extra colon.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java:

Line 65: public synchronized byte[] getNonpureSingletonValue(String functionName) {
What is this method used for? It's not used by anyone else...

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java:

Line 367: public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
Minor suggestion: can we merge this into PreDistributedJobStore too? That would keep things better organized if we add more to predistributed jobs in the future.
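
To make the merge suggestion concrete, here is a rough sketch of a store that owns the create-or-get logic itself instead of leaving it on the cluster controller service. Everything in it is an assumption for illustration: `PreDistributedStoreSketch` and `ParameterStore` are hypothetical stand-ins (not the real PreDistributedJobStore or JobParameterByteStore), parameters are keyed by String rather than byte[], and the store is keyed by a plain long id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PreDistributedStoreSketch {
    /** Stand-in (assumption) for JobParameterByteStore: a holder for named byte[] values. */
    public static final class ParameterStore {
        private final Map<String, byte[]> values = new ConcurrentHashMap<>();

        public void put(String name, byte[] value) {
            values.put(name, value);
        }

        public byte[] get(String name) {
            return values.get(name);
        }
    }

    // One parameter store per id, created lazily on first access.
    private final Map<Long, ParameterStore> parameterStores = new ConcurrentHashMap<>();

    /** Returns the existing store for the id, or atomically creates one. */
    public ParameterStore createOrGetParameterStore(long id) {
        return parameterStores.computeIfAbsent(id, key -> new ParameterStore());
    }

    /** Drops the store for the id; a later createOrGet call starts fresh. */
    public void remove(long id) {
        parameterStores.remove(id);
    }
}
```

With this shape, callers would go through the job store for both the descriptor and its parameters, and cleanup stays in one class.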

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java:

Line 34: private final Map<Long, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
PredistributedId?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java:

Line 517: byte[] jagBytes = changed ? acgBytes : null;
This line can be combined with line 506: if it has changed, or it's not a predistributed job, we send the serialized ACG.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java:

Line 55: ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(preDistributedId);
Change the method to return a boolean and throw the exception here.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java:

Line 136: private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>();
Should this be a map from predistributedJobId to JobParameterByteStore too?
If the intention here is to cache the parameters, a map from JobId to parameters probably won't help either, since the jobId changes on every invocation, no?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java:

Line 54: ncs.removeJobParameterByteStore(jobId);
If we map predistributedJobId to the job parameters, this remove won't be necessary.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java:

Line 44: ncs.removeActivityClusterGraph(preDistributedId);
So we will remove the parameter store here.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java:

Line 48: ncs.checkForDuplicateDistributedJob(preDistributedId);
Throw the exception here.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
File hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java:

Line 196:
Add one more test. It would be better to have an insert jobspec: distribute it once, invoke it 100 times (at the same time), and check the result.
If that's not feasible, try to distribute the union test and issue it 100 times.

--
To view, visit https://asterix-gerrit.ics.uci.edu/2045
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Gerrit-PatchSet: 14
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>
Gerrit-Reviewer: Dmitry Lychagin <[email protected]>
Gerrit-Reviewer: Ildar Absalyamov <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Steven Jacobs <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Xikui Wang <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
Gerrit-HasComments: Yes
