EC2 서버에서 Kafka, zookeeper 컨테이너 실행하기
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
networks:
my-network:
ipv4_address: 172.18.0.100
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
networks:
my-network:
ipv4_address: 172.18.0.101
networks:
my-network:
name: seeyouagain-network
Kafka를 도커 컨테이너화 시켜서 기동하기 위해 서비스를 배포한 ec2 서버에서 docker-compose.yml 파일을 생성하고 위 코드를 작성해 줍니다. 위 코드는 Kafka 브로커와 zookeeper를 각각 한대씩 생성하는 코드이고 각 줄의 의미는 아래와 같습니다.
zookeeper와 kafka에 공통적으로 작성된 image, ports, networks는 각각 어떤 이미지에서 컨테이너를 생성할 것인지, 포트포워딩 설정, 전에 서비스를 배포할때 생성했던 docker-network에서 할당받을 ip 주소입니다.
Kafka 설정에서 environment는 위에서 부터 순서대로 Kafka 서버의 ip 주소, 디폴트 TOPIC의 이름, 그리고 위에서 생성한 zookeeper 서버의 포트번호를 의미합니다.
이렇게 docker-compose.yml 파일을 생성한 후 다음과 같은 명령어를 입력해주면
docker-compose -f docker-compose.yml up -d
(사전에 docker-compose가 ec2서버에 설치되어 있어야 함)
아래와 같이 컨테이너도 잘 생성된 것을 확인할 수 있고 network에도 등록된 것을 확인할 수 있습니다.
Kafka 적용하기
build.gradle
// kafka
implementation 'org.springframework.kafka:spring-kafka'
먼저 Consumer와 Producer 코드를 작성할 마이크로 서비스에 위 dependency를 추가해줍니다.
Consumer
KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.101:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
메세지를 소비할 Consumer의 configure 파일을 생성하고 토픽에 접속하기 위한 정보를 저장하는 ConsumerFactory Bean과 토픽에 어떤 변경사항이 있는지 계속 리스닝하고 있는 ConcurrentKafkaListenerContainerFactory Bean을 생성해줍니다.
- BOOTSTRAP_SERVERS_CONFIG : 사용하고자 하는 Kafka 서버의 주소
- GROUP_ID_CONFIG : KafkaConsumer를 그룹핑화하기 위한 설정
- KEY_DESERIALIZER_CLASS_CONFIG : Producer에서 보낸 Json 데이터의 KEY값을 deserailize하기 위한 설정
- VALUE_DESERIALIZER_CLASS_CONFIG : Producer에서 보낸 Json 데이터의 VALUE값을 deserailize하기 위한 설정
KafkaConsumer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {
private final ParticipantRepository participantRepository;
@KafkaListener(topics = "example-participant-topic")
@Transactional
public void updateProfileImg(String kafkaMessage) {
log.info("Kafka Message ->" + kafkaMessage);
// 역직렬화
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
Object userIdObj = map.get("userId");
if (userIdObj instanceof Integer) {
Integer userIdInt = (Integer) userIdObj;
Long userId = Long.valueOf(userIdInt);
participantRepository.updateProfileImg(userId, (String)map.get("profileImg"));
}
}
}
@KafkaListener 어노테이션을 사용해서 토픽을 지정해줍니다. Producer에서 해당 토픽에 메세지를 전달하여 변동사항이 생기면 Config 클래스에서 작성했던 ConcurrentKafkaListenerContainerFactory Bean이 변동사항을 감지하고 메세지에서 데이터를 가져옵니다. 그 후 Json 포맷으로 넘어온 데이터를 역직렬화하고 데이터베이스에 변경사항을 반영해줍니다.
Producer
KafkaProducerConfig
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.101:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
ProducerFacotry Bean 설정은 ConsumerFactory와 같고 Kafka에 데이터를 보내기 위한 객체로 KafkaTemplate Bean을 생성해줍니다.
KafkaProducer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, ProfileImgRequestDto requestDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(requestDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from User microservice: " + requestDto);
}
}
try ~ catch 문의 코드를 사용해서 데이터를 Json 포맷으로 전달하기 위해 변환과정을 거치고 Config 파일에서 생성한 KafkaTemplate의 send() 메서드에 topic과 데이터를 담아 전달해줍니다. 이때 topic은 KafkaConsumer 클래스에서 작성했던 @KafkaListener(topics = "example-participant-topic") 토픽을 넣어줍니다.
여기까지 설정하고 카프카 메세지를 전달해야하는 시점에 아래와 같이 KafkaProducer의 send() 메서드를 호출하면 데이터가 잘 업데이트된 것을 확인할 수 있습니다.
@Service
@RequiredArgsConstructor
public class AuthServiceImpl implements AuthService {
private final UserRepository userRepository;
private final AmazonS3Service amazonS3Service;
private final KafkaProducer kafkaProducer;
@Override
@Transactional
public ProfileResponseDto updateProfile(Long userId, ProfileUpdateRequestDto requestDto, MultipartFile profileImg) {
User user = getUser(userId);
String location = requestDto.getLocation();
String description = requestDto.getDescription();
if (profileImg == null) {
user.updateProfile(location, description);
} else {
deleteS3Img(user);
String profileImgKey = saveS3Img(profileImg);
String profileImgUrl = amazonS3Service.getFileUrl(profileImgKey);
user.updateProfile(profileImgKey, profileImgUrl, location, description);
kafkaProducer.send("example-participant-topic", new ProfileImgRequestDto(userId, profileImgUrl));
}
return ProfileResponseDto.from(user);
}
private String saveS3Img(MultipartFile profileImg) {
try {
return amazonS3Service.upload(profileImg, "UserProfile");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void deleteS3Img(User user) {
if (user.getProfileImgKey() != null && !user.getProfileImgKey().isBlank())
amazonS3Service.delete(user.getProfileImgKey());
}
private User getUser(Long userId) {
return userRepository.findById(userId)
.orElseThrow(() -> new ApiException(ExceptionEnum.MEMBER_NOT_EXIST_EXCEPTION));
}
}
'Spring' 카테고리의 다른 글
[Spring] 트랜잭션 전파(Transaction Propagation) (0) | 2023.06.24 |
---|---|
[Spring] @Transactional 호출 시 주의사항 (0) | 2023.06.23 |
[Spring] CORS 이란? CORS 에러 해결방법 (1) | 2023.04.13 |
[Spring] 비동기 통신과 데이터 동기화를 위한 카프카 활용 (1) (0) | 2023.04.11 |
[Spring] MSA 프로젝트 만들기 (3) (0) | 2023.03.24 |