이번 글에서는 JpaPagingItemReader
로 엔티티를 조회한 뒤, 값을 변경할 경우 발생하는 문제점을 살펴보고, 해결 방법에 대해서 알아보겠습니다.
이 글에서 사용된 예제는 Github에서 확인할 수 있습니다.
JpaPagingItemReader
JpaPagingItemReader
는 다음과 같이 동작합니다.
EntityManagerFactory
를 사용해서 별도의EntityManager
인스턴스를 생성해서 사용한다.
protected void doOpen() throws Exception {
// ...
entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
// ...
}
- 페이지를 읽을때 트랜잭션을 시작하고,
EntityManager
의flush()
메소드를 호출한다.
protected void doReadPage() {
EntityTransaction tx = null;
if (transacted) {
tx = entityManager.getTransaction();
tx.begin();
entityManager.flush();
entityManager.clear();
}
// ...
}
- 페이지를 조회한 뒤 트랜잭션을 커밋한다.
이로 인해서 다음과 같은 특성을 갖습니다.
JpaPagingItemReader
를 사용해서 조회한 엔티티와,JpaRepository
를 사용해서 조회한 엔티티는 동일하지 않다.- 마지막 페이지에 속한 엔티티의 변경사항은 데이터베이스에 반영되지 않는다.
- Chunk 단위의 트랜잭션과 다르게 동작한다.
문제점
실제로 발생할 수 있는 상황을 예로 들어보겠습니다. 다음과 같이 JpaPagingItemReader
를 정의합니다.
@Bean
public ItemReader<Product> jpaPagingItemReader(EntityManagerFactory emf) {
return new JpaPagingItemReaderBuilder<Product>()
.queryString("SELECT p FROM Product p")
.pageSize(PAGE_SIZE)
.entityManagerFactory(emf)
.name("product-reader")
.build();
}
스텝을 다음과 같이 구성합니다.
- 데이터베이스에 상태값이
NEW
인 상품이 7개 존재한다. - 스텝의
chunkSize
는 5로 지정한다. - ItemReader :
JpaPagingItemReader
를 사용해서 5개씩 상품을 읽는다. - ItemProcessor : 상품의 상태값을
PROCESS
로 변경한다. - ItemWriter : 상품의 상태값을
DONE
으로 변경한다.
Chunk 단위의 트랜잭션이 커밋되는 시점에, Dirty Checking을 통해서 상품의 변경사항이 DB에 반영되기를 의도했습니다.
스텝을 실행하면 다음과 같은 순서로 실행됩니다.
Executing step: [jpa-paging-item-reader-step]
// 첫번째 Chunk
↓ Chunk start
// 첫번째 페이지 조회
Process : Product(id=1, state=NEW) > PROCESS
Process : Product(id=2, state=NEW) > PROCESS
Process : Product(id=3, state=NEW) > PROCESS
Process : Product(id=4, state=NEW) > PROCESS
Process : Product(id=5, state=NEW) > PROCESS
Write : Product(id=1, state=PROCESS) > DONE
Write : Product(id=2, state=PROCESS) > DONE
Write : Product(id=3, state=PROCESS) > DONE
Write : Product(id=4, state=PROCESS) > DONE
Write : Product(id=5, state=PROCESS) > DONE
↑ Chunk end
// 두번째 Chunk
↓ Chunk start
// 두번째 페이지를 읽기 전에 flush 실행되면서 변경사항이 DB에 반영됨
Updated : Product(id=1, state=DONE)
Updated : Product(id=2, state=DONE)
Updated : Product(id=3, state=DONE)
Updated : Product(id=4, state=DONE)
Updated : Product(id=5, state=DONE)
Process : Product(id=6, state=NEW) > PROCESS
Process : Product(id=7, state=NEW) > PROCESS
Write : Product(id=6, state=PROCESS) > DONE
Write : Product(id=7, state=PROCESS) > DONE
↑ Chunk end
// 두번째 Chunk 종료 이후에 flush 실행되지 않음
// 6번, 7번 상품의 상태 변경이 누락됨
// 상품 상태 출력
Step Finished
Product(id=1, state=DONE)
Product(id=2, state=DONE)
Product(id=3, state=DONE)
Product(id=4, state=DONE)
Product(id=5, state=DONE)
Product(id=6, state=NEW)
Product(id=7, state=NEW)
Step: [jpa-paging-item-reader-step] executed in 11ms
상품의 최종 상태를 보면, 6번과 7번 상품의 상태를 DONE
으로 변경했지만, DB에 반영되지 않은 것을 확인할 수 있습니다.
해결 방법
스프링 배치에서 제공하는 RepositoryItemReader
를 사용합니다.
@Bean
public ItemReader<Product> repositoryItemReader() {
return new RepositoryItemReaderBuilder<Product>()
.repository(productRepository)
.methodName("findAll")
.pageSize(PAGE_SIZE)
.saveState(false)
.sorts(Collections.singletonMap("id", Sort.Direction.ASC))
.name("repository-item-reader")
.build();
}
ProductRepository
는 JpaRepository
를 상속받아서 정의합니다.
public interface ProductRepository extends JpaRepository<Product, Integer> {
}
JpaRepository
는 PagingAndSortingRepository
를 상속받기 때문에 RepositoryItemReader
에서 사용할 수 있습니다.
public interface JpaRepository<T, ID> extends PagingAndSortingRepository<T, ID>, QueryByExampleExecutor<T> {
// ...
}
RepositoryItemReader
를 사용해서 동일한 스텝을 실행하면 다음과 같은 순서로 실행이 됩니다.
Executing step: [repository-item-reader-step]
// 첫번째 Chunk
↓ Chunk start
Process : Product(id=1, state=NEW) > PROCESS
Process : Product(id=2, state=NEW) > PROCESS
Process : Product(id=3, state=NEW) > PROCESS
Process : Product(id=4, state=NEW) > PROCESS
Process : Product(id=5, state=NEW) > PROCESS
Write : Product(id=1, state=PROCESS) > DONE
Write : Product(id=2, state=PROCESS) > DONE
Write : Product(id=3, state=PROCESS) > DONE
Write : Product(id=4, state=PROCESS) > DONE
Write : Product(id=5, state=PROCESS) > DONE
Updated : Product(id=1, state=DONE)
Updated : Product(id=2, state=DONE)
Updated : Product(id=3, state=DONE)
Updated : Product(id=4, state=DONE)
Updated : Product(id=5, state=DONE)
↑ Chunk end
// 두번째 Chunk
↓ Chunk start
Process : Product(id=6, state=NEW) > PROCESS
Process : Product(id=7, state=NEW) > PROCESS
Write : Product(id=6, state=PROCESS) > DONE
Write : Product(id=7, state=PROCESS) > DONE
// 6번, 7번 상품의 상태도 DB에 반영됨
Updated : Product(id=6, state=DONE)
Updated : Product(id=7, state=DONE)
↑ Chunk end
// 상품 상태 출력
Step Finished
Product(id=1, state=DONE)
Product(id=2, state=DONE)
Product(id=3, state=DONE)
Product(id=4, state=DONE)
Product(id=5, state=DONE)
Product(id=6, state=DONE)
Product(id=7, state=DONE)
Step: [repository-item-reader-step] executed in 13ms
전체 상품의 상태값이 DONE
으로 변경된 것을 확인할 수 있습니다.
결론
JpaPagingItemReader
는 엔티티를 조회할 경우만 사용합니다. 조회한 엔티티의 값을 변경할 필요가 있을 경우 RepositoryItemReader
를 사용합니다.
주의!
RepositoryItemReader
를 사용할 때, 엔티티의 조회의 조건문에 사용되는 필드의 값이 변경되는 경우, 전체 데이터가 처리되지 않는 문제가 발생하므로 주의해서 사용해야합니다.
참고
- JpaPagingItemReader.java 파일의 주석