Spring 프레임워크에서 제공하는 @Transactional 어노테이션을 사용하면 트랜잭션 AOP가 적용된다. 이 트랜잭션 AOP는 프록시 방식의 AOP를 사용하며 프록시 객체가 먼저 클라이언트의 요청을 받고 트랜잭션 처리를 한 후 실제 객체를 호출해준다. 만약 프록시 객체를 거치지 않고 실제 객체를 직접적으로 호출하게 되면 트랜잭션 AOP가 적용되지 않고 트랜잭션도 시작하지 않는다.

 

일반적으로 @Transactional 어노테이션을 선언하면 프록시 객체가 빈으로 등록되고 의존성 주입할 때도 프록시 객체를 주입하기 때문에 위와 같은 상황이 발생하지 않지만 실제 객체 내부에서 메서드 호출이 일어나면 프록시 객체를 거치지 않고 대상 객체를 직접 호출하기 때문에 트랜잭션이 적용되지 않는 경우가 발생한다.

예시 코드

@Slf4j
@SpringBootTest
public class InternalCallV1Test {

    @Autowired
    CallService callService;

    @Test
    void printProxy() {
        log.info("callService class = {}", callService.getClass());
    }

    @Test
    void internalCall() {
        callService.internal();
    }

    @Test
    void externalCall() {
        callService.external();
    }

    @TestConfiguration
    static class InternalCallV1TestConfig {

        @Bean
        CallService callService() {
            return new CallService();
        }
    }

    static class CallService {

        public void external() {
            log.info("call external");
            printTxInfo();
            internal();
        }

        @Transactional
        public void internal() {
            log.info("call internal");
            printTxInfo();
        }

        private void printTxInfo() {
            boolean txActive = TransactionSynchronizationManager.isSynchronizationActive();
            log.info("tx active = {}", txActive);
        }
    }
}

 

internalCall() 메서드를 실행하여 직접 @Transactional 어노테이션이 선언된 internal() 메서드를 호출해보자.

아래와 같이 정상적으로 트랜잭션이 시작하고 printTxInfo() 메서드를 호출하여 트랜잭션이 활성 상태인 것을 확인한 후 마지막으로 트랜잭션이 종료된 것 까지 확인할 수 있다.

 

 

이번에는 externalCall() 메서드를 실행하여 external() 메서드를 호출해보자.

external() 메서드는 @Transactional 어노테이션이 선언되어 있지 않기 때문에 트랜잭션이 비활성 상태인 것을 확인한 후 internal() 메서드를 호출한다. internal() 메서드에는 트랜잭션이 선언되어 있기 때문에 internalCall()을 통해 호출했을 때처럼 활성 상태로 출력될 줄 알았지만 결과는 아래와 같이 트랜잭션도 시작하지 않고 로그도 false로 출력됐다.

 

 

왜 이런 현상이 발생하는 것일까?

결론부터 말하면 실제 객체 내부에서 메서드를 호출하면 프록시 객체가 아닌 현재 인스턴스에서 internal() 메서드를 직접 호출하기 때문에 트랜잭션이 적용되지 않는다.

 

 

자바에서 메서드 앞에 별도의 참조가 없으면 this가 생략되어 있다. 예) internal() == this.internal()

현재 external() 메서드는 @Transactional 어노테이션이 선언되어 있지 않기 때문에 프록시 객체가 아닌 실제 객체를 통해 호출한다. 이때 internal() 메서드를 호출하면 this가 생략되어 있으므로 현재 인스턴스 즉, 실제 객체에서 internal() 메서드를 호출하고 결과적으로 프록시 객체를 거치지 않기 때문에 트랜잭션이 적용되지 않는다.

 

이를 해결하기 위한 방법으로 internal() 메서드를 별도의 클래스로 분리하는 방법이 있다.

수정 코드

@Slf4j
@SpringBootTest
public class InternalCallV2Test {

    @Autowired
    CallService callService;

    @Test
    void printProxy() {
        log.info("callService class = {}", callService.getClass());
    }

    @Test
    void externalCall() {
        callService.external();
    }

    @TestConfiguration
    static class InternalCallV1TestConfig {

        @Bean
        CallService callService() {
            return new CallService(internalService());
        }

        @Bean
        InternalService internalService() {
            return new InternalService();
        }
    }

    @RequiredArgsConstructor
    static class CallService {

        private final InternalService internalService;

        public void external() {
            log.info("call external");
            printTxInfo();
            internalService.internal();
        }

        private void printTxInfo() {
            boolean txActive = TransactionSynchronizationManager.isSynchronizationActive();
            log.info("tx active = {}", txActive);
        }
    }

    static class InternalService {

        @Transactional
        public void internal() {
            log.info("call internal");
            printTxInfo();
        }

        private void printTxInfo() {
            boolean txActive = TransactionSynchronizationManager.isSynchronizationActive();
            log.info("tx active = {}", txActive);
        }
    }
}

 

internal() 메서드를 별도의 클래스로 분리하고 스프링 빈으로 등록하여 external() 메서드가 있는 클래스에 의존성 주입 해주었다. 이때 internal() 메서드에 @Transactional 어노테이션이 선언되어 있으므로 프록시 객체가 주입된다. 이러면 프록시 객체를 통해 internal() 메서드를 호출하므로 externalCall() 메서드를 실행했을 때 아래와 같이 정상적으로 internal() 메서드에 트랜잭션이 적용된 것을 확인할 수 있다.

 

EC2 서버에서 Kafka, zookeeper 컨테이너 실행하기

version: '2'
services:
    zookeeper:
        image: wurstmeister/zookeeper
        ports:
            - "2181:2181"
        networks:
            my-network:
                ipv4_address: 172.18.0.100
    kafka:
        image: wurstmeister/kafka
        ports:
            - "9092:9092"
        environment:
            KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
            KAFKA_CREATE_TOPICS: "test:1:1"
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        depends_on:
            - zookeeper
        networks:
            my-network:
                ipv4_address: 172.18.0.101

networks:
    my-network:
        name: seeyouagain-network

 

Kafka를 도커 컨테이너화 시켜서 기동하기 위해 서비스를 배포한 ec2 서버에서 docker-compose.yml 파일을 생성하고 위 코드를 작성해 줍니다. 위 코드는 Kafka 브로커와 zookeeper를 각각 한대씩 생성하는 코드이고 각 줄의 의미는 아래와 같습니다.

 

zookeeper와 kafka에 공통적으로 작성된 image, ports, networks는 각각 어떤 이미지에서 컨테이너를 생성할 것인지, 포트포워딩 설정, 전에 서비스를 배포할때 생성했던 docker-network에서 할당받을 ip 주소입니다.

 

Kafka 설정에서 environment는 위에서 부터 순서대로 Kafka 서버의 ip 주소, 디폴트 TOPIC의 이름, 그리고 위에서 생성한 zookeeper 서버의 포트번호를 의미합니다.

 

이렇게 docker-compose.yml 파일을 생성한 후 다음과 같은 명령어를 입력해주면

 

docker-compose -f docker-compose.yml up -d

(사전에 docker-compose가 ec2서버에 설치되어 있어야 함)

 

아래와 같이 컨테이너도 잘 생성된 것을 확인할 수 있고 network에도 등록된 것을 확인할 수 있습니다.

 

 

Kafka 적용하기

build.gradle

// kafka
implementation 'org.springframework.kafka:spring-kafka'

 

먼저 Consumer와 Producer 코드를 작성할 마이크로 서비스에 위 dependency를 추가해줍니다. 

Consumer

KafkaConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.101:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}

 

메세지를 소비할 Consumer의 configure 파일을 생성하고 토픽에 접속하기 위한 정보를 저장하는 ConsumerFactory Bean과 토픽에 어떤 변경사항이 있는지 계속 리스닝하고 있는 ConcurrentKafkaListenerContainerFactory Bean을 생성해줍니다.

 

  • BOOTSTRAP_SERVERS_CONFIG : 사용하고자 하는 Kafka 서버의 주소
  • GROUP_ID_CONFIG : KafkaConsumer를 그룹핑화하기 위한 설정
  • KEY_DESERIALIZER_CLASS_CONFIG : Producer에서 보낸 Json 데이터의 KEY값을 deserailize하기 위한 설정
  • VALUE_DESERIALIZER_CLASS_CONFIG : Producer에서 보낸 Json 데이터의 VALUE값을 deserailize하기 위한 설정 

