Spring Batch로 CSV파일을 RDB로 이관하는 내용은 많이 나와있지만,
CSV파일의 내용을 Date형식으로 parsing 하는 내용이나, 개행이 포함된 경우,
이관이 완료된 CSV파일을 지우거나 old디렉토리로 이동하는 예시는 잘 없는 것 같아 이 포스팅으로 한 번에 정리하려 합니다.
- build.gradle 설정
- application.yml 설정
- 저장할 Entity, Repository 정의
- BeanWrapperFieldSetMapper 작성
- AfterJobListener 작성
- BatchJobConfig 작성
- Schedule 작성
build.gradle dependencies에 아래 패키지를 추가합니다.
implementation 'org.springframework.boot:spring-boot-starter-batch'
spring.batch.job.enabled를 false로 설정하여 SpringBoot 서버가 실행될 때, 자동으로 Batch가 실행되는 걸 막습니다.
저는 CSV파일을 읽어올 경로를 yml파일에 작성하고 이 부분을 BatchJobConfig 에서 활용했습니다.
spring:
batch:
job:
enabled: false
jdbc:
initialize-schema: always
file:
download:
path:
arms: file:/data/arms/data/
Entity의 모든코드를 작성하지 않았지만, Entity내에 field 이름들을 array로 return할 수 있도록 getFieldNames 메서드를 만들어 줍니다. 저는 첫번째 필드는 id라 array에서 제거했습니다. 그리고 Entity에 ZonedDateTime 속성이 포함되어 있다는 것을 유념해 주세요!
그 아래는 CSV로 읽어온 레코드를 DB에 저장하기위한 Repository 입니다.
static public List<String> getFieldNames(int start){
Field[] fields = Arms.class.getDeclaredFields();
List<String> fieldNames = new ArrayList<>();
for (int i= 0; i < fields.length; i++) {
fieldNames.add(fields[i].getName());
}
for(int u=0; u<start; u++){
fieldNames.remove(0);
}
return fieldNames;
}
//CUD를 담당
public interface ArmsRepository extends CrudRepository<Arms, Long> {
}
CSV파일을 Object로 매핑할 때, 문자열이나 숫자 타입이 아닌, 다른 타입으로 매핑하려면 BeanWrapperFieldSetMapper를 상속받아와 정의해주어야 합니다.
public class BeanWrapperFieldSetMapperCustom<T> extends BeanWrapperFieldSetMapper<T> {
@Override
protected void initBinder(DataBinder binder) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
binder.registerCustomEditor(ZonedDateTime.class, new PropertyEditorSupport() {
@Override
public void setAsText(String text) throws IllegalArgumentException {
if (StringUtils.isNotEmpty(text)) {
ZonedDateTime date = LocalDate.parse(text, formatter).atStartOfDay(ZoneOffset.UTC);
setValue(date);
} else {
setValue(null);
}
}
@Override
public String getAsText() throws IllegalArgumentException {
Object date = getValue();
if (date != null) {
return formatter.format((ZonedDateTime) date);
} else {
return "";
}
}
});
}
}
CSV파일을 한 차례 로드해왔다면, Batch가 완료된 이후에 CSV파일을 이동하거나 지워줘야 합니다. 똑같은 파일을 다시 읽어 처리되지 않게 하기 위함입니다!
JobExecutionListenerSupport를 상속해와 아래와 같이 afterJob 메서드에 정의해 줍니다.
@Slf4j
@Component
public class CsvJobListener{
@Component
public class ArmsJobListener extends JobExecutionListenerSupport {
@Value("${file.download.path.arms}")
private String armsDataPath;
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
moveAllCsvFile(jobExecution, armsDataPath);
}
}
private void moveAllCsvFile(JobExecution jobExecution, String dataPath){
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info(String.format("%s end. Start all csv files move to old directory", jobExecution.getJobId()));
}
Resource srcDir = new PathMatchingResourcePatternResolver().getResource(dataPath);
Resource dstDir = new PathMatchingResourcePatternResolver().getResource(dataPath + "old/");
try {
if(!dstDir.exists()){
directoryConfirmAndMake(dataPath + "old/");
}
File dir = new File(srcDir.getURI().getPath());
String[] filenames = dir.list((f, name) -> name.endsWith(".csv"));
if(filenames != null){
for (int i = 0; i < filenames.length; i++) {
Files.move(
Paths.get(srcDir.getURI().getPath() + filenames[i]),
Paths.get(dstDir.getURI().getPath() + filenames[i])
);
}
}
} catch (IOException e) {
log.error(e.getMessage());
}
log.info("Move csv files complete.");
}
}
실제 Batch 로직에 핵심 부분인데요,
여기서 Batch job과 step 설정합니다.
저는 Jobconfig 파일 내에 Step 로직을 같이 넣어두었는데요,
여기서 유념해야 될 사항은 Step이 되는 로직에 반드시 @StepScope 어노테이션을 명시해주어야 합니다.
그렇지 않으면, Schedule된 batch가 실행될 때마다 Step으로 인식하지 못해 초기 한번만 실행되고,
다음 Schedule시에 실행되지 않는 문제가 있습니다.
그리고 컬럼내용에 개행문자가 포한되어 있는 경우, tokenizing이 제대로 이루어지지 않기 때문에 FlatFileItemReader에서 반드시 DefaultRecordSeparatorPolicy를 설정하도록 합니다.
@Slf4j
@Configuration
@EnableScheduling
@EnableBatchProcessing
@RequiredArgsConstructor
public class ArmsJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final ArmsWriter armsWriter;
private final CsvJobListener.ArmsJobListener csvJobListener;
private static final int chunkSize = 1000;
@Bean
public Job armsFileItemReaderJob() {
return jobBuilderFactory.get("armsFileItemReaderJob")
.start(armsFileItemReaderStep())
.listener(csvJobListener)
.build();
}
@Bean
public Step armsFileItemReaderStep() {
return stepBuilderFactory.get("armsFileItemReaderStep")
.<Arms, Arms>chunk(chunkSize)
.reader(armsFileItemReader(null))
.writer(armsWriter)
.build();
}
@Bean
@StepScope
public MultiResourceItemReader<Arms> armsFileItemReader(@Value("${file.download.path.arms}") String armsDataPath) {
/* file read */
FlatFileItemReader<Arms> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setLinesToSkip(1); // header line skip
flatFileItemReader.setEncoding("UTF-8"); // encoding
flatFileItemReader.setRecordSeparatorPolicy(new DefaultRecordSeparatorPolicy());
/* read하는 데이터를 내부적으로 LineMapper을 통해 Mapping */
DefaultLineMapper<Arms> defaultLineMapper = new DefaultLineMapper<>();
/* delimitedLineTokenizer : setNames를 통해 각각의 데이터의 이름 설정 */
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setNames( Arms.getFieldNames(1).toArray(String []:: new));
defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
/* beanWrapperFieldSetMapper : Tokenizer에서 가지고온 데이터들을 VO로 바인드하는 역할 */
BeanWrapperFieldSetMapperCustom<Arms> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapperCustom<>();
beanWrapperFieldSetMapper.setTargetType(Arms.class);
defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
/* lineMapper 지정 */
flatFileItemReader.setLineMapper(defaultLineMapper);
MultiResourceItemReader<Arms> resourceItemReader = new MultiResourceItemReader<>();
Resource[] armsResources = null;
try {
armsResources = new PathMatchingResourcePatternResolver().getResources(armsDataPath + "*.csv");
} catch (IOException e) {
log.error(e.getMessage());
}
resourceItemReader.setResources(armsResources);
resourceItemReader.setDelegate(flatFileItemReader);
return resourceItemReader;
}
@Configuration
@StepScope
@RequiredArgsConstructor
class ArmsWriter implements ItemWriter<Arms> {
private final ArmsRepository armsRepository;
@Override
public void write(List<? extends Arms> list) throws Exception {
armsRepository.saveAll(new ArrayList<Arms>(list));
}
}
}
위에서 작성한 BachJobConfig파일을 Schedule에 등록하여 실행합니다.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class VocDumpScheduler {
private final JobLauncher jobLauncher;
private final ArmsJobConfig armsJobConfig;
private final GbisJobConfig gbisJobConfig;
@Scheduled(cron = "0 5 * * * *")
public void runJob(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try{
jobLauncher.run(armsJobConfig.armsFileItemReaderJob(), jobParameters);
jobLauncher.run(gbisJobConfig.gbisFileItemReaderJob(), jobParameters);
} catch (JobInstanceAlreadyCompleteException | JobExecutionAlreadyRunningException | JobParametersInvalidException | JobRestartException e) {
log.error("VOC batch job error: {}", e.getMessage());
}
}
}
이상으로 예외가 많은 SpringBatch 포스팅을 마칩니다.
사실 별거 아닌 내용이지만, 저는 너무 헤매었어서 정리가 필요했습니다.ㅋㅋㅋ;
'SpringBoot' 카테고리의 다른 글
Spring Security 여러개 적용하고, Admin page에서 일관된 password로 로그인하기 (0) | 2022.06.22 |
---|