
Kafka Producer Configuration Guide

by 주발2 2025. 7. 6.
λ°˜μ‘ν˜•

When I first set up Kafka (on MSK), simply producing and consuming was the easy part. Understanding the configuration values for producers, consumers, brokers, and topics, and setting them appropriately, turned out to be quite difficult.

So in this post I go through which settings exist and summarize them, so that suitable values can be chosen for a given situation.

 

First up: the Kafka producer settings!

 

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org


key.serializer

Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
  • Serializer class used to serialize the record key
  • Type: class
  • Default: (none, must be set)
  • Name: KEY_SERIALIZER_CLASS_CONFIG

The String serializer class (StringSerializer) is commonly used.

 

value.serializer

Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
  • Serializer class used to serialize the record value
  • Type: class
  • Default: (none, must be set)
  • Name: VALUE_SERIALIZER_CLASS_CONFIG

A JSON serializer class (JsonSerializer) is commonly used.
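As a rough illustration of the two serializer settings above, here is a minimal sketch using only the plain kafka-clients library. The function name `serializerProps` is hypothetical, and both key and value are serialized as strings here to keep the fragment free of extra dependencies.

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: neither serializer has a default, so both must be set.
fun serializerProps(): Properties = Properties().apply {
    // Keys are sent as UTF-8 strings.
    put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    // Values are also plain strings in this sketch; for object payloads a JSON
    // serializer (for example Spring Kafka's JsonSerializer) would go here instead.
    put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
}
```

The resulting `Properties` would normally be merged with `bootstrap.servers` and the other settings covered below before creating a producer.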

 

bootstrap.servers

A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. While the order of servers in the list does not matter, we recommend including more than one server to ensure resilience if any servers are down. This list does not need to contain the entire set of brokers, as Kafka clients automatically manage and update connections to the cluster efficiently. This list must be in the form host1:port1,host2:port2,...
  • List of broker host/port pairs used for the initial connection to the Kafka cluster
  • Type: list
  • Default: ""
  • Name: BOOTSTRAP_SERVERS_CONFIG

For high availability, listing more than one server is recommended (clusters seem to be typically built with about three brokers).
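To make the `host1:port1,host2:port2,...` format concrete, here is a small sketch that parses and validates such a string before it is handed to the producer. `HostPort` and `parseBootstrapServers` are hypothetical names introduced for illustration and are not part of the Kafka client.

```kotlin
// Hypothetical value type for one bootstrap entry.
data class HostPort(val host: String, val port: Int)

// Splits "host1:port1,host2:port2" into entries and rejects malformed ones.
fun parseBootstrapServers(value: String): List<HostPort> =
    value.split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .map { entry ->
            val idx = entry.lastIndexOf(':')
            require(idx > 0 && idx < entry.length - 1) { "Expected host:port but got '$entry'" }
            val port = entry.substring(idx + 1).toIntOrNull()
                ?: throw IllegalArgumentException("Port is not a number in '$entry'")
            require(port in 1..65535) { "Port out of range in '$entry'" }
            HostPort(entry.substring(0, idx), port)
        }
```

A caller could additionally log a warning when the parsed list has only one entry, since a single bootstrap server gives no fallback if that broker is down.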

 

compression.type

Enum KafkaProducerProperties.CompressionType

The compression type for all data generated by the producer. The default is none (i.e. no compression).
Valid values are none, gzip, snappy, lz4, or zstd. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
  • Compression method applied to messages
  • Type: String
  • Default: none (none, gzip, snappy, lz4, zstd)
  • Name: COMPRESSION_TYPE_CONFIG

Choosing a compression type seems to be a matter of weighing message size, throughput, network bandwidth, and CPU usage.

(The messages we currently publish are simple and throughput is low, so I did not configure compression.)

https://developer.ibm.com/articles/benefits-compression-kafka-messaging/
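The note that "more batching means better compression" can be seen with a small experiment. The sketch below uses the JDK's gzip stream, not Kafka's actual record batch format, and the sample JSON payloads are made up for illustration. It compares compressing each message separately against compressing them as one concatenated batch.

```kotlin
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

// Returns the gzip-compressed size of the given bytes.
fun gzipSize(data: ByteArray): Int {
    val buffer = ByteArrayOutputStream()
    GZIPOutputStream(buffer).use { it.write(data) }
    return buffer.size()
}

// Hypothetical sample payloads: small, similar JSON messages.
fun sampleMessages(count: Int): List<ByteArray> =
    (1..count).map { """{"orderId":$it,"status":"CREATED","currency":"KRW"}""".toByteArray() }

// Total size when every message is compressed on its own.
fun compressedIndividually(messages: List<ByteArray>): Int =
    messages.sumOf { gzipSize(it) }

// Size when all messages are concatenated and compressed once.
fun compressedAsBatch(messages: List<ByteArray>): Int =
    gzipSize(messages.fold(ByteArray(0)) { acc, m -> acc + m })
```

For similar messages like these, the batched size should come out far smaller, because the compressor can reuse repeated field names across messages and pays its fixed overhead only once.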

 

retries

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error.

Produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.

Enabling idempotence requires this config value to be greater than 0. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.

Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to greater than 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
  • Number of times the producer retries when publishing a message fails
  • Type: int
  • Default: 2_147_483_647 (Integer.MAX_VALUE)
  • Name: RETRIES_CONFIG
  • Related settings: delivery.timeout.ms, enable.idempotence, max.in.flight.requests.per.connection

 

One sentence in the description above deserves particular attention:

Produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement.

 

In other words, if the timeout configured in delivery.timeout.ms is reached before the retry count is exhausted, the request fails.

(The message is marked as failed as soon as either limit, retries or timeout, is reached first.)

 

λ”°λΌμ„œ μ‹€μ œ μž¬μ‹œλ„ 횟수(Integer.MAX_VALUE)만큼 λ°œν–‰μ— λŒ€ν•΄ μž¬μ‹œλ„λ₯Ό ν•˜λŠ”κ²ƒμ€ μ•„λ‹ˆλ‹€.

timeout 섀정이 μ—†λ‹€λ©΄ μž¬μ‹œλ„ 횟수λ₯Ό λ°˜λ“œμ‹œ 섀정을 ν•΄μ£Όμ–΄μ•Ό ν•  것 같은데, timeout 섀정이 μ‘΄μž¬ν•˜κΈ° λ•Œλ¬Έμ— retry에 λŒ€ν•œ μ»€μŠ€ν…€ 섀정은 λ”°λ‘œ ν•΄μ£Όμ§€ μ•Šμ•„λ„ 큰 λ¬Έμ œκ°€ μ—†μ§€ μ•Šμ„κΉŒ μ‹Άλ‹€.

(delivery.timeout.ms λŠ” μ•„λž˜μ—μ„œ μ‚΄νŽ΄λ³΄κ² μ§€λ§Œ 기본이 120000, 2λΆ„)
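The interaction between the two limits can be sketched with a toy model. This is not how the producer is implemented internally; `simulateFailingSend`, `attemptCostMs`, and `retryBackoffMs` are hypothetical names, and the model simply assumes every attempt fails after a fixed cost, followed by a fixed backoff.

```kotlin
enum class FailureReason { RETRIES_EXHAUSTED, DELIVERY_TIMEOUT }

data class SendOutcome(val attempts: Int, val reason: FailureReason)

// Toy model: every attempt fails. Returns how many attempts were made
// and which limit ended the send.
fun simulateFailingSend(
    retries: Int,
    deliveryTimeoutMs: Long,
    attemptCostMs: Long,   // assumed time one failed attempt takes
    retryBackoffMs: Long,  // assumed wait between attempts
): SendOutcome {
    require(attemptCostMs > 0) { "attemptCostMs must be positive" }
    var elapsedMs = 0L
    var attempts = 0
    // Keep trying while another attempt still fits inside the delivery timeout.
    while (elapsedMs + attemptCostMs <= deliveryTimeoutMs) {
        elapsedMs += attemptCostMs
        attempts++
        // The first attempt is not a retry, so "retries" allows retries + 1 attempts.
        if (attempts > retries) return SendOutcome(attempts, FailureReason.RETRIES_EXHAUSTED)
        elapsedMs += retryBackoffMs
    }
    return SendOutcome(attempts, FailureReason.DELIVERY_TIMEOUT)
}
```

With `retries = Int.MAX_VALUE` and a 120000 ms timeout, the model stops on the timeout after a bounded number of attempts. With a small value such as `retries = 2`, the retry limit ends it first.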

 

In addition, the idempotence setting covered below makes it possible to retry safely without publishing duplicates.

 

delivery.timeout.ms

An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed for retriable send failures.

The producer may report failure to send a record earlier than this config if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline.

The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms
  • Maximum time the producer waits for a message to be successfully published to the broker
  • In other words, how long to wait for a success/failure result for the publish
  • Type: int
  • Default: 120000 (2 minutes)
  • Name: DELIVERY_TIMEOUT_MS_CONFIG

λ””ν΄νŠΈ 값인 2뢄도 μΆ©λΆ„νžˆ λ„‰λ„‰ν•œ μ‹œκ°„μ΄ μ•„λ‹κΉŒ μ‹Άλ‹€. λ‹¨μˆœνžˆ λ©”μ‹œμ§€ λ°œν–‰ 자체둜 2λΆ„λ™μ•ˆ μ‹€νŒ¨ν•˜λŠ” 경우면 λ©”μ‹œμ§€ λ°œν–‰μ— μ‹€νŒ¨ν•œλ‹€κΈ° λ³΄λ‹€λŠ”, μΉ΄ν”„μΉ΄ ν΄λŸ¬μŠ€ν„° 자체의 λ¬Έμ œμ΄μ§€ μ•Šμ„κΉŒ? μ‹Άλ‹€.

 

enable.idempotence

KIP-679: Producer will enable the strongest delivery guarantee by default

When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.

Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'.

Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown.
  • Setting that guarantees exactly one copy of each message is written when publishing
  • max.in.flight.requests.per.connection must be less than or equal to 5, retries must be greater than 0, and acks must be 'all'
  • Type: boolean
  • Default: true
  • Name: ENABLE_IDEMPOTENCE_CONFIG

Because the default is true (idempotence guaranteed), this setting needs little customization as long as max.in.flight.requests.per.connection (covered below) and retries have not been changed from their defaults.
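The three requirements quoted above can be expressed as a small check. This is a hypothetical helper for illustration, not the validation code used by the Kafka client; it only mirrors the documented rules.

```kotlin
// Returns a human-readable list of settings that conflict with
// enable.idempotence=true; an empty list means no conflict.
fun idempotenceConflicts(acks: String, retries: Int, maxInFlight: Int): List<String> = buildList {
    // acks=-1 is documented as equivalent to acks=all.
    if (acks != "all" && acks != "-1") add("acks must be 'all', but was '$acks'")
    if (retries <= 0) add("retries must be greater than 0, but was $retries")
    if (maxInFlight > 5) add("max.in.flight.requests.per.connection must be <= 5, but was $maxInFlight")
}
```

With the defaults (`acks=all`, `retries=Integer.MAX_VALUE`, `max.in.flight.requests.per.connection=5`) the list is empty, which matches the observation that the defaults need no customization.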

 

Kafkaμ—μ„œ enable.idempotenceλ₯Ό ν™œμ„±ν™”ν•˜λ©΄ Producer ID + SequenceNumber둜 쀑볡 λ©”μ‹œμ§€λ₯Ό ν•„ν„°λ§ν•œλ‹€.

 

acks

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
  - acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
  - acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
  - acks=all (Default) This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

Note that enabling idempotence requires this config value to be 'all'. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
  • The number of acknowledgments the producer requires the leader to have received before considering a request complete; this controls the durability of the records sent.
    • acks=0 : The producer does not wait for any acknowledgment from the server and treats the record as sent. There is no guarantee the server received the record, and because the client cannot know about failures, retries do not take effect.
    • acks=1 : The leader writes the record and responds without waiting for acknowledgment from the followers.
    • acks=all : (default) The leader waits for the full set of in-sync replicas to acknowledge the record. The record is guaranteed not to be lost as long as at least one in-sync replica remains alive (equivalent to acks=-1).
  • Type: String
  • Default: all (valid values: all, -1, 0, 1)
  • Name: ACKS_CONFIG
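The difference between the three levels can be summarized in a toy model. The types below are hypothetical and only encode the descriptions above; they are not Kafka classes.

```kotlin
enum class Acks { NONE, LEADER, ALL }

// Snapshot of one record's replication progress.
data class ReplicationState(
    val leaderWrote: Boolean,     // leader appended the record to its local log
    val inSyncReplicas: Int,      // size of the in-sync replica set, leader included
    val replicasWithRecord: Int,  // in-sync replicas holding the record, leader included
)

// Whether the producer would treat the request as complete at this point.
fun isAcknowledged(acks: Acks, state: ReplicationState): Boolean = when (acks) {
    Acks.NONE -> true                 // acks=0: never waits for the server
    Acks.LEADER -> state.leaderWrote  // acks=1: leader write is enough
    Acks.ALL -> state.leaderWrote && state.replicasWithRecord >= state.inSyncReplicas
}
```

The interesting case is a record that only the leader has written: it counts as acknowledged under acks=1 but not under acks=all, which is exactly the window in which a leader failure loses the record.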

 

max.in.flight.requests.per.connection

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this configuration is set to be greater than 1 and enable.idempotence is set to false, there is a risk of message reordering after a failed send due to retries (i.e., if retries are enabled); if retries are disabled or if enable.idempotence is set to true, ordering will be preserved.

Additionally, enabling idempotence requires the value of this configuration to be less than or equal to 5, because broker only retains at most 5 batches for each producer. If the value is more than 5, previous batches may be removed on broker side.
  • Maximum number of requests the producer can send per connection without yet having received a response from the cluster
  • Type: int
  • Default: 5
  • Name: MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

 

if this configuration is set to be greater than 1 and enable.idempotence is set to false, there is a risk of message reordering after a failed send due to retries

If this value is greater than 1 and enable.idempotence is false, retries of failed sends can change the order in which messages end up being written.

However, the documentation states that this problem does not occur if idempotence is set to true or retries are disabled.
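The reordering scenario can be replayed with a small simulation. This is a simplified model under stated assumptions: all batches are in flight at once, a batch can be lost only on its first attempt, and an idempotent broker simply rejects any batch whose sequence number is not the next expected one so that the producer resends it later. The function `deliver` and its parameters are hypothetical.

```kotlin
// Replays sends of `batches` (index = sequence number) to one partition and
// returns the order in which they end up in the log.
fun deliver(batches: List<String>, lostOnFirstAttempt: Set<Int>, idempotent: Boolean): List<String> {
    val log = mutableListOf<String>()
    var expectedSequence = 0
    val attempts = IntArray(batches.size)
    var pending = batches.indices.toList()
    while (pending.isNotEmpty()) {
        val stillPending = mutableListOf<Int>()
        for (seq in pending) {
            attempts[seq]++
            val lost = attempts[seq] == 1 && seq in lostOnFirstAttempt
            // Only an idempotent broker checks the sequence number.
            val rejected = idempotent && seq != expectedSequence
            if (lost || rejected) {
                stillPending += seq // the producer will retry this batch
            } else {
                log += batches[seq]
                expectedSequence = seq + 1
            }
        }
        pending = stillPending
    }
    return log
}
```

With two batches where the first attempt of the first batch is lost, the non-idempotent run writes the second batch before the first, while the idempotent run keeps the original order.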

 

[Figure: message order can change when enable.idempotence = false]

 

When idempotence is set to true, the broker retains at most 5 batches per producer, so max.in.flight.requests.per.connection must be less than or equal to 5.

 

Additional settings worth looking at

To access Amazon MSK from an application, several authentication methods are available, such as IAM, SASL/SCRAM, and TLS. The broker port number differs depending on the authentication method, and the required settings can differ as well.

https://aws.amazon.com/ko/blogs/tech/amazon-msk-topic-iam-access-control/

 

For reference, the port for IAM authentication is 9098, and roughly the following settings are required.

@Bean
fun producerFactory(): ProducerFactory<String, Any> {
    val configProps =
        mutableMapOf<String, Any>().apply {
            this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
            
            ...

            // MSK IAM authentication settings
            this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
            this[SaslConfigs.SASL_MECHANISM] = "AWS_MSK_IAM"
            this[SaslConfigs.SASL_JAAS_CONFIG] = "software.amazon.msk.auth.iam.IAMLoginModule required;"
            this[SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS] = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        }

    return DefaultKafkaProducerFactory(configProps)
}
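Since the broker port depends on the authentication method, one option is to keep the mapping in one place. In the sketch below only the IAM port (9098) comes from this post; the other port numbers are my recollection of the AWS documentation for access from within the VPC and should be verified there. `MskAuth` and `bootstrapFor` are hypothetical names.

```kotlin
// Assumed in-VPC MSK broker ports per authentication method.
// Only IAM = 9098 is stated in this post; verify the rest against the AWS docs.
enum class MskAuth(val port: Int) {
    PLAINTEXT(9092),
    TLS(9094),
    SASL_SCRAM(9096),
    IAM(9098),
}

// Builds a bootstrap.servers string for the given broker hosts.
fun bootstrapFor(brokerHosts: List<String>, auth: MskAuth): String =
    brokerHosts.joinToString(",") { "$it:${auth.port}" }
```

The returned string can be used directly as the value of `bootstrap.servers`.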

 

 

Producer configuration example code

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer

@Configuration
@EnableKafka
class KafkaProducerConfig(
    @Value("\${spring.kafka.bootstrap-servers}") private val bootstrapServers: List<String>,
) {

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> = KafkaTemplate(producerFactory())

    /**
     * https://kafka.apache.org/documentation.html#producerconfigs
     */
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val configProps =
            mutableMapOf<String, Any>().apply {
                this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
                this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
                this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
                
                // The settings below are all identical to the default values
                this[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = true
                this[ProducerConfig.ACKS_CONFIG] = "all"
                this[ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG] = 120000 // 2 minutes
                this[ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = 5

                // MSK IAM authentication settings
                this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
                this[SaslConfigs.SASL_MECHANISM] = "AWS_MSK_IAM"
                this[SaslConfigs.SASL_JAAS_CONFIG] = "software.amazon.msk.auth.iam.IAMLoginModule required;"
                this[SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS] = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
            }

        return DefaultKafkaProducerFactory(configProps)
    }
}

 

λ°˜μ‘ν˜•

'Kafka' μΉ΄ν…Œκ³ λ¦¬μ˜ λ‹€λ₯Έ κΈ€

Kafka Consumer μ„€μ • κ°€μ΄λ“œ  (3) 2025.07.09

λŒ“κΈ€