KafkaConsumer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {

    private final ParticipantRepository participantRepository;

    @KafkaListener(topics = "example-participant-topic")
    @Transactional
    public void updateProfileImg(String kafkaMessage) {
        log.info("Kafka Message ->" + kafkaMessage);

        // 역직렬화
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        Object userIdObj = map.get("userId");
        if (userIdObj instanceof Integer) {
            Integer userIdInt = (Integer) userIdObj;
            Long userId = Long.valueOf(userIdInt);
            participantRepository.updateProfileImg(userId, (String)map.get("profileImg"));
        }

    }
}

 

@KafkaListener 어노테이션을 사용해서 토픽을 지정해줍니다. Producer에서 해당 토픽에 메세지를 전달하여 변동사항이 생기면 Config 클래스에서 작성했던 ConcurrentKafkaListenerContainerFactory Bean이 변동사항을 감지하고 메세지에서 데이터를 가져옵니다. 그 후 Json 포맷으로 넘어온 데이터를 역직렬화하고 데이터베이스에 변경사항을 반영해줍니다. 

Producer

KafkaProducerConfig

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.101:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

ProducerFacotry Bean 설정은 ConsumerFactory와 같고 Kafka에 데이터를 보내기 위한 객체로 KafkaTemplate Bean을 생성해줍니다.

 

KafkaProducer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, ProfileImgRequestDto requestDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(requestDto);
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from User microservice: " + requestDto);
    }
}

 

try ~ catch 문의 코드를 사용해서 데이터를 Json 포맷으로 전달하기 위해 변환과정을 거치고 Config 파일에서 생성한 KafkaTemplate의 send() 메서드에 topic과 데이터를 담아 전달해줍니다. 이때 topic은 KafkaConsumer 클래스에서 작성했던 @KafkaListener(topics = "example-participant-topic") 토픽을 넣어줍니다.


여기까지 설정하고 카프카 메세지를 전달해야하는 시점에 아래와 같이 KafkaProducer의 send() 메서드를 호출하면 데이터가 잘 업데이트된 것을 확인할 수 있습니다.

@Service
@RequiredArgsConstructor
public class AuthServiceImpl implements AuthService {

    private final UserRepository userRepository;

    private final AmazonS3Service amazonS3Service;

    private final KafkaProducer kafkaProducer;

    @Override
    @Transactional
    public ProfileResponseDto updateProfile(Long userId, ProfileUpdateRequestDto requestDto, MultipartFile profileImg) {
        User user = getUser(userId);

        String location = requestDto.getLocation();
        String description = requestDto.getDescription();

        if (profileImg == null) {
            user.updateProfile(location, description);
        } else {
            deleteS3Img(user);
            String profileImgKey = saveS3Img(profileImg);
            String profileImgUrl = amazonS3Service.getFileUrl(profileImgKey);
            user.updateProfile(profileImgKey, profileImgUrl, location, description);
            kafkaProducer.send("example-participant-topic", new ProfileImgRequestDto(userId, profileImgUrl));
        }

        return ProfileResponseDto.from(user);
    }

    private String saveS3Img(MultipartFile profileImg) {
        try {
            return amazonS3Service.upload(profileImg, "UserProfile");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void deleteS3Img(User user) {
        if (user.getProfileImgKey() != null && !user.getProfileImgKey().isBlank())
            amazonS3Service.delete(user.getProfileImgKey());
    }

    private User getUser(Long userId) {
        return userRepository.findById(userId)
                .orElseThrow(() -> new ApiException(ExceptionEnum.MEMBER_NOT_EXIST_EXCEPTION));
    }

}

프로젝트를 할때마다 CORS 설정을 해주고 있지만 매번 CORS 에러를 만나는게 정확한 이해없이 사용하고 있는 것 같아 한번 정리하고 넘어가려고 합니다.

CORS 란?

CORSCross Origin Resource Sharing의 약자로 출처가 다른 리소스들을 공유하는 것을 의미합니다. 여기서 출처란 프로토콜, 호스트, 포트로 구성된 서버 위치를 의미하는데 두 리소스들 간에 프로토콜, 호스트, 포트 중 하나만 달라도 다른 리소스에서 요청을 보냈을 때 CORS 에러가 발생합니다.

 

ex) https://spring.io:8080 

  • 프로토콜 : https, http
  • 호스트 : spring.io
  • 포트 : 8080

그렇다면 왜 이처럼 출처가 다른 리소스의 요청을 막는 것일까?

출처가 다른 리소스를 공유하여 브라우저를 실행하는 행위는 보안상 매우 위험합니다. 사용자의 개인 정보가 유출될 수 있으며, XSS(Cross Site Scripting) 같은 방식으로 조작된 데이터가 클라이언트에 전달될 수 있기 때문인데요. 그래서 브라우저들은 같은 출처의 리소스만 사용하도록 제한하는 방식인 SOP(Same Origin Policy) 정책을 따르도록 되어 있습니다.

 

하지만 SOP는 다음과 같은 예외 사항이 있으며 CORS 인증 과정을 거치면 다른 출처의 리소스도 사용할 수 있습니다.

  • <img> 태그를 사용하여 다른 출처의 이미지 파일을 요청하는 경우
  • <link> 태그를 사용하여 다른 출처의 CSS 파일을 요청하는 경우
  • <script> 태그를 사용하여 다른 출처의 자바 스크립트 파일을 요청하는 경우

CORS 인증과정

CORS 인증과정을 통과하려면 프리플라이트(preflight)라는 과정을 거쳐야 합니다. preflight는 다른 출처의 리소스를 요청하기 전에 사용 가능 여부를 물어보는 과정으로 클라이언트는 preflight의 응답 메시지에 포함된 헤더와 상태 코드를 읽고 CORS 사용 여부를 판단합니다.

 

 

CORS 인증은 위와 같은 과정을 거칩니다. 먼저 웹 브라우저가 http://www.springtour.io 호스트에서 Html 리소스를 받아갑니다. Html 리소스를 받아올 때 api.js JavaScript 문서도 함께 받아오고 js 문서에 포함된 hotels라는 REST-API를 사용해 화면에 보여줄 데이터를 http://api.springtour.io 호스트로부터 받아옵니다. 

 

하지만 이때 두 호스트의 출처가 다르기 때문에 먼저 프리플라이트 요청을 보내는데 요청 메시지에는 다음과 같은 정보들이 담겨 있습니다.

  • Host : 요청 대상 (http://api.springtour.io)
  • Origin : 요청을 보내는 출처 (www.springtour.io)
  • Access-Control-Request-Method : 특정 메서드에 대한 접근 권한 요청
  • Access-Control-Request-Headers : 특정 헤더에 대한 접근 권한 요청

이렇게 요청을 보냈을 때 REST-API 애플리케이션(http://api.springtour.io)에서 CORS 설정이 되어있으면 프리플라이트 요청에 대한 응답 메세지를 보내줍니다.

  • Access-Control-Allow-Origin : CORS를 허용하는 출처를 의미, 모든 출처를 허용하는 경우 *으로 응답
  • Access-Control-Allow-Method : CORS를 허용하는 출처에서 사용할 수 있는 HTTP 메서드
  • Access-Control-Allow-Headers : CORS를 허용하는 출처에서 사용할 수 있는 HTTP 헤더
  • Access-Control-Max-Age : 최초 CORS 인증 후 유효 시간, 이 시간동안에는 또 다시 CORS 인증을 하지 않아도 된다.

CORS 설정 방법

1. addCorsMappings

WebMvcConfigurer 에서 제공하는 addCorsMappings() 메서드를 사용해서 CORS 설정을 할 수 있습니다. addCorsMappings() 메서드는 CorsRegistry를 인자로 받아 사용합니다.

  • addMapping("/**") : 모든 리소스에 대해 CORS를 적용
  • allowedOrigins("www.springtour.io") : www.springtour.io 출처에 대해서만 CORS 허용
  • allowedMethod("GET", "POST", "PUT") : GET, POST, PUT Http Method에 대해서만 CORS 허용
  • maxAge(24 * 60 * 60) : 하루동안 CORS 인증 유효
@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
    	registry.addMapping("/**")
                .allowedOrigins("www.springtour.io")
                .allowedMethod("GET", "POST", "PUT", "PATCH", "DELETE")
                .allowedHeaders("*")
                .maxAge(24 * 60 * 60);
    }
}

2. CorsConfigurationSource

Spring Security를 사용하면 HttpSecurity에 체이닝 메서드로 cors() 설정을 해줄 수 있습니다. 이때 CorsConfigurationSource를 사용해서 CORS의 속성을 정의해 줄 수 있습니다.

 

아래 예시에서 Origin, Method, Header 관련 메서드들은 addCorsMapping() 에서 사용한 것과 같으니 다른 부분만 집고 넘어가겠습니다.

