๐ JobLauncher
์ง๋ ์๊ฐ์๋ JobRepository์ ๋ํด ์ดํด๋ณด์์ต๋๋ค.
JobRepository๋ Spring Batch ์์ ์ ์งํํ๋ฉด์ JobInstance, JobExecution, StepExeuction ๋ฑ Batch์ ๊ด๋ จ๋ ๋๋ฉ์ธ์ CRUD ์ฒ๋ฆฌ๋ฅผ ํ๋ ์ญํ ์ ํฉ๋๋ค.
์ด๋ฒ ์๊ฐ์๋ Spring Batch Job์ ์คํ ์ฃผ์ฒด์ธ JobLauncher์ ๋ํด ์ดํด๋ณด๊ฒ ์ต๋๋ค.
๐ JobLauncher
๊ธฐ๋ณธ ๊ฐ๋
- Spring Batch Job์ ์คํ์ํค๋ ์ญํ
- Job & JobParameters๋ฅผ ์ธ์๋ก ๋ฐ์ผ๋ฉฐ ์์ฒญ๋ ๋ฐฐ์น ์์ ์ ์ํํ ํ JobExecution์ ๋ฐํ
- Spring Batch์์๋ BatchAutoConfiguration ํด๋์ค ๋ด์ ์กด์ฌํ๋ JobLauncherApplicationRunner ํด๋์ค๊ฐ ์๋์ผ๋ก JobLauncher์ ์คํํฉ๋๋ค.
Spring Batch Job์ SyncTaskExecutor(๋๊ธฐ), SipmleAsyncTaskExecutor(๋น๋๊ธฐ), ์ปค์คํ ๊ตฌํ์ฒด ๋ฑ์ ์ค์ ์ ํตํด ๋๊ธฐ ๋ฐ ๋น๋๊ธฐ ์คํ์ด ๊ฐ๋ฅํฉ๋๋ค.
๐ JobLauncher ๋์ ๊ณผ์
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | @RequiredArgsConstructor @Configuration public class JobLauncherConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job jobLauncherJob() { return this.jobBuilderFactory.get("jobLauncherJob") .start(jobLauncherStep1()) .next(jobLauncherStep2()) .incrementer(new RunIdIncrementer()) .build(); } @Bean public Step jobLauncherStep1() { return stepBuilderFactory.get("jobLauncherStep1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { Thread.sleep(3000); return RepeatStatus.FINISHED; } }) .build(); } @Bean public Step jobLauncherStep2() { return stepBuilderFactory.get("jobLauncherStep2") .tasklet((contribution, chunkContext) -> null) .build(); } } | cs |
์์ ๊ฐ๋จํ Job, Step ์ฝ๋๋ฅผ ํตํด JobLauncher์ ๋ํด ์ดํด๋ณด๊ฒ ์ต๋๋ค.
Job์ ์คํ์ํค๋ JobLauncherApplicationRunner ๊ฐ์ฒด๋ BatchAutoConfiguration ํด๋์ค์์ ์์ฑ์ด ๋ฉ๋๋ค.
์ ์ฌ์ง์ JobLauncherApplicationRunner ํด๋์ค์์ Job์ด ์คํ๋๋ ๊ณผ์ ์ ๋๋ค.
run() ๋ฉ์๋์์ args ์ธ์๋ ํ๋ผ๋ฏธํฐ๋ก ๋ฐ์ ๋ณ์๊ฐ ์กด์ฌํ๋ฉด ํด๋น ๊ฐ์ ์ค์ ํ๋ ์ญํ ์ ํฉ๋๋ค.
launchJobFromProperties() ๋ฉ์๋๊ฐ ํธ์ถํ๋ executeLocalJobs() ๋ฉ์๋์ ๋ก์ง์ ๋๋ค.
์ ์ฝ๋์์ for ๋ฌธ ๋ด๋ถ์ ์กด์ฌํ๋ this.jobs๋ ์ ํ๋ฆฌ์ผ์ด์ ์ฝ๋ ์ ์ฒด์ ์กด์ฌํ๋ Job์ ๋ํ๋ด๊ณ ์์ต๋๋ค.
์ ๋ ์์ ๊ฐ์ด ์ด์ ์์ ์ฝ๋๋ ์ ๋ถ ํฌํจ๋์ด ์๊ธฐ์ this.jobs์ size๊ฐ 8์ด๊ณ , ํด๋น job ์ค์์ ํ์ฌ ์คํํ job์ name๊ณผ ๋์ผํ์ง ๋น๊ตํ์ฌ ๋์ผํ ๊ฒฝ์ฐ execute(job, jobParameters) ๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค.
execute() ๋ฉ์๋ ๋ด์์๋ JobParameters ๊ฐ์ฒด๋ฅผ ์์ฑํ๊ณ , jobLauncher.run() ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ Job์ ์คํ์ํต๋๋ค.
์ run() ๋ฉ์๋๋ฅผ ํธ์ถํ ๋ JobLauncher ์ธํฐํ์ด์ค์ ๊ตฌํ์ฒด์ธ SimpleJobLauncher ํด๋์ค์์ run() ๋ฉ์๋๋ฅผ ์ค๋ฒ๋ผ์ด๋ ํ๊ณ ์์ต๋๋ค.
์ ์ฝ๋์์ JobRepository์์ jobName + jobKey ์กฐํฉ์ผ๋ก JobInstance, JobExecution์ ์กฐํํ๊ธฐ ๋๋ฌธ์ lastExecution์ null๋ก ๋ฆฌํดํ๊ฒ ๋ฉ๋๋ค.
SimpleJobLauncher ํด๋์ค์ run() ์ ์ฒด ๋ฉ์๋๋ฅผ ์ดํด๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | /** * Run the provided job with the given {@link JobParameters}. The * {@link JobParameters} will be used to determine if this is an execution * of an existing job instance, or if a new one should be created. * * @param job the job to be run. * @param jobParameters the {@link JobParameters} for this particular * execution. * @return the {@link JobExecution} if it returns synchronously. If the * implementation is asynchronous, the status might well be unknown. * @throws JobExecutionAlreadyRunningException if the JobInstance already * exists and has an execution already running. * @throws JobRestartException if the execution would be a re-start, but a * re-start is either not allowed or not needed. * @throws JobInstanceAlreadyCompleteException if this instance has already * completed successfully * @throws JobParametersInvalidException thrown if jobParameters is invalid. */ @Override public JobExecution run(final Job job, final JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException { Assert.notNull(job, "The Job must not be null."); Assert.notNull(jobParameters, "The JobParameters must not be null."); final JobExecution jobExecution; JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters); if (lastExecution != null) { if (!job.isRestartable()) { throw new JobRestartException("JobInstance already exists and is not restartable"); } /* * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING * retrieve the previous execution and check */ for (StepExecution execution : lastExecution.getStepExecutions()) { BatchStatus status = execution.getStatus(); if (status.isRunning() || status == BatchStatus.STOPPING) { throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " + lastExecution); } else if (status == BatchStatus.UNKNOWN) { throw new JobRestartException( "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. " + "The last execution ended with a failure that could not be rolled back, " + "so it may be dangerous to proceed. Manual intervention is probably necessary."); } } } // Check the validity of the parameters before doing creating anything // in the repository... job.getJobParametersValidator().validate(jobParameters); /* * There is a very small probability that a non-restartable job can be * restarted, but only if another process or thread manages to launch * <i>and</i> fail a job execution for this instance between the last * assertion and the next method returning successfully. */ jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters); try { taskExecutor.execute(new Runnable() { @Override public void run() { try { if (logger.isInfoEnabled()) { logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters + "]"); } job.execute(jobExecution); if (logger.isInfoEnabled()) { Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(), jobExecution.getEndTime()); logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters + "] and the following status: [" + jobExecution.getStatus() + "]" + (jobExecutionDuration == null ? "" : " in " + BatchMetrics.formatDuration(jobExecutionDuration))); } } catch (Throwable t) { if (logger.isInfoEnabled()) { logger.info("Job: [" + job + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters + "]", t); } rethrow(t); } } private void rethrow(Throwable t) { if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } throw new IllegalStateException(t); } }); } catch (TaskRejectedException e) { jobExecution.upgradeStatus(BatchStatus.FAILED); if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); } jobRepository.update(jobExecution); } return jobExecution; } | cs |
์ค์ Job์ ์คํํ๊ธฐ ์ํด ์ฌ์ฉ๋๋ ์ธํฐํ์ด์ค๋ TaskExecutor ์ด๊ณ , ์ต๋ช ํด๋์ค๋ก ๊ตฌํํ์ฌ ๋๊ธฐ ์คํ์ ํ๊ณ ์์ต๋๋ค.
๋ง์ฝ Job์ ๋น๋๊ธฐ๋ก ์คํํ๋ ค๋ฉด setTaskExecutor() ๋ฉ์๋๋ฅผ ํตํด ์ง์ ์ค์ ํด์ฃผ๋ฉด ๋น๋๊ธฐ ์คํ์ด ๊ฐ๋ฅํฉ๋๋ค.
์ฐธ๊ณ ๋ฌธ์
'Spring > Spring Batch' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Spring Batch (10) @JobScope, @StepScope (0) | 2022.12.08 |
---|---|
Spring Batch (9) JobParametersValidator (0) | 2022.11.13 |
Spring Batch (7) JobRepository (0) | 2022.11.08 |
Spring Batch (6) ExecutionContext (0) | 2022.11.05 |
Spring Batch (5) Step, StepExecution (0) | 2022.11.03 |
๋๊ธ