guoyuepeng commented on PR #667:
URL: https://github.com/apache/griffin/pull/667#issuecomment-2410586185
@toyboxman
The following is the PoC on which I am basing the design of the local workflow.
`package demo.workflows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
@Component
public class DQCompareFlow {
@Autowired
public JobRepository jobRepository;
@Autowired
public PlatformTransactionManager transactionManager;
Logger logger = LoggerFactory.getLogger(DQCompareFlow.class);
@Bean
public Job dqcFlow() {
return new JobBuilder("DQC_JOB", this.jobRepository)
.start(new StepBuilder("setup",
this.jobRepository).tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution
contribution, ChunkContext chunkContext) throws Exception {
logger.info("PrepareJob was run");
return RepeatStatus.FINISHED;
}
}, transactionManager).build())
.split(new SimpleAsyncTaskExecutor()).add(
new FlowBuilder<Flow>("fetchSourceCount")
.start(new StepBuilder("fetchSourceStep",
this.jobRepository).tasklet(new Tasklet() {
@Override
public RepeatStatus
execute(StepContribution contribution, ChunkContext chunkContext) throws
Exception {
logger.info("FetchSourceStep
started");
Thread.sleep(90000); // Sleep for 90
seconds
logger.info("FetchSourceStep was
run");
return RepeatStatus.FINISHED;
}
}, transactionManager).build())
.build(),
new FlowBuilder<Flow>("fetchTargetCount")
.start(new StepBuilder("fetchTargetStep",
this.jobRepository).tasklet(new Tasklet() {
@Override
public RepeatStatus
execute(StepContribution contribution, ChunkContext chunkContext) throws
Exception {
logger.info("FetchTargetStep was
run");
return RepeatStatus.FINISHED;
}
}, transactionManager).build())
.build()
)
.next(new StepBuilder("compareResult",
this.jobRepository).tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution
contribution, ChunkContext chunkContext) throws Exception {
logger.info("CompareStep was run");
return RepeatStatus.FINISHED;
}
}, transactionManager).build())
.end() // Add this line to end the job definition
.build(); // Add this line to build the Job object
}
}
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]