1.1 컨슈머 주요 옵션.
- 카프카의 컨슈머는 올드 컨슈머와 뉴 컨슈머 2개의 종류가 있다.
- 올드 컨슈머는 오프셋을 주키퍼의 지노드에 저장하는 방식을 지원.
- 뉴 컨슈머는 오프셋을 주키퍼가 아닌 카프카의 토픽에 저장.
- 아래의 옵션은 뉴 컨슈머 기준으로 작성됨.
- bootstrap.servers : 카프카 클러스터의 주소
- fetch.min.bytes (df: 1 bytes) : 한번에 가져올 수 있는 최소 사이즈로, 만약 가져오는 데이터가 지정한 사이즈보다 작으면 요청에 응답하지 않고, 데이터가 누적될 때 까지 기다린다.
- fetch.max.wait.ms(df: 500sec = 0.5sec) : fetch.min.bytes에 설정된 데이터보다 데이터 양이 적은 경우 요청에 응답을 기다리는 최대시간을 설정
- group.id: 컨슈머가 속한 컨슈머 그룹의 id
- enable.auto.commit (df: true) : 백그라운드로 주기적으로 오프셋을 커밋
- auto.offset.reset (df: latest) : 카프카에서 초기 오프셋이 없거나, 현재 오프셋이 더 이상 존재하지 않는경우 다음 옵션으로 리셋한다. – earlist: 가장 초기의 오프셋 값으로 설정 – latest: 가장 최근의 오프셋 값으로 설정 – none: 이전 오프셋값을 찾지 못하면 에러를 발생시킵니다.
- fetch.max.bytes (df : 52428800 bytes = 50mb) : 한번에 가져올 수 있는 최대 데이터 사이즈
- request.timeout.ms(df: 30000ms = 30sec) : 요청에 대한 응답을 기다리는 최대 시간으로, 지정한 시간만큼 요청에 대한 응답이 안오면 retry 한다.
- session.timeout.ms(df: 10000ms = 10sec):컨슈머와 브로커 사이의 세션 타임 아웃 시간으로, 브로커가 컨슈머가 살아있는 것으로 판단하는 시간. 컨슈머가 그룹 코디네이터에게 하트비트를 해당시간만큼 보내지 않으면, 해당 컨슈머는 장애가 생겼다고 판단하여 컨슈머 그룹은 리밸런스(rebalanace)를 시도한다.
- heartbeat.interval.ms(df: 3000ms = 3sec) : 그룹 코디네이터에게 얼마나 KafkaConsumer.poll() 메소드를 통하여 하트비트를 보낼 것인지 조정하며 일반적으로, session.timeout.ms의 1/3 정도로 설정한다.
- max.poll.records(df: 500) : 폴링루프에서 이뤄지는 한건의 KafkaConsumer.poll() 메소드에 대한 최대 레코드수를 조정한다.
- max.poll.interval.ms(df: 300000ms = 30sec) : 컨슈머가 하트비트를 보냄에도 불구하고, poll을 하지 않으면 장애이기 때문에 poll 주기를 설정하여 장애를 판단하는데 사용한다. 해당 옵션보다 poll 주기가 길었을 경우 컨슈머 그룹에서 제외한 후, 다른 컨슈머가 해당 파티션을 처리할 수 있도록 한다.
- auto.commit.interval.ms (df: 5000ms = 5sec) : 주기적으로 오프셋을 커밋하는 시간
1.2 콘솔 컨슈머로 메시지 가져오기.
- 컨슈머는 토픽에서 메시지를 가져오는 역할을 하는데 카프카 홈 디렉토리의 bin안에 kafka-console-consumer.sh로 콘솔에서 명령어를 실행할 수 있다.
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092 \
--topic khg-topic3 --from-beginning
- 컨슈머를 실행할 때는 항상 컨슈머 그룹이라는 것이 필요한 kafka-console-consumer.sh 명령어 실행시 추가 옵션으로 그룹을 지정하지 않으면 자동으로 console-consumer-XXXXX(숫자형태)로 생성된다.
# 컨슈머 그룹을 확인하는 명령어.
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092 --list
# 컨슈머 그룹을 지정하여 컨슈머를 생성하는 명령어
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092 \
--topic khg-topic3 --group khg-consumer-group --from-beginning
1.3 자바와 파이썬을 이용한 컨슈머
- 프로듀서와 마찬가지로 컨슈머도 자바 또는 파이썬으로 작성하여 생성할 수 있다.
# 컨슈머 생성하는 자바 코드
package khg.kafka;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class consumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092");
props.put("group.id", "khg-consumer");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 메시지를 가져올 토픽을 지정하고 리스트로 여러개도 지정가능.
consumer.subscribe(Arrays.asList("khg-topic"));
try {
while (true) {
//poll을 0으로 설정하면 poll이 즉시 리턴되어게 종료된것으로 간주하고 컨슈머에 할당된 파티션이 다른 컨슈머에게 넘어감.
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
} finally {
consumer.close();
}
}
}
1.4 파티션과 메시지 순서
- 카프카는 파티션이 여러개인 경우 라운드 로빈 형식으로 파티션에 데이터를 분산 저장하고, 컨슈머에서 해당 데이터를 불러올 때, 파티션 내의 순서는 보장하지만 파티션 간의 순서는 보장하지 않는다.
- 메시지의 순서를 보장하려면 파티션을 1개만 사용해야 하는데, 이런 경우 분산저장이 힘들고 컨슈머그룹 안의 하나의 컨슈머만 파티션과 연결 할 수 있기 때문에 처리량이 낮다.
1.5 컨슈머 그룹
- 카프카는 컨슈머 그룹이라는 개념을 통해, 하나의 토픽을 여러개의 컨슈머 그룹이 다른 방식으로 처리할 수 있다.
- 카프카는 컨슈머 그룹에 리밸런스라는 개념을 통하여, 컨슈머에 장애가 발생했을 시 다른컨슈머로 해당 작업을 자동으로 위임한다. 하지만 리밸런스 하는 동안에는 일시적으로 컨슈머가 메시지를 가져올 수 없는 상태가 된다.
- 토픽의 파티션에는 하나의 컨슈머만 연결할 수 있기 때문에 토픽의 파티션 이상의 컨슈머는 필요하지 않다.
- 컨슈머 컨슈머 그룹 안에서 멤버로 유지하고 할당된 파티션을 소유하는 방법은 주기적으로 하트비트를 보내는것이다.
- 하나의 컨슈머가 너무 많은 파티션을 담당하고 있는지 확인하는게 필요하다.
- 카프카가 다른 메시지 큐와 차별되는 차이점은 데이터를 가져가도 삭제가 되지않아 하나의 토픽에 대해 여러 용도로 사용할 수 있다는 장점이 있다.
1.6 커밋과 오프셋
- 오프셋이란? : 컨슈머가 자신이 가져간 메시지의 우치 정보.
- 컨슈머는 각각의 파티션에 대하여 자신이 가져간 메시지의 오프셋을 저장한다.
- 컨슈머가 다운 되거나 컨슈머 그룹에 새로운 컨슈머가 들어와 리밸런스가 일어나면 각각 컨슈머는 이전의 파티션이 아닌 다른 파티션을 새로 할당 받아 메시지 누락이나 중복이 발생할 수 있다.
- 커밋 : 컨슈머가 처리하고 있는 각 파티션 안에다가 자신의 오프셋을 업데이트 하는 동작
- 자동커밋 - enable.auto.commit 옵션등을 사용하여 설정 할 수 있으며, auto.commit.interval.ms 옵션을 통해 주기 조절 가능.
- 수동커밋 - DB에 저장하여 중복이 최대한 없도록 할 때 사용하는데, Consumer 프로젝트 내에서, consumer.commitSync() 라는 메소드를 통하여, 원하는 작업이 끝난 후 commit을 하도록 호출 할 수도 있다. 하지만 ,이 경우에도 DB에 저장은 되고, commit이 되기전 해당 서버가 죽게 된다면 해당 작업을 중복으로 처리 될 가능성은 있다.
- 특정 파티션 할당
- 특정 파티션에 할당해야 하는 상황은 아래와 같은 경우가 있을 수 있다.
- 키-값의 형태로 파티션에 저장되어 있고, 특정 파티션에 대한 메시지만 가져와야 할 때
- 컨슈머 프로세스가 가용성과 장애복구 능력이 높아서 브로커가 컨슈머의 실패를 감지, 리밸런스할 필요 없고 자동으로 컨슈머 프로세스가 다른 시스템에서 시작되는 경우(YARN, Mescos)
seek 메소드로 파티션의 특정 오프셋부터 메시지 가져오기를 할 수 있다.
TopicPartition p0 = new TopicPartition(topic, 0);
TopicPartition p1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(p0, p1));
consumer.assign(Arrays.asList(p0, p1));
consumer.seek(p0, offset1);
consumer.seek(p1, offset2);