  • setAllowCredentials : 자격 증명과 함께 요청을 할 수 있는지 여부, 해당 서버에서 Authorization으로 사용자 인증을 요청할 것이라면 true로 설정해야 한다.
  • addAllowedOriginPattern : allowCredentials가 true일 때 allowedOrigins에 특수 값인 "*" 추가할 수 없게 됨, 대신 addAllowedOriginPattern을 사용
  • addExposedHeader : 특정 응답 헤더 허용
@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity(securedEnabled = true, prePostEnabled = true)
@RequiredArgsConstructor
public class SecurityConfig {

    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
                .httpBasic().disable()
                .cors().configurationSource(corsConfigurationSource());

        return http.build();
    }
    
    @Bean
    public CorsConfigurationSource corsConfigurationSource() {
        CorsConfiguration configuration = new CorsConfiguration();
        // configuration.addAllowedOrigin("*");
        configuration.addAllowedOriginPattern("*");
        configuration.addAllowedMethod("*");
        configuration.addAllowedHeader("*");
        configuration.addExposedHeader("Authorization");
        configuration.setAllowCredentials(true);
        configuration.setMaxAge(3600L);
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", configuration);
        return source;
    }

}

3. Spring Cloud Gateway 에서 CORS 설정

@Configuration
public class CorsConfiguration {
    private static final String ALLOWED_HEADERS = "authorization, Content-Type, Content-Length, Authorization, credential, X-XSRF-TOKEN, refreshtoken, RefreshToken";
    private static final String ALLOWED_METHODS = "GET, PUT, POST, DELETE, OPTIONS, PATCH";
    private static final String EXPOSE_HEADERS = "*, Authorization";
    private static final String MAX_AGE = "7200"; //2 hours (2 * 60 * 60)

    @Bean
    public WebFilter corsFilter() {
        return (ServerWebExchange ctx, WebFilterChain chain) -> {
            ServerHttpRequest request = ctx.getRequest();
            String origin = request.getHeaders().getOrigin();

            if (CorsUtils.isCorsRequest(request)) {
                ServerHttpResponse response = ctx.getResponse();
                HttpHeaders headers = response.getHeaders();

                if (origin.startsWith("http://localhost:3000") || origin.startsWith("http://j8c209.p.ssafy.io")) {
                    headers.add("Access-Control-Allow-Origin", origin);
                }

                headers.add("Access-Control-Allow-Methods", ALLOWED_METHODS);
                headers.add("Access-Control-Max-Age", MAX_AGE);
                headers.add("Access-Control-Allow-Headers", ALLOWED_HEADERS);
                headers.add("Access-Control-Expose-Headers", EXPOSE_HEADERS);
                headers.setAccessControlAllowCredentials(true);
                if (request.getMethod() == HttpMethod.OPTIONS) {
                    response.setStatusCode(HttpStatus.OK);
                    return Mono.empty();
                }
            }
            return chain.filter(ctx);
        };
    }
}

2023.03.19 - [Spring] - [Spring] MSA 프로젝트 만들기 (1)

 

[Spring] MSA 프로젝트 만들기 (1)

2023.02.28 - [Server] - [Spring] 마이크로서비스 아키텍쳐 (Micro Service Architecture, MSA) [Spring] 마이크로서비스 아키텍쳐 (Micro Service Architecture, MSA) 이번에 블록체인 기반의 프로젝트를 진행하게 되었는데

keylog.tistory.com

이전에 MSA 프로젝트를 개발 할때 OpenFeign 라이브러리를 사용하여 동기 방식으로 마이크로 서비스들 간에 내부 통신을 구현했는데 이제 비동기 방식으로 통신하고 동일한 기능을 가진 마이크로 서비스들 간에 데이터를 동기화 하기 위해 카프카에 대해서 정리하고 적용해보려고 합니다.

Apache Kafka

Apache Kafka는 Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트 입니다. 메시지 브로커란 특정한 리소스(서비스, 시스템)에서 다른 리소스로 메시지를 전달할 때 사용하는 메시지 서버로 단순한 텍스트 뿐만 아니라 json, xml, Object 등 다양한 형태의 데이터를 전달할 수 있습니다.

 

Kafka를 사용하기 전에는 아래 예시와 같이 End-to-End 방식으로 아키텍처들이 연결되어 있었는데 이렇게 되면 데이터 연동의 복잡성이 증가하고 확장이 어려워집니다.

 

그래서 모든 리소스로 부터 데이터를 실시간으로 전송하여 처리할 수 있고 확장을 용이하게 할수 있게 하기 위해 관계형 데이터베이스나 애플리케이션과 같은 다른 여러 시스템 사이에 Kafka를 두고 이 Kafka를 통해 데이터를 전달해 주는 구조로 변경할 계획입니다.

Kafka 설치하기

http://kafka.apache.org

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

먼저 위 링크를 통해 kafka를 설치해줍니다.

kafka는 Window 버전 Linux 버전 mac OS 버전 등 OS에 따라 따로 배포하지 않습니다. 대신 최신 버전을 설치하면 그 안에 Window, Linux, max OS 커맨드가 모두 들어가 있습니다.

 

 

설치를 완료했으면 위와 같이 kafka가 설치된 폴더에 접근하고 tar svf kafka_2.13-2.7.0.tgz 명령어를 사용해 압축을 해제해 줍니다.
압축을 해제한 후 해당 폴더에서 config 폴더에 접근하면 아래와 같은 설정 파일 목록을 확인할 수 있습니다.

 

 

이번에는 bin 폴더를 확인해 보면 아래와 같이 sh 파일과 windows 폴더를 확인할 수 있습니다. 이전에 말씀 드렸듯이 kafka는 os에 따라 따로 배포를 하지 않기 때문에 아래와 같이 Linux와 mac OS 를 위한 sh 파일과 Windows 를 위한 bat 파일을 한번에 다운 받습니다.

bat 파일은 windows 폴더 안에 있습니다.

 

카프카 사용해보기

카프카를 사용 하기 위해서는 먼저 카프카를 관리해주는 zookeeper 서버를 실행해주어야 합니다.

zookeeper란 분산 애플리케이션을 관리하기 위한 코디네이션 시스템입니다. 분산 애플리케이션이 안정적인 서비스를 할 수 있도록 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성 관리, 그룹 관리 네이밍, 동기화 등의 서비스를 제공해줍니다.

 

Broker를 하나의 kafka 서버라고 했을 때 위 사진과 같이 zookeeper가 여러대의 kafka application server를 관리해주고 메시지를 공유해 줌으로써 하나의 kafka 서버에서 문제가 생겼을 때 다른 kafka 서버를 통해 메시지를 사용할 수 있도록 해줍니다.


그러면 터미널에서 Kafka를 사용해보기 위해 아래 명령어들을 입력하여 zookeeper 서버, kafka 서버, 메시지를 보내는 역할의 Producer와 받는 역할을 할 Consumer를 실행시켜 줍니다.

 

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

./bin/kafka-server-start.sh ./config/server.properties

 

카프카는 기본적으로 Producer가 메시지를 보내면 토픽에 메시지가 저장됩니다. 그리고 나서 토픽에 저장된 메시지는 해당 토픽을 등록한 Consumer에게 일괄적으로 전달됩니다. 그렇기 때문에 Producer와 Consumer를 생성하기 전에 먼저 아래 명령어로 토픽을 생성해 줍니다.

 

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic quickstart-events  --partitions 1

이때 뒤에 --partitions 속성은 멀티 클러스트링을 구성했을 때 토픽에 저장되어 있는 메시지를 몇군데 저장할 건지 지정하는 옵션입니다.

 

토픽 생성 후 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 명령어로 리스트를 확인해 보면 아래와 같이 quickstart-events 라는 이름이로 토픽이 하나 생성된 것을 확인할 수 있습니다.

토픽을 생성 했으면 이어서 Producer와 Consumer를 생성해줍니다.

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

 

위 명령어들을 제대로 입력했으면 아래와 같이 4개의 터미널이 실행중인 것을 확인할 수 있습니다.

(왼쪽 위부터 시계방향으로 zookeeper server, kafka server, consumer, producer)

 

 

그런 다음 producer에서 메시지를 입력하면 아래와 같이 producer와 같은 토픽에 연결된 consumer가 메시지를 받는 것을 확인할 수 있습니다.

 

 

여기까지 kafka를 설치하고 터미널을 사용해서 zookeeper, kafka, producer, consumer를 실행한 뒤 간단한 메시지를 주고받는 것 까지 해보았습니다. 다음 글에서는 카프카와 데이터베이스를 연동하기 위한 kafka connect를 사용해보고 데이터를 동기화 해준 뒤 테스트하는 과정까지 한번 알아보겠습니다.

2023.03.22 - [Spring] - [Spring] MSA 프로젝트 만들기 (2)

 

[Spring] MSA 프로젝트 만들기 (2)

2023.03.19 - [Spring] - [Spring] MSA 프로젝트 만들기 (1) [Spring] MSA 프로젝트 만들기 (1) 2023.02.28 - [Server] - [Spring] 마이크로서비스 아키텍쳐 (Micro Service Architecture, MSA) [Spring] 마이크로서비스 아키텍쳐 (Micro

keylog.tistory.com

이전 글에서 gateway-service에서 Custom Filter를 구현하여 권한이 필요한 요청에 인증된 회원의 요청만 전달되도록 하는 방법과 Spring Cloud Gateway에서 CORS 설정을 하는 방법에 대해 알아보았습니다. 이번 글에서는 마이크로 서비스들 간에 통신하는 방법인 RestTemplate FeignClient에 대해서 알아보겠습니다.

 

RestTemplate

RestTemplate 클래스는 다른 서버의 REST-API를 호출할 수 있도록 URI를 설정하고 GET, POST 같은 HTTP 메서드를 사용할 수 있는 메서드를 제공해줍니다. 또한 요청 메시지의 헤더, 바디를 구성할 수 있는 클래스를 사용할 수 있으며, 응답 메시지의 헤더, 상태 코드, 바디를 조회할 수 있는 클래스를 사용할 수 있습니다.

 

@SpringBootApplication
@EnableDiscoveryClient
public class UserServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceApplication.class, args);
    }
    
    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate() {
    	return new RestTemplate();
    }

}

  

또한 RestTemplate은 멀티 스레드 환경에 안전한 클래스이므로 스프링 빈으로 객체를 생성하고 필요한 곳에 주입해서 사용할 수 있습니다. UserServiceApplication 클래스에서 RestTemplate 반환 값을 처리할 수 있는 getRestTemplate() 메서드를 빈으로 등록해 줍니다. 

 

여기서 @LoadBalanced 어노테이션은 밑에서 RestTemplate으로 challenge-service API를 호출할때 마이크로 서비스의 이름으로 호출하기 위해서 사용했습니다.

 

/**
 * 현재 user가 참가중인 챌린지 개수 반환
 */
@GetMapping("/challengeInfo/users/{userId}")
public ResponseEntity<UserChallengeInfoResponseDto> userChallengeInfo(@PathVariable Long userId) {
    return ResponseEntity.status(HttpStatus.OK).body(challengeService.myChallengeList(userId));
}

 

그런 다음 위와 같이 현재 user가 참가중인 챌린지 개수를 반환해주는 API가 challenge-service에 작성되어있다고 했을 때 아래와 같이 userChallengeInfo API를 호출하는 RestTemplate 코드를 작성해줍니다.

 

@Override
@Transactional(readOnly = true)
public UserResponseDto getUserInfo(String userId) {
    ...

    // Using as restTemplate
    String challengeUrl = String.format(env.getProperty("challenge_service.url"), userId);
    ResponseEntity<UserChallengeInfoResponseDto> userChallengeInfoResponse =
            restTemplate.exchange(challengeUrl, HttpMethod.GET, null,
                    new ParameterizedTypeReference<UserChallengeInfoResponseDto>() {
            });

    UserChallengeInfoResponseDto userChallengeInfoResponseDto = userChallengeInfoResponse.getBody();
        
    ...
}

 

application.yml

...

challenge_service:
  url: http://challenge_service/challenge_service/%s/orders

 

호출할 API의 url은 위와 같이 application.yml 파일에 작성한 뒤  Environment 객체를 사용해 조회해 왔습니다. 그리고 restTemplate 객체에 url, method, body를 작성해주고 ParameterizedTypeReference 객체의 Generic 타입을 지정하여 challenge-service에서 작성한 API와 리턴 타입을 맞춰줍니다.

 

여기까지 작성해준 뒤 user-service에서 사용자 정보를 조회해오는 API를 호출해주면 아래와 같이 challenge-service에서 조회해 온 정보가 포함되어서 리턴되는 것을 확인할 수 있습니다.

FeignClient

FeignClient는 Rest call을 추상화한 Spring Cloud Netflix에서 제공하는 라이브러리입니다. FeignClient 방식은 호출하고자 하는 마이크로 서비스의 API를 인터페이스로 정의하고 @FeignClient 어노테이션을 적용해주는 것 만으로 쉽게 사용할 수 있습니다. 그렇기 때문에 RestTemplate를 사용하기 위해 작성했던 코드 같이 중복되는 코드를 줄여주며 RestTemplate 보다 간편하게 사용할 수 있습니다.

사용방법

build.gradle

dependencies {
    ...

    // Feign Client
    implementation group: 'org.springframework.cloud', name: 'spring-cloud-starter-openfeign', version: '4.0.1'

    ...
}

 

먼저 build.gradle에서 위와 같이 FeignClient dependency를 추가해줍니다.

그런다음 user-service에서 FeignClient를 사용하겠다는 의미로 UserServiceApplication 클래스에서 @EnableFeignClients 어노테이션을 선언해줍니다.

 

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class UserServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceApplication.class, args);
    }

}

 

ChallengeServiceClient

@FeignClient(name = "challenge-service")
public interface ChallengeServiceClient {

    @GetMapping("/challenges/challengeInfo/users/{userId}")
    BaseResponseDto<ChallengeResponseDto> getChallengeInfo(@PathVariable Long userId);
}

 

challenge-service에서 호출할 API를 정의하기 위해 인터페이스를 생성해줍니다. 그런다음 @FeignClient 어노테이션을 추가해준 뒤 name 속성에 호출할 마이크로 서비스의 이름을 작성해 줍니다. 저희는 challenge-service에서 getChallengeInfo() 라는 API를 호출할 것이기 때문에 challenge-service로 작성하겠습니다.

 

그런다음 호출하고자 하는 API와 method와 url 그리고 파라미터 등을 맞춰서 메소드를 생성해줍니다.

 

이후 사용방법은 모놀리스식 서비스에서 다른 서비스의 메소드를 호출하는 방법과 같습니다. 작성한 Client 인터페이스를 의존성 주입받아서 challengeServiceClient.getChallengeInfo(userId); 이런식으로 메서드를 호출해서 사용하시면 됩니다.

 

@Service
@RequiredArgsConstructor
public class AuthServiceImpl implements AuthService {

    ...

    private final PayServiceClient payServiceClient;

    private final ChallengeServiceClient challengeServiceClient;

	...

    @Override
    @Transactional(readOnly = true)
    public MypageResponseDto getMypageInfo(Long userId) {
        User user = getUser(userId);
        ChallengeResponseDto challengeResponseDto = challengeServiceClient.getChallengeInfo(userId);
        MoneyResponseDto moneyResponseDto = payServiceClient.getMoneyInfo(userId);
        return MypageResponseDto.of(user, challengeResponseDto, moneyResponseDto);
    }

    private User getUser(Long userId) {
        return userRepository.findById(userId)
                .orElseThrow(() -> new ApiException(ExceptionEnum.MEMBER_NOT_EXIST_EXCEPTION));
    }

}

 

참고로 RestTemplateFeignClient 두가지 방식 모두 마이크로 서비스간 내부에서 통신할때는 gateway-service를 통과하지 않습니다. 물론 API Gateway를 통해 호출해도 되지만, 내부적인 통신을 사용할 때 굳이 외부에서 다시 접속 요청을 할 필요는 없기 때문에 권한이 필요한 요청의 경우 토큰에 userId를 담아서 전달하지 않고 @PathVariable이나 @RequestBody를 사용해서 userId를 전송해주는 방식으로 호출했습니다.

 

지금까지 마이크로 서비스간 내부통신을 하기 위한 방법으로 RestTemplate과 FeignClient에 대해서 알아보았습니다. 다음 글에서는 JenkinsDocker를 사용해서 CI-CD 파이프라인을 구축하고 파이프라인 스크립트 코드를 작성하여 변경된 마이크로 서비스만 재배포 되도록 병렬처리 하는 방법에 대해서 알아보겠습니다.

2023.03.19 - [Spring] - [Spring] MSA 프로젝트 만들기 (1)

 

[Spring] MSA 프로젝트 만들기 (1)

2023.02.28 - [Server] - [Spring] 마이크로서비스 아키텍쳐 (Micro Service Architecture, MSA) [Spring] 마이크로서비스 아키텍쳐 (Micro Service Architecture, MSA) 이번에 블록체인 기반의 프로젝트를 진행하게 되었는데

keylog.tistory.com

이전 글에서 gateway 프로젝트와 micro service 프로젝트를 생성하고 이를 Eureka Server에 등록하는 방법에 대해 알아보았습니다. 이번 글에서는 모든 클라이언트의 요청이 지나는 gateway-service에서 Custom Filter를 구현하여 권한이 필요한 요청에 인증된 회원의 요청만 전달되도록 하는 방법에 대해 알아보겠습니다.

먼저 MSA 구조에서는 세션 자체가 서버의 자원을 많이 소모하기도 하고 여러 마이크로 서비스들 간에 세션을 공유해주어야 하기 때문에 세션 방식을 권장하지 않습니다. 그렇기 때문에 gateway-service에서 JWT 토큰을 사용하여 사용자의 권한을 체크하는 방식으로 구현하겠습니다.

Dependency

gateway-service 프로젝트에 JWT 토큰을 사용하기 위한 jsonwebtoken Dependency와 Request에 포함된 Header 정보로 부터 전달된 토큰을 Base64로 인코딩/디코딩 할 때 사용하기 위한 jaxb-api Dependency를 추가해줍니다.

// JsonWebToken
implementation group: 'io.jsonwebtoken', name: 'jjwt', version: '0.9.1'

// Jaxb-api
implementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.4'

JWTUtil

Request Header로 부터 JWT 토큰을 가져오기 위한 메서드들을 정리해 놓은 JWTUtil 클래스를 작성해줍니다.

중간에 resolveToken() 메서드에서 HttpServletRequest가 아닌 ServerHttpRequest를 사용한 이유는 Spring Cloud Gateway 기존의 임베디드 톰켓 기반의 Spring Boot Web 애플리케이션과는 다르게 Netty 기반의 비동기 통신을 지원하는 Spring WebFlux 구현되어 있습니다. 그렇기 때문에 이렇게 비동기 방식으로 데이터를 사용할때는 ServerHttpRequest 객체를 사용합니다.

 

@Component
public class JWTUtil {

    @Value("${token.secret}")
    private String secretKey;

    @PostConstruct
    protected void init() {
        secretKey = Base64.getEncoder().encodeToString(secretKey.getBytes());
    }

    // 토큰에서 UserPk 추출
    public String getUserPk(String token) {
        return Jwts.parser().setSigningKey(secretKey).parseClaimsJws(token).getBody().getSubject();
    }

    // 헤더에서 토큰 추출
    public String resolveToken(ServerHttpRequest request) {
        String bearerToken = request.getHeaders().get(HttpHeaders.AUTHORIZATION).get(0);
        if (bearerToken != null && bearerToken.startsWith("Bearer ")) {
            return bearerToken.substring(7);
        }
        return null;
    }

    // 토큰의 유효성 + 만료일자 확인
    public void validateToken(String jwtToken) {
        Jws<Claims> claims = Jwts.parser().setSigningKey(secretKey).parseClaimsJws(jwtToken);
        claims.getBody().getExpiration();
    }
}

JwtAuthenticationFilter

AbstractGatewayFilterFactory를 상속받아 구현한 Custom Filter를 작성해 줍니다. 그리고 apply()라는 메서드에서 해당 필터에서 동작시키고자 하는 로직을 작성하여 Override 해줍니다. 

 

apply() 메서드의 매개변수는 각각 ServerWebExchange와 GatewayFilterChain이고 ServerWebExchange에서 ServerHttpRequest 객체를 가져올 수 있습니다. 그리고나서 JWTUtil 클래스를 이용하여 전달받은 ServerHttpRequest에서 JWT Token을 가져오고 유효성검사를 통과하면 토큰으로부터 userId 값을 추출합니다. 추출된 userId 값은 Request Header에 실려 마이크로 서비스로 전달됩니다.

 

Spring Cloud Gateway는 MVC의 Controller를 사용하지 않으므로 우리가 스프링 단에서 에러를 커스텀 할 때 사용했던 @RestControllerAdvice 나 @ExceptionHandler 를 사용할 수 없습니다. 대신에 아래와 같이 webflux에서 사용할 수 있는 ErrorWebExceptionHandler 인터페이스를 구현하여 에러처리를 할 수 있습니다.

 

  • ExpiredJwtException : JWT의 유효기간이 초과
  • MalformedJwtException : 잘못된 JWT 구조
  • SignatureException : JWT의 서명실패(변조 데이터)
  • UnsupportedJwtException : JWT가 예상하는 형식과 다른 형식이거나 구성
@Component
@Slf4j
public class JwtAuthenticationFilter extends AbstractGatewayFilterFactory<JwtAuthenticationFilter.Config> {

    private JWTUtil jwtUtil;

    @Autowired
    public JwtAuthenticationFilter(JWTUtil jwtUtil) {
        super(Config.class);
        this.jwtUtil = jwtUtil;
    }

    public static class Config {

    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            ServerHttpRequest request = exchange.getRequest();

            // 헤더에서 토큰 뽑아오기
            String accessToken = jwtUtil.resolveToken(request);

            // 유효한 토큰인지 확인합니다.
            jwtUtil.validateToken(accessToken);

            String userId = jwtUtil.getUserPk(accessToken);
            exchange.getRequest().mutate()
                    .headers(httpHeaders -> httpHeaders.add("userId", userId)).build();

            return chain.filter(exchange);
        };
    }

    @Bean
    public ErrorWebExceptionHandler tokenValidation() {
        return new JwtTokenExceptionHandler();
    }

    public class JwtTokenExceptionHandler implements ErrorWebExceptionHandler {

        private String getErrorCode(int errorCode) {
            return "{\"errorCode\":" + errorCode +"}";
        }

        @Override
        public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        
            int errorCode = 500;
            if (ex.getClass() == NullPointerException.class) {
                errorCode = 401;
            } else if (ex.getClass() == ExpiredJwtException.class) {
                errorCode = 402;
            } else if (ex.getClass() == MalformedJwtException.class) {
                errorCode = 403;
            } else if (ex.getClass() == SignatureException.class) {
                errorCode = 404;
            } else if (ex.getClass() == UnsupportedJwtException.class) {
                errorCode = 405;
            }

            byte[] bytes = getErrorCode(errorCode).getBytes(StandardCharsets.UTF_8);
            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
            return exchange.getResponse().writeWith(Flux.just(buffer));
        }
    }

}

CorsConfiguration

@Configuration
public class CorsConfiguration {
    private static final String ALLOWED_HEADERS = "authorization, Content-Type, Content-Length, Authorization, credential, X-XSRF-TOKEN";
    private static final String ALLOWED_METHODS = "GET, PUT, POST, DELETE, OPTIONS, PATCH";
    private static final String EXPOSE_HEADERS = "*, Authorization";
    private static final String MAX_AGE = "7200"; //2 hours (2 * 60 * 60)

    @Bean
    public WebFilter corsFilter() {
        return (ServerWebExchange ctx, WebFilterChain chain) -> {
            ServerHttpRequest request = ctx.getRequest();
            String origin = request.getHeaders().getOrigin();

            if (CorsUtils.isCorsRequest(request)) {
                ServerHttpResponse response = ctx.getResponse();
                HttpHeaders headers = response.getHeaders();

                if (origin.startsWith("http://localhost:3000") || origin.startsWith("http://j8c209.p.ssafy.io")) {
                    headers.add("Access-Control-Allow-Origin", origin);
                }

                headers.add("Access-Control-Allow-Methods", ALLOWED_METHODS);
                headers.add("Access-Control-Max-Age", MAX_AGE);
                headers.add("Access-Control-Allow-Headers", ALLOWED_HEADERS);
                headers.add("Access-Control-Expose-Headers", EXPOSE_HEADERS);
                headers.setAccessControlAllowCredentials(true);
                if (request.getMethod() == HttpMethod.OPTIONS) {
                    response.setStatusCode(HttpStatus.OK);
                    return Mono.empty();
                }
            }
            return chain.filter(ctx);
        };
    }
}

 

서로 다른 출처(origin)의 요청을 허용해 주기 위한 CorsConfiguration 클래스입니다. 처음에 Spring Cloud Gateway 공식 문서 처럼 application.yml 파일에서 아래와 같이 cors 설정을 했었는데 그래도 cors 에러가 떴습니다. 

 

개발자 도구에서 network를 열어보니 응답 헤더에서 Access-Control-Allow-Origin 값이 빠져있었습니다. 그래서 위와 같이 직접 Config 클래스를 생성해서 헤더에 해당 값을 더해주었고 Access-Control-Allow-Credentials 값을 true로 설정해 주었습니다.

 

추가로 프론트 단에서도 아래와 같이 Credentials 값을 true로 설정해줌으로써 문제를 해결할 수 있었습니다.

 

 

https://cloud.spring.io/spring-cloud-gateway/reference/html/#cors-configuration

 

Spring Cloud Gateway

This project provides an API Gateway built on top of the Spring Ecosystem, including: Spring 5, Spring Boot 2 and Project Reactor. Spring Cloud Gateway aims to provide a simple, yet effective way to route to APIs and provide cross cutting concerns to them

cloud.spring.io

spring :
  cloud :
    gateway:
      default-filters:
        - DedupeResponseHeader=Access-Control-Allow-Origin Access-Control-Allow-Credentials
      globalcors:
        corsConfigurations:
          '[/**]':
            allowedOrigins:
              - "http://j8c209.p.ssafy.io/"
              - "http://localhost:3000/"
            allow-credentials: true
            allowedHeaders: '*'
            exposedHeaders: '*'
            allowedMethods:
              - POST
              - GET
              - PUT
              - OPTIONS
              - DELETE
 

application.yml

마지막으로 작성한 필터를 아래와 같이 권한 검증이 필요한 API에 JwtAuthenticationFilter를 추가하여 마이크로 서비스로 요청이 전달되기 전에 토큰의 유효성 검사를 실시하도록 합니다.

 

server:
  port: 8000

eureka:
  instance:
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${spring.application.instance_id:${server.port}}
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://localhost:8761/eureka

spring:
  profiles:
    include: jwt
  application:
    name: apigateway-service
  cloud:
    gateway:
      routes:
        - id: user-service
          uri: lb://USER-SERVICE
          predicates:
            - Path=/user-service/auth/**
            - Method=GET, POST, PATCH, DELETE
          filters:
            - RemoveRequestHeader=Cookie
            - RewritePath=/user-service/(?<segment>.*), /$\{segment}
            - JwtAuthenticationFilter
        - id: user-service
          uri: lb://USER-SERVICE
          predicates:
            - Path=/user-service/**
            - Method=GET, POST, PATCH
          filters:
            - RemoveRequestHeader=Cookie
            - RewritePath=/user-service/(?<segment>.*), /$\{segment}

 

이번 글에서는 Spring Cloud Gateway에서 AuthenticationFilter를 구현하는 방법과 CORS 설정을 추가하는 방법까지 알아보았습니다. 다음 글에서는 마이크로 서비스 간에 통신하기 위해 사용하는 RestTemplateFeignClient에 대해서 알아보겠습니다.

2023.02.28 - [Server] - [Spring] 마이크로서비스 아키텍쳐 (Micro Service Architecture, MSA)

 

[Spring] 마이크로서비스 아키텍쳐 (Micro Service Architecture, MSA)

이번에 블록체인 기반의 프로젝트를 진행하게 되었는데 팀원들과 MSA 방식으로 프로젝트를 개발해보자는 얘기가 나와서 개발에 들어가기 전에 MSA가 뭔지 모놀리식 방식과는 어떤 차이가 있는지

keylog.tistory.com

이전 글에서 MSA가 뭔지 모놀리스 아키텍처 방식과는 어떤 차이점이 있는지 알아봤는데 이번글에서는 인프런 강의인 Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) 에서 배운 내용으로간단한 MSA 프로젝트를 만들고 마이크로 서비스들 간에 통신하는 방법까지 알아보겠습니다.

Eureka Server

Eureka Server는 넷플릭스에서 스프링 재단에 기부한 OSS Service Registry입이다. 하나의 서버를 사용하면 마이크로 서비스마다 포트번호가 달라지고 여러대의 서버를 사용하는 경우 서비스 주소가 달라지게 됩니다. 그래서 각각의 마이크로 서비스들의 주소값을 Eureka Server에 Key-Value 형태로 등록해 놓고 사용자 요청이 들어왔을 때 조회해서 사용합니다.

Gateway

Gateway는 일종의 Load Balancer로 사용자 요청이 들어왔을때 Eureka Server에서 사용자 요청을 처리할 수 있는 마이크로 서비스의 주소를 조회해와서 요청을 전달해 주는 역할을 합니다. 그 밖에 Gateway에서 처리할 수 있는 일은 다음과 같습니다.

  • 인증 및 권한 부여
  • 서비스 검색 통합
  • 응답 캐싱
  • 정책, 회로 차단기 및 QoS 다시 시도
  • 속도 제한
  • 부하 분산
  • 로깅, 추적, 상관 관계
  • 헤더, 쿼리 문자열 및 청구 변환
  • IP 허용 목록에 추가

Eureka Client

Eureka Client는 해당 프로젝트를 마이크로 서비스로써 사용하겠다는 의미로 Eureka Server에 등록하여 사용합니다. 이번 글에서는 아래와 같이 Eureka ServerAPI Gateway Server를 만들고 User Service, Challenge Service, Payment Service라는 마이크로 서비스를 만들어 보겠습니다.

1.  Eureka Service

먼저 프로젝트를 생성할 때 위와 같이 Eureka Server Dependency를 추가해줍니다. 그런 다음 Eureka Server로 사용하겠다는 의미로 Application 클래스에 아래와 같이 @EnableEurekaServer 어노테이션을 추가해줍니다. 

@SpringBootApplication
@EnableEurekaServer
public class EurekaServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServiceApplication.class, args);
    }

}

 

그리고 application.yml 파일을 아래와 같이 작성해줍니다. yml 파일 속성 중 register-with-eureka속성은 현재 프로젝트를 어딘가에 Eureka Client로 등록하겠다는 의미로 디폴트 값이 true입니다. 하지만 현재 프로젝트는 Eureka Client가 아닌 Eureka Server로 사용될 것이므로 false값으로 설정해줍니다.

 

그리고 fetch-registry 속성은 Eureka Server로 부터 인스턴스들의 정보를 주기적으로 가져올 것인지를 설정하는 속성입니다. true로 설정하면 갱신된 정보를 받겠다는 의미입니다.

server:
  port: 8761

spring:
  application:
    name: eurekaservice

eureka:
  client:
    register-with-eureka: false
    fetch-registry: false

Eureka Server는 8761 포트가 디폴트 포트로 만약 임의로 변경한다면 아래와 같은 에러가 발생합니다.

 

만약 포트번호를 변경하고 싶다면 eureka.client 속성 하위에 아래 속성을 추가해주면 됩니다.

service-url:
  defaultZone: http://localhost:${server.port}/eureka

2. apigateway-service

다음은 모든 클라이언트의 요청이 지나가고 Load Balancer 역할을 하는 gateway 프로젝트를 생성하겠습니다. 필수 Dependency로 아래 두 가지 Dependency를 추가하여 프로젝트를 생성해줍니다.

그리고 해당 프로젝트를 Eureka Client로 사용하겠다는 의미로 @EnableDiscoveryClient 어노테이션을 추가해줍니다.

@SpringBootApplication
@EnableDiscoveryClient
public class ApigatewayServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ApigatewayServiceApplication.class, args);
    }

}

 

다음으로 application.yml 파일을 아래와 같이 작성해 줍니다.

 

server:
  port: 8000

eureka:
  instance:
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${spring.application.instance_id:${server.port}}
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://localhost:8761/eureka

spring:
  application:
    name: apigateway-service
  cloud:
    gateway:
      routes:
        - id: user-service
          uri: lb://USER-SERVICE
          predicates:
            - Path=/user-service/welcome
            - Method=GET
          filters:
            - RemoveRequestHeader=Cookie
            - RewritePath=/user-service/(?<segment>.*), /$\{segment}
        - id: user-service
          uri: lb://USER-SERVICE
          predicates:
            - Path=/user-service/welcome/auth
            - Method=GET
          filters:
            - RemoveRequestHeader=Cookie
            - RewritePath=/user-service/(?<segment>.*), /$\{segment}
            - JwtAuthenticationFilter

eureka.instance.prefer-ip-address 속성은 서비스의 호스트 이름이 아닌 IP 주소를 Eureka Server 에 등록하도록 지정하겠다는 의미입니다.

 

Eureka Client  Eureka Server에 등록될 때 자동으로 아래와 같은 조합으로 식별자를 생성하여 등록됩니다.

${spring.cloud.client.hostname}:${spring.application.name}:${spring.application.instance_id:$server.port}}}

 

eureka.instance.instance-id 속성은 위와 같이 자동으로 생성된 식별자를 재정의 하겠다는 속성입니다.

그리고 eureka.client 속성 하위의 service-url 속성은 현재 프로젝트가 등록될 Eureka Server의 위치를 지정하는 속성입니다.

 

spring.cloud.gateway 하위의 속성은 클라이언트로 부터 들어오는 요청의 url에 따라 해당 요청을 처리할 수 있는 마이크로 서비스로 전달하기위한 속성입니다. predicates의 Path값에 해당하는 요청이 들어오면 Eureka Server에 USER-SERVICE라는 이름으로 등록된 마이크로 서비스로 요청을 전달합니다. 그리고 filters 하위에 있는 값은 RequestHeader의 값을 초기화 시키겠다는 속성과 마이크로 서비스에 요청을 전달할 때 요청 url을 user-service/welcome에서 /welcome으로 변경하겠다는 의미입니다.

 

두 번째 필터에 추가된 JwtAuthenticationFilter는 인증 및 권한을 보여하기 위해 스프링 빈으로 등록된 필터를 추가한 것입니다.

3. user-service

마지막으로 user-service 프로젝트를 생성해 보겠습니다. 다른 마이크로 서비스도 같은 방법으로 생성해 주시면 됩니다.

먼저 필수 dependency 로 아래와 같이 Eureka Discovery Client를 추가하여 프로젝트를 생성합니다.

 

그런 다음 gateway 프로젝트와 마찬가지로 @EnableDiscoveryClient 어노테이션을 아래와 같이 추가해줍니다.

 

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class UserServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceApplication.class, args);
    }

}

 

마지막으로 application.yml 파일을 아래와 같이 작성해줍니다. eureka와 관련된 속성은 gateway에서 작성한 yml 파일 속성과 같습니다.

 

server:
  # port 번호가 0번이면 랜덤으로 배정된다.
  port: 0

spring:
  application:
    name: user-service
  h2:
    console:
      enabled: true
      settings:
        web-allow-others: true
      path: /h2-console
  datasource:
    driver-class-name: org.h2.Driver
    url: jdbc:h2:mem:devday
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: create
    show-sql: true
    generate-ddl: true

eureka:
  instance:
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${spring.application.instance_id:${random.value}}
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://127.0.0.1:8761/eureka

 

위와 같이 프로젝트를 작성하고 Eureka Server를 실행한 후 gateway server와 user-service를 실행해줍니다.

그런 다음 localhost:8761로 접속하면 아래와 같이 apigateway-server와 user-service가 Eureka Server에 등록된 것을 확인할 수 있습니다.

 

다음 글에서는 apigateway-server 에서 JWT 토큰 방식으로 회원인증을 구현하는 방식과 마이크로 서비스 간에 통신하는 방법인 RestTemplate 과 FeignClient에 대해서 알아보겠습니다.

테스트 코드를 작성하는 것은 개발단계에서 서비스 코드를 작성하는 것 만큼이나 매우 중요하다. 그 이유는 다음과 같다.

1. 작성한 코드가 의도한대로 동작하는지 알 수 있다.

작성한 코드를 테스트하는 방법은 포스트맨을 사용하거나 QA 과정에서 직접 서비스를 사용해 보면서 기능을 테스트하는 방법이 있는데 테스트 해야할 기능이 많다면 전부 테스트하는데 많은 시간이 걸린다. 하지만 테스트 코드를 미리 작성해 놓으면 애플리케이션을 실행하지 않고도 클릭 한번으로 테스트할 수 있으며 프로젝트 빌드 툴인 메이븐이나 그레이들 같은 툴을 사용하여 작성된 테스트 케이스들을 한 번에 모두 실행할 수도 있다.

2. 리팩토링 하기 쉬워진다.

리팩토링이란 결과값에 영향을 주지 않으면서 코드의 가독성과 유지보수성을 높이기 위해 내부 구조를 변경하는 작업이다. 그렇기 때문에 리팩토링을 하고나면 전과 동일한 결과를 출력하는지 테스트를 수행해야 하는데 테스트 코드를 통해 쉽게 테스트 할 수 있다.

 

그러므로 신뢰성 있는 애플리케이션을 개발하기 위해서는 애플리케이션을 자동으로 테스트하는 테스트 케이스를 작성하고 유지 보수하는 것이 매우 중요하다.

테스트 종류

테스트 방식으로는 크게 단위 테스트와, 통합 테스트로 구분할 수 있는데 메서드 단위로 테스트 하는 것을 단위 테스트(unit test), 애플리케이션의 기능이나 API 단위로 테스트하는 것을 통합 테스트(integration test)라고 한다.

좋은 단위 테스트를 작성하기 위한 5가지 원칙

좋은 단위 테스트를 작성하기 위해 지켜야할 5가지 원칙이 존재하는데 이를 F(Fast), I(Isolated), R(Repeatable), S(Self-validating), T(Timely)라고 하고 각각의 의미는 다음과 같다.

First

테스트 케이스는 빠르게(fast) 동작해야 한다. 실행 시간이 오래 걸리는 테스트 케이스는 성공 여부를 빠르게 확인할 수 없어 개발 시간에 영향을 미친다.

Isolated

테스트 케이스는 다른 외부 요인에 영향을 받지 않도록 격리(isolated)해야 한다. 즉, 테스트 케이스 사이에 서로 영향을 주는 테이트 케이스를 작성하면 안된다. 만약 다른 테스트 코드에 의존하거나 상호 동작한다면 신뢰할 만한 테스트 결과를 얻을 수 없다.

Repeatable

테스트 케이스는 반복(repeat)해서 실행하고, 실행할 때마다 같은 테스트 결과를 보장해야 한다. 만약 테스트 케이스를 실행할 때마다 다른 결과가 나온다면 테스트 과정 자체를 신뢰할 수 없다.

Self-validating

테스트 케이스 내부에는 결과 값을 자체 검증(self-validating)할 수 있는 코드가 필요하다. 즉, 테스트 결과 값을 개발자가 직접 기대하는 값과 예상 결과 값을 비교해야 한다면 테스트 과정을 자동화할 수 없다.

Timely

실제 코드를 개발하기 전 테스트 케이스를 먼저 작성하는 것을 의미한다. 이는 개발 단계부터 계속해서 테스트를 하면서 요구 사항에 적합한 코드를 만들 수 있는 장점이 있고 테스트 주도 개발 방법론(Test Driven Development: TDD)에 적합하다.

테스트 도구

Junit

자바 언어 환경에서 제공하는 테스트 라이브러리이다. 테스트 케이스를 작성하고 실행할 수 있는 기능들을 제공한다. 테스트 케이스를 정의할 수 있는 애너테이션과 실행한 테스트 결과 값을 예상 값과 비교 및 검증할 수 있는 클래스들을 제공한다.

 

Junit 사용 예제

public class MiscTest {

    @BeforeAll
    public static void setup() {
        System.out.println("before all tests in the current test class");
    }

    @BeforeEach
    public void inti() {
        System.out.println("before each @Test");
    }


    @Test
    public void testHashSetContainsNonDuplicatedValue() {

        // Given
        Integer value = 1;
        Set<Integer> set = new HashSet<>();

        // When
        set.add(value);
        set.add(value);
        set.add(value);

        // Then
        Assertions.assertEquals(1, set.size());
        Assertions.assertTrue(set.contains(value));
    }

    @Test
    public void testDummy() {
        Assertions.assertTrue(Boolean.TRUE);
    }

    @AfterEach
    public void cleanup() {
        System.out.println("after each @Test");
    }

    @AfterAll
    public static void destroy() {
        System.out.println("after all tests in the current test class");
    }

}

 

@BeforeAll

테스트 클래스 인스턴스를 초기화할 때 가장 먼저 실행된다. 테스트 클래스에 포함된 테스트 메서드가 여러 개 있어도 한번만 실행되며 객체를 생성하기 전에 미리 실행해야 하므로 static 키워드를 사용해서 정의해야 한다.

 

@BeforeEach

모든 테스트 메서드가 실행되기 전 각각 한 번씩 실행된다. 즉, testHashSetContainsNonDuplicatedValue() 메서드와 testDummy() 메서드에서 각각2번 실행된다.

 

 

@AfterEach

모든 테스트 메서드가 실행된 후 각각 한 번씩 실행된다.

 

@AfterAll

테스트 클래스의 모든 테스트 메서드가 실행을 마치면 마지막에 한 번만 실행된다. @AfterAll도 @BeforeAll과 마찬가지로 static 키워드를 사용해서 정의해야 한다.

 

@Test

@Test 어노테이션이 부여된 메서드는 테스트 대상으로 지정된다.

 

테스트 결과 검증 메서드

Juit은 테스트 결과를 예상한 값과 자동으로 비교하여 검증할 수 있게 Assertions라는 스테틱 검증 메서드를 제공한다.

  • assertNull(Object actual) : 실제 값이 Null인지 검증한다.
  • assertNotNull(Object actual) : 실제 값이 Not null인지 검증한다.
  • assertTrue(boolean condition) : 조건이 참인지 검증한다.
  • assertFalse(boolean condition) : 조건이 거짓인지 검증한다.
  • assertEquals(Object expect, Object actual) : 예상 값과 실제 값이 같은지 비교한다. (동등성 비교)
  • assertNotEquals(Object expect, Object actual) : 예상 값과 실제 값이 다른지 비교한다. (동등성 비교)
  • assertSame(Object expect, Object actual) : 예상 값과 실제 값이 같은지 비교한다. (동일성 비교)
  • assertNotSame(Object expect, Object actual) : 예상 값과 실제 값이 다른지 비교한다. (동일성 비교)

Mockito

테스트에서 사용할 수 있는 목(mock) 프레임워크이다. 목 객체는 개발자가 입력 값에 따라 출력 값을 프로그래밍한 가짜 객체이다. Mockito 프레임워크는 테스트 대상 클래스가 의존하는 객체를 목 객체로 바꿀 수 있는 기능과 목 객체를 만들 수 있는 기능을 제공한다. 그래서 Junit과 함께 많이 사용되는 라이브러리이며 테스트 환경 설정부터 테스트 검증까지 테스트 전체 과정 모두를 처리할 수 있는 기능을 제공한다.

 

Mockito 사용 예제

@ExtendWith(MockitoExtension.class)
class ReviewServiceImplUnitTest {

    @Mock
    private ReviewRepository reviewRepository;

    @InjectMocks
    private ReviewServiceImpl reviewService;

    @Test
    @DisplayName("리뷰 상세 조회 성공")
    public void getDetailReview() {

        // Given
        Long reviewId = 1L;
        ReviewRequestDto requestDto = new ReviewRequestDto(
                "title",
                "content",
                "question",
                "answer"
        );

        Review review = Review.of(getMember(), getCompany(), requestDto);

        given(this.reviewRepository.findById(any()))
                .willReturn(Optional.ofNullable(review));

        // When
        ReviewResponseDto reviewResponseDto = reviewService.getDetailReview(reviewId);

        // Then
        Assertions.assertEquals("title", reviewResponseDto.getTitle());
        Assertions.assertEquals("content", reviewResponseDto.getContent());
        Assertions.assertEquals("question", reviewResponseDto.getQuestion());
        Assertions.assertEquals("answer", reviewResponseDto.getAnswer());
    }

    private Member getMember() {
        return Member.builder()
                .email("test@test.com")
                .password("1111")
                .name("kiyoom")
                .build();
    }

    private Company getCompany() {
        return Company.builder()
                .companyName("삼성")
                .companyUrl("www.samsung.com")
                .companyAddress("삼성시")
                .employeeCnt(500)
                .companyDesc("반도체")
                .build();
    }
}

 

테스트 성공

 

단위테스트는 하나의 모듈을 기준으로 독립적으로 진행되는 가장 작은 단위의 테스트로 스프링을 실행하지 않습니다. 

 

@ExtendWith(MockitoExtension.class)

이 테스트 클레스가 Mockito 프레임워크를 사용하겠다는 것을 의미합니다.

 

@Mock

가짜 객체를 선언하는 것을 의미합니다. 테스트 시 실제 객체 대신 @Mock 어노테이션이 선언된 가짜 객체가 주입되어 단위테스트가 실행됩니다.

 

@InjectMock

@Mock 어노테이션이 선언된 가짜 객체가 주입될 클래스를 의미합니다.

 

@MockBean

mock객체를 ApplicationContext 등록해준다. 그리고 ApplicationContext 객체를 주입받기 원하는 스프링 빈이 있다면 해당 객체를 주입해준다. 만약 ApplicationContext 객체와 같은 클래스 타입과 이름이 같은 스프링 빈이 있다면 해당 객체는 객체로 바뀐다

 

스텁(stub)

프로그래밍할 수 있으며 개발자가 원하는 결과를 응답하는 메서드를 의미한다.

 

BDDMockito.given()

스텁을 만드는 메서드로 given()의 인자에 스텁으로 만들 대상 메서드를 입력한다. 테스트하고자 하는 클래스의 메서드에서 스텁 메서드를 호출하면 프로그래밍된 결과가 응답한다.

 

ArgumentMatchers.any()

any()를 사용하면 어떤 인자 값을 사용하더라도 given()으로 만들어진 스텁이 동작한다.  

 

BDDMockito.willReturn()

given()으로 선언된 스텁이 호출되면 willReturn() 메서드의 인자로 입력된 값을 응답한다.

 

spring-boot-test

스프링 부트 프레임워크의 기능을 통합 테스트할 수 있는 기능을 제공한다.

@SpringBootTest
@Transactional
class ReviewServiceImplTest {

    @Autowired
    private ReviewService reviewService;

    @Test
    @DisplayName("라뷰 작성 성공")
    public void writeReviewSuccessTest() {

        // Given
        String email = "test@test.com";
        Long companyId = 1L;
        ReviewRequestDto requestDto = new ReviewRequestDto(
                "title",
                "content",
                "question",
                "answer"
        );

        // When
        ReviewResponseDto reviewResponseDto = reviewService.writeReview(requestDto, email, companyId);

        // Then
        Assertions.assertEquals("title", reviewResponseDto.getTitle());
        Assertions.assertEquals("content", reviewResponseDto.getContent());
        Assertions.assertEquals("question", reviewResponseDto.getQuestion());
        Assertions.assertEquals("answer", reviewResponseDto.getAnswer());
    }

}

 

테스트 성공

 

@Transactional

테스트 클래스에서 @Transactional을 선언하면 테스트가 종료될 때 데이터베이스를 초기화해준다. 테스트 클래스는 반복사용할 수 있어야 하기 때문에 데이터베이스를 초기화하여 다음 테스트에 영향을 주지 않도록 해야한다.

 

@SpringBootTest

SpringBootTest를 수행하기 위해서는 테스트 대상 클래스에 @SpringBootTest 어노테이션을 부여해주어야 한다. @SpringBootTest 어노테이션을 테스트 클래스에 선언하면 @SpringBootApplication 어노테이션이 적용된 클래스를 찾아 @SpringBootTest에 설정된 속성과 함꼐 애플리케이션에 선언된 스프링 빈들도 스캔하고 생성한다. 그렇기 때문에 SpringBootTest를 실행하면 아래와 같이 Spring이 실행된다.

 

 

Junit4를 사용한다면 아래와 같이 테스트 클래스에 @RunWith(SpringRunner.class) 설정을 같이 사용해야하고 Junit5를 사용한다면 @RunWith 설정을 생략해도 된다.

@SpringBootTest
@Transactional
@RunWith(SpringRunner.class)
class ReviewServiceImplTest {
    ....
}

 

@Autowired

SpringBootTest 테스트하고자 하는 클래스가 다른 스프링 빈에 의존하고 있기 때문에 ApplicationContext 테스트 대상 클래스가 의존하는 적절한 스프링 빈들을 생성하고 스프링 빈을 주입해야 테스트할 있다. 그렇기 때문에 반드시 생성자에 @Autowired 어노테이션을 정의해서 스프링 빈을 주입받아야 한다.

+ Recent posts