1.1 콘솔 프로듀서로 메시지 보내기
- server.properties 파일에 auto.create.topics.enable = true 옵션을 설정하면 카프카에 존재하지 않는 토픽을 전송하면 자동으로 토픽 설정.
- 아래명령어로 토픽 이름 : khg, 파티션: 1, 리플리케이션 팩터:3인 토픽 생성
/opt/kafka/bin/kafka-topics.sh --zookeeper khg-zk001:2181,khg-zk002:2181,khg-zk003:2181/khg-kafka \
--topic khg-topic3 --partitions 1 --replication-factor 3 --create
- 토픽 상세정보 확인
/opt/kafka/bin/kafka-topics.sh --zookeeper khg-zk001:2181,khg-zk002:2181,khg-zk003:2181/khg-kafka \
--topic khg-topic3 --describe
[출력]
khg-topic3는 설정한대로 1개의 파티션과 3의 리플리케이션 팩터로 생성되엇고 0번 파티션의 정보는 리더가 3번에 위치. ISR은 3,1,2에 위치.
브로커별 토픽의 상세정보
- kafka-console-producer.sh로 테스트 목적등으로 토픽에 메시지를 보낼 수 있는 명령어 제공.
- 아래 명령어를 입력하면 메시지를 입력할수 있게 > 프롬프트가 나타나며 Ctrl + C 로 벗어날수 있다.
/opt/kafka/bin/kafka-console-producer.sh --broker-list khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092 \
--topic khg-topic3
- 아래 명령어로 > 프롬프트에서 입력한 메시지 확인 가능.
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092 \
--topic khg-topic3 --from-beginning
- 아래 명령어로 acks옵션이나 –compression-codec 옵션으로 압축과 관련된 옵션을 줄 수 있다.
/opt/kafka/bin/kafka-console-producer.sh --broker-list khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092 \
--topic khg-topic3 --compression-codec 'gzip' --request-required-acks 1
1.2 자바와 파이썬을 이용한 프로듀서
- gradle을 사용하여 kafka 프로듀서 빌드.
plugins {
id 'java'
}
group 'com.khg.kafka'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
jar {
manifest {
attributes 'Main-Class': 'com.khg.kafka.producer'
}
from {
configurations.compile.collect {
it.isDirectory() ? it : zipTree(it)
}
}
}
dependencies {
compile('org.apache.kafka:kafka-clients:2.1.0')
compile('org.apache.kafka:kafka-streams:2.1.0')
compile('org.slf4j:slf4j-simple:1.7.30') #SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" 에러로 추가.
}
test {
useJUnitPlatform()
}
- producer sample_code
package com.khg.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092");
#메시지의 키와 값에 문자열을 사용하기 때문에 내장된 StringSerializer를 지정.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
#메시지 전송
producer.send(new ProducerRecord<String, String>("khg-topic3", "Apache Kafka is a distributed streaming platform"));
producer.close();
}
}
1.2.1 메시지를 보내고 확인하지 않기
- try catch 문으로 메시지를 보내면 에러처리는 할 수 있지만 메시지가 성공적으로 전송되었는지는 확인 불가.
package com.khg.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<String, String>("khg-topic3", "Apache Kafka is a distributed streaming platform"));
}catch (Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
1.2.2 동기 전송
- send.get 메소드를 사용해 Future를 기다린 후 send가 성공했는지 실패했는지 확인해 신뢰성 있는 메시지를 전송 할 수 있다.
package com.khg.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("khg-topic3", "Apache Kafka is a distributed streaming platform")).get();
}catch (Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
1.2.3 비동기 전송
- 콜백 메소드 구현으로 메시지를 받을때 까지 기다리지 않고 브로커에서 응답을 받으면 콜백하는 구조로 구현.
- send로 메시지를 보낼때 구현한 콜백 오브젝트를 같이 보냄.
1.2.4 파이썬 클라이언트
- 파이썬 클라이언트의 경우 confluent-kafka-python과 kafka-python이 있는데 성능은 confluent-kafka-python가 빠르지만 librdkafka(아파치 카프카 프로토콜의 C라이브러리)를 반드시 설치해야한다.
1.3 프로듀서 활용 예제
- For loop를 추가한 producer 구현
package com.khg.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class producer_for {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 1; i < 11; i++)
producer.send(new ProducerRecord<String, String>("khg-topic3", "Apache Kafka is a distributed streaming platform"+i));
producer.close();
}
}
- 프로듀서에 key옵션을 줄 수 있는데 옵션을 주지 않으면 라운드로빈 방식으로 파티션 마다 균등하게 메세지를 보내며 key를 지정하면 특정 파티션으로만 메시지를 보낼 수 있다. 아래 명령어로 파티션이 2개인 토픽을 생성한다.
/opt/kafka/bin/kafka-topics.sh --zookeeper khg-zk001:2181,khg-zk002:2181,khg-zk003:2181/khg-kafka \
--topic khg-topic2 --partitions 2 --replication-factor 1 --create
- 아래 코드 처럼 홀수는 key=1로 지정하고 짝수는 key=2로 지정해서 보내면 key=2는 파티션 0번 key=1은 파티션 1번에 메시지가 저장된다.
package com.khg.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class producer_key {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "khg-kafka001:9092,khg-kafka002:9092,khg-kafka003:9092");
props.put("acks", "1");
props.put("compression.type", "gzip");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String testTopic = "khg-topic2";
String oddKey = "1";
String evenKey = "2";
for (int i = 1; i < 11; i++) {
if (i % 2 == 1) {
producer.send(new ProducerRecord<String, String>(testTopic, oddKey, String.format("%d - Apache Kafka is a distributed streaming platform - key=" + oddKey, i)));
} else {
producer.send(new ProducerRecord<String, String>(testTopic, evenKey, String.format("%d - Apache Kafka is a distributed streaming platform - key=" + evenKey, i)));
}
}
producer.close();
}
}
[0번 파티션]
[1번 파티션]
1.4 프로듀서 주요 옵션
- bootstrap.servers
처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보를 나타냅니다. 정의된 포맷은 호스트이름 :포트,호스트이름:포트,호스트이름:포트 이다. 호스트 하나만 입력해 사용할 수 있지만, 장애가 발생하는 경우 접속이 불가능하기에 클러스터에 있는 호스트를 모두 입력하는것을 권장한다.
acks 프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack(승인)의 수. 수가 크면 메시지 손실 가능성이 낮아지지만 속도가 줄어들고 수가 작으면 손실 가능성이 높아지지만 속도가 높아진다.
acks=0 프로듀서는 서버로 부터 어떠한 ack도 기다리지 않는다.
asks=1 리더는 데이터를 기록하지만, 모든 팔로워는 확인하지 않는다. 이 경우 데이터 손실이 발생가능하다.
acks=all 또는 -1 ISR의 팔로워로부터 데이터에 대한 acks를 기다린다.
buffer.memory 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트
compression.type 프로듀서가 데이터를 압축해서 보낼 수 있는데, 어떤 타입으로 압축할지를 정할 수 있다.none, gzip, snappy, lz4 같은 다양한 포맷의 선택이 가능하다.
retries 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 회수
batch.size 배치크기 바이트 단위를 조정
linger.ms 배치형태의 메시지를 보내기전에 추가적인 메시지들을 위해 기다리는 시간을 조정. 배치 사이즈에 도달하지 못한 상황에서linger.ms 제한시간에 도달시 메시지 전송
max.request.size 프로듀서가 보낼수 있는 최대 메시지 바이트. 기본값은 (1mb)
1.5 메시지 전송 방법
프로듀서 옵션 중 acks 옵션에 따라 메시지 손실 여부와 성능이 달라지게 됨.
1.5.1 메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우
- acks=0로 설정하면 메세지에 대한 응답을 기다리지 않기 때문에 속도는 빠르지만 메시지가 유실 될 가능성이 높다.
1.5.2 메세지 손실 가능성이 적고 적당한 속도의 전송이 필요한 경우
- acks=1로 설정하면 프로듀서가 카프카로 메세지를 보낸뒤 카프카가 잘 받았는지 확인을 한다.
- 확인 과정에서 기다리는 시간이 추가되어 속도는 약간 떨어지게됨.
- acks=1로 설정해도 메시지가 유실되는 경우가 발생할 수 있음.
- 메시지가 유실되는 경우는 아래와 같은 경우. 메시지를 받은 리더가 장애가 발생하는 경우 메시지가 손실될 가능성이 있는데 리더가 메시지를 받고 팔로워들이 메세지를 동기화 하기전에 급작스럽게 다운이 되면 팔로워중 리더를 선출하게 되는데 이때 리더는 메세지를 받지 못한 팔로워기 때문에 이과정에서 메시지가 유실된다. 프로듀서 입장에서는 리더가 받은것 까지는 확인을 했기 때문에 다음 메세지를 보내게 된다.
[토픽을 받은 리더가 acks를 보낸후 팔로워가 저장하기전 리더 장애 발생]
[메세지를 받지 못한 팔로워가 리더로 선택]
1.5.3 전송 속도는 느리지만 메시지 손실이 없어야 하는 경우
- acks=all로 설정하면 프로듀서가 메시지를 전송하고 난 후 리더가 메시지를 받았는지 확인하고 추가적으로 팔로워까지 메시지를 받았는지 확인.
- 프로듀서 설정뿐만 아니라 브로커 설정인 min.insync.replica 설정도 해줘야한다 (server.properties에서 설정)
- min.insync.replica를 1로 설정한다면 acks=all로 설정하더라도 acks=1 처럼 동작한다. 카프카에서는 프로듀서만 acks=all로 메시지를 보낸다고 해서 손실 없는 메시지를 보장해주는 것이 아니기 때문에 옵션을 잘 이해하고 설정해야 한다.
- min.insync.replica를 2로 설정하면 acks를 보내기 전에 최소 2개의 리플리케이션을 유지하는지 확인한다. 결과적으로 리더와 팔로워중 하나에 메시지가 저장되고 난 후에 ack를 보낸다.
- 아파치 카프카 문서에는 손실 없는 메시지 전송을 위한 조건으로 프로듀서는 acks=all, 브로커의 min.insync.replica의 옵션은 2, 토픽의 리플리케이션 팩터는 3으로 권장하고 있다.
- min.insync.replica을 3으로 설정(브로커 갯수와 같게 설정하면)하게 되면 브로커 하나만 다운되더라도 메시지를 보낼 수 없는 클러스터 전체 장애와 비슷한 상황이 발생하게 된다. (acks=all인 경우)
- min.insync.replica을 2로 설정하고 리플리케이션 팩터를 3으로 설정해도 동시에 브로커가 2대 까지 다운되는 경우에는 프로듀서의 요청을 처리할 수 없는 상황이 발생.