morningman commented on a change in pull request #368: Add a frontend interface 
for committing RoutineLoadTask
URL: https://github.com/apache/incubator-doris/pull/368#discussion_r238513635
 
 

 ##########
 File path: 
fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 ##########
 @@ -241,4 +303,99 @@ private void checkStateTransform(RoutineLoadJob.JobState 
currentState, RoutineLo
         }
     }
 
+    private void loadTxnCommit(TLoadTxnCommitRequest request) throws 
TException {
+        FrontendServiceImpl frontendService = new 
FrontendServiceImpl(ExecuteEnv.getInstance());
+        frontendService.loadTxnCommit(request);
+    }
+
+    private void updateNumOfData(int numOfErrorData, int numOfTotalData) {
+        currentErrorNum += numOfErrorData;
+        currentTotalNum += numOfTotalData;
+        if (currentTotalNum > 10000) {
+            if (currentErrorNum > maxErrorNum) {
+                LOG.info("current error num {} of job {} is more then max 
error num {}. begin to pause job",
+                        currentErrorNum, id, maxErrorNum);
+                // remove all of task in jobs and change job state to paused
+                // TODO(ml): edit log
+                state = JobState.PAUSED;
+                routineLoadTaskInfoList.clear();
+                needSchedulerTaskInfoList.clear();
+
+            }
+
+            // reset currentTotalNum and currentErrorNum
+            currentErrorNum = 0;
+            currentTotalNum = 0;
+        } else if (currentErrorNum > maxErrorNum) {
+            LOG.info("current error num {} of job {} is more then max error 
num {}. begin to pause job",
+                    currentErrorNum, id, maxErrorNum);
+            // remove all of task in jobs and change job state to paused
+            // TODO(ml): edit log
+            state = JobState.PAUSED;
+            routineLoadTaskInfoList.clear();
+            needSchedulerTaskInfoList.clear();
+            // reset currentTotalNum and currentErrorNum
+            currentErrorNum = 0;
+            currentTotalNum = 0;
+        }
+    }
+
+    abstract RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo 
routineLoadTaskInfo) throws AnalysisException,
+            LabelAlreadyExistsException, BeginTransactionException;
+
+    @Override
+    public boolean checkTxnHasRelatedJob(TransactionState txnState) {
 
 Review comment:
   should this method name be called 'checkTxnHasRelatedTask()'?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to