본문 바로가기

Spring

[Spring] 비동기 통신과 데이터 동기화를 위한 카프카 활용 (2)

EC2 서버에서 Kafka, zookeeper 컨테이너 실행하기

version: '2'
services:
    zookeeper:
        image: wurstmeister/zookeeper
        ports:
            - "2181:2181"
        networks:
            my-network:
                ipv4_address: 172.18.0.100
    kafka:
        image: wurstmeister/kafka
        ports:
            - "9092:9092"
        environment:
            KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
            KAFKA_CREATE_TOPICS: "test:1:1"
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        depends_on:
            - zookeeper
        networks:
            my-network:
                ipv4_address: 172.18.0.101

networks:
    my-network:
        name: seeyouagain-network

 

Kafka를 도커 컨테이너화 시켜서 기동하기 위해 서비스를 배포한 ec2 서버에서 docker-compose.yml 파일을 생성하고 위 코드를 작성해 줍니다. 위 코드는 Kafka 브로커와 zookeeper를 각각 한대씩 생성하는 코드이고 각 줄의 의미는 아래와 같습니다.

 

zookeeper와 kafka에 공통적으로 작성된 image, ports, networks는 각각 어떤 이미지에서 컨테이너를 생성할 것인지, 포트포워딩 설정, 전에 서비스를 배포할때 생성했던 docker-network에서 할당받을 ip 주소입니다.

 

Kafka 설정에서 environment는 위에서 부터 순서대로 Kafka 서버의 ip 주소, 디폴트 TOPIC의 이름, 그리고 위에서 생성한 zookeeper 서버의 포트번호를 의미합니다.

 

이렇게 docker-compose.yml 파일을 생성한 후 다음과 같은 명령어를 입력해주면

 

docker-compose -f docker-compose.yml up -d

(사전에 docker-compose가 ec2서버에 설치되어 있어야 함)

 

아래와 같이 컨테이너도 잘 생성된 것을 확인할 수 있고 network에도 등록된 것을 확인할 수 있습니다.

 

 

Kafka 적용하기

build.gradle

// kafka
implementation 'org.springframework.kafka:spring-kafka'

 

먼저 Consumer와 Producer 코드를 작성할 마이크로 서비스에 위 dependency를 추가해줍니다. 

Consumer

KafkaConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.101:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}

 

메세지를 소비할 Consumer의 configure 파일을 생성하고 토픽에 접속하기 위한 정보를 저장하는 ConsumerFactory Bean과 토픽에 어떤 변경사항이 있는지 계속 리스닝하고 있는 ConcurrentKafkaListenerContainerFactory Bean을 생성해줍니다.

 

  • BOOTSTRAP_SERVERS_CONFIG : 사용하고자 하는 Kafka 서버의 주소
  • GROUP_ID_CONFIG : KafkaConsumer를 그룹핑화하기 위한 설정
  • KEY_DESERIALIZER_CLASS_CONFIG : Producer에서 보낸 Json 데이터의 KEY값을 deserailize하기 위한 설정
  • VALUE_DESERIALIZER_CLASS_CONFIG : Producer에서 보낸 Json 데이터의 VALUE값을 deserailize하기 위한 설정 

KafkaConsumer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {

    private final ParticipantRepository participantRepository;

    @KafkaListener(topics = "example-participant-topic")
    @Transactional
    public void updateProfileImg(String kafkaMessage) {
        log.info("Kafka Message ->" + kafkaMessage);

        // 역직렬화
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        Object userIdObj = map.get("userId");
        if (userIdObj instanceof Integer) {
            Integer userIdInt = (Integer) userIdObj;
            Long userId = Long.valueOf(userIdInt);
            participantRepository.updateProfileImg(userId, (String)map.get("profileImg"));
        }

    }
}

 

@KafkaListener 어노테이션을 사용해서 토픽을 지정해줍니다. Producer에서 해당 토픽에 메세지를 전달하여 변동사항이 생기면 Config 클래스에서 작성했던 ConcurrentKafkaListenerContainerFactory Bean이 변동사항을 감지하고 메세지에서 데이터를 가져옵니다. 그 후 Json 포맷으로 넘어온 데이터를 역직렬화하고 데이터베이스에 변경사항을 반영해줍니다. 

Producer

KafkaProducerConfig

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.101:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

ProducerFacotry Bean 설정은 ConsumerFactory와 같고 Kafka에 데이터를 보내기 위한 객체로 KafkaTemplate Bean을 생성해줍니다.

 

KafkaProducer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, ProfileImgRequestDto requestDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(requestDto);
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from User microservice: " + requestDto);
    }
}

 

try ~ catch 문의 코드를 사용해서 데이터를 Json 포맷으로 전달하기 위해 변환과정을 거치고 Config 파일에서 생성한 KafkaTemplate의 send() 메서드에 topic과 데이터를 담아 전달해줍니다. 이때 topic은 KafkaConsumer 클래스에서 작성했던 @KafkaListener(topics = "example-participant-topic") 토픽을 넣어줍니다.


여기까지 설정하고 카프카 메세지를 전달해야하는 시점에 아래와 같이 KafkaProducer의 send() 메서드를 호출하면 데이터가 잘 업데이트된 것을 확인할 수 있습니다.

@Service
@RequiredArgsConstructor
public class AuthServiceImpl implements AuthService {

    private final UserRepository userRepository;

    private final AmazonS3Service amazonS3Service;

    private final KafkaProducer kafkaProducer;

    @Override
    @Transactional
    public ProfileResponseDto updateProfile(Long userId, ProfileUpdateRequestDto requestDto, MultipartFile profileImg) {
        User user = getUser(userId);

        String location = requestDto.getLocation();
        String description = requestDto.getDescription();

        if (profileImg == null) {
            user.updateProfile(location, description);
        } else {
            deleteS3Img(user);
            String profileImgKey = saveS3Img(profileImg);
            String profileImgUrl = amazonS3Service.getFileUrl(profileImgKey);
            user.updateProfile(profileImgKey, profileImgUrl, location, description);
            kafkaProducer.send("example-participant-topic", new ProfileImgRequestDto(userId, profileImgUrl));
        }

        return ProfileResponseDto.from(user);
    }

    private String saveS3Img(MultipartFile profileImg) {
        try {
            return amazonS3Service.upload(profileImg, "UserProfile");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void deleteS3Img(User user) {
        if (user.getProfileImgKey() != null && !user.getProfileImgKey().isBlank())
            amazonS3Service.delete(user.getProfileImgKey());
    }

    private User getUser(Long userId) {
        return userRepository.findById(userId)
                .orElseThrow(() -> new ApiException(ExceptionEnum.MEMBER_NOT_EXIST_EXCEPTION));
    }

}