When I first set up Kafka (on Amazon MSK), I found that understanding and properly configuring the producer, consumer, broker, and topic settings was much harder than simply producing and consuming messages.
So I want to go through which settings exist for each component and summarize them, so that I can choose values that fit the situation.
First up: Kafka Producer settings!
Reference: Apache Kafka documentation (kafka.apache.org)
key.serializer
Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
- Serializer class used to serialize the key
- Type: class
- Default: (no default; must be set)
- Name: KEY_SERIALIZER_CLASS_CONFIG
For String keys, StringSerializer is the one typically used.
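To make "serialization" concrete, here is a minimal pure-Kotlin sketch that mimics what StringSerializer does with its default settings: encode the string as UTF-8 bytes and pass null through unchanged. The `SimpleSerializer` interface and `Utf8StringSerializer` class are hypothetical stand-ins that only mirror the shape of `org.apache.kafka.common.serialization.Serializer`; they are not the real Kafka classes.

```kotlin
// Hypothetical interface that mirrors the shape of Kafka's Serializer<T>.serialize(topic, data).
interface SimpleSerializer<T> {
    fun serialize(topic: String, data: T?): ByteArray?
}

// Sketch of StringSerializer's default behavior: UTF-8 encoding, null stays null.
class Utf8StringSerializer : SimpleSerializer<String> {
    override fun serialize(topic: String, data: String?): ByteArray? =
        data?.toByteArray(Charsets.UTF_8)
}

fun main() {
    val serializer = Utf8StringSerializer()
    // "order-1" is plain ASCII, so it becomes 7 bytes.
    println(serializer.serialize("orders", "order-1")?.size)
}
```

The topic name "orders" is only an example; StringSerializer ignores the topic when encoding.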
value.serializer
Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
- Serializer class used to serialize the value
- Type: class
- Default: (no default; must be set)
- Name: VALUE_SERIALIZER_CLASS_CONFIG
For values, JsonSerializer (Spring Kafka's org.springframework.kafka.support.serializer.JsonSerializer, which serializes objects to JSON) is commonly used.
bootstrap.servers
A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. While the order of servers in the list does not matter, we recommend including more than one server to ensure resilience if any servers are down. This list does not need to contain the entire set of brokers, as Kafka clients automatically manage and update connections to the cluster efficiently. This list must be in the form host1:port1,host2:port2,...
- List of broker host/port pairs used for the initial connection to the Kafka cluster
- Type: list
- Default: ""
- Name: BOOTSTRAP_SERVERS_CONFIG
For high availability, listing more than one server is recommended (clusters commonly seem to be built with about three brokers).
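As a small sketch of the expected format, the helper below parses a `host1:port1,host2:port2` string and checks whether more than one broker is listed. `BrokerAddress`, `parseBootstrapServers`, and `hasBootstrapRedundancy` are hypothetical names for illustration, and the `b-1.example`-style hostnames are made up; the real client does its own parsing.

```kotlin
// Hypothetical representation of one entry in bootstrap.servers.
data class BrokerAddress(val host: String, val port: Int)

// Parse "host1:port1,host2:port2,..." into host/port pairs.
fun parseBootstrapServers(value: String): List<BrokerAddress> =
    value.split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .map { entry ->
            val idx = entry.lastIndexOf(':')
            require(idx > 0 && idx < entry.length - 1) { "Expected host:port but got '$entry'" }
            BrokerAddress(entry.substring(0, idx), entry.substring(idx + 1).toInt())
        }

// With a single entry, the client cannot bootstrap if that one broker is down.
fun hasBootstrapRedundancy(servers: List<BrokerAddress>): Boolean = servers.size >= 2

fun main() {
    val servers = parseBootstrapServers("b-1.example:9098,b-2.example:9098, b-3.example:9098")
    println(servers)
    println(hasBootstrapRedundancy(servers))
}
```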
compression.type
Enum KafkaProducerProperties.CompressionType
The compression type for all data generated by the producer. The default is none (i.e. no compression).
Valid values are none, gzip, snappy, lz4, or zstd. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
- Compression type applied to messages
- Type: String
- Default: none (valid values: none, gzip, snappy, lz4, zstd)
- Name: COMPRESSION_TYPE_CONFIG
It seems reasonable to pick a compression type by weighing message size, throughput, network bandwidth, and CPU usage.
(The messages I currently publish are simple and the throughput is low, so I did not configure compression.)
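The docs note that compression is applied to whole batches, so more batching means a better ratio. The sketch below illustrates that effect with gzip from `java.util.zip` as a stand-in (this is not the producer's own code path): it compares compressing 100 similar, made-up JSON messages one by one against compressing them together as one batch.

```kotlin
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

// Compress a byte array with gzip and return the compressed size in bytes.
fun gzipSize(data: ByteArray): Int {
    val out = ByteArrayOutputStream()
    GZIPOutputStream(out).use { it.write(data) }
    return out.size()
}

fun main() {
    // Hypothetical, similar-looking JSON messages (a typical producer workload).
    val messages = (1..100).map { """{"orderId":$it,"status":"CREATED","currency":"KRW"}""" }

    // Compressing each message separately (no batching)...
    val individually = messages.sumOf { gzipSize(it.toByteArray()) }
    // ...versus compressing the whole batch at once.
    val batched = gzipSize(messages.joinToString("").toByteArray())

    println("individual: $individually bytes, batched: $batched bytes")
}
```

Each separately compressed message pays its own header overhead and cannot reuse patterns from its neighbors, which is why linger.ms and batch.size also affect the compression ratio.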

retries
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error.
Produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.
Enabling idempotence requires this config value to be greater than 0. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to greater than 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
- Number of times the producer retries when publishing a message fails
- Type: int
- Default: 2_147_483_647 (Integer.MAX_VALUE)
- Name: RETRIES_CONFIG
- Related settings to review together: delivery.timeout.ms, enable.idempotence, max.in.flight.requests.per.connection
One sentence in the description above deserves special attention:
Produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement.
In other words, if the timeout configured by delivery.timeout.ms is reached before the retries are exhausted, the request fails.
(Whichever limit is hit first, the retry count or the timeout, causes the message to be marked as failed.)
So the producer does not actually retry a send Integer.MAX_VALUE times.
Without a timeout, the retry count would have to be set explicitly, but because delivery.timeout.ms bounds the retries, leaving retries at its default should not cause any real problems. The docs themselves recommend leaving retries unset and controlling retry behavior with delivery.timeout.ms.
(As covered below, delivery.timeout.ms defaults to 120000 ms, i.e. 2 minutes.)
In addition, the idempotence setting discussed below allows retries to happen safely without publishing duplicates.
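To see how the two limits interact, here is a simplified simulation. It assumes every attempt fails after a fixed `attemptMs` and that the producer waits a fixed `backoffMs` before retrying; real producers use retry.backoff.ms (with exponential backoff in newer versions) and more detailed timing, so the numbers are illustrative only. `simulateFailingSend` and the other names are hypothetical.

```kotlin
enum class StopReason { DELIVERY_TIMEOUT, RETRIES_EXHAUSTED }

data class SendOutcome(val attempts: Int, val elapsedMs: Long, val reason: StopReason)

// Simplified model (assumption): each attempt fails after attemptMs, then a fixed backoff.
// Sending stops when the retry budget or delivery.timeout.ms runs out, whichever comes first.
fun simulateFailingSend(retries: Int, deliveryTimeoutMs: Long, attemptMs: Long, backoffMs: Long): SendOutcome {
    var elapsed = 0L
    var attempts = 0
    // retries counts re-sends, so up to retries + 1 attempts are allowed in total.
    while (attempts <= retries) {
        attempts++
        elapsed += attemptMs
        if (elapsed >= deliveryTimeoutMs) return SendOutcome(attempts, elapsed, StopReason.DELIVERY_TIMEOUT)
        if (attempts > retries) break // the last allowed attempt failed
        elapsed += backoffMs
        if (elapsed >= deliveryTimeoutMs) return SendOutcome(attempts, elapsed, StopReason.DELIVERY_TIMEOUT)
    }
    return SendOutcome(attempts, elapsed, StopReason.RETRIES_EXHAUSTED)
}

fun main() {
    // Default retries (Integer.MAX_VALUE) with the 2-minute delivery timeout:
    // the timeout ends the retries long before the retry count matters.
    println(simulateFailingSend(Int.MAX_VALUE, 120_000, attemptMs = 1_000, backoffMs = 100))
    // With a small retry budget, the retries run out first.
    println(simulateFailingSend(3, 120_000, attemptMs = 1_000, backoffMs = 100))
}
```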
delivery.timeout.ms
An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed for retriable send failures.
The producer may report failure to send a record earlier than this config if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline.
The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms
- Upper bound on the time between send() returning and the success or failure being reported
- This covers the time a record waits in a batch before sending, the time spent awaiting the broker's acknowledgement, and the time spent on retries
- Type: int
- Default: 120000 (2 minutes)
- Name: DELIVERY_TIMEOUT_MS_CONFIG
The default of 2 minutes seems generous enough. If publishing a message keeps failing for a full 2 minutes, the cause is more likely a problem with the Kafka cluster itself than with that individual send.
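The docs also state that delivery.timeout.ms should be at least request.timeout.ms + linger.ms. The helper below only encodes that documented rule; `isValidDeliveryTimeout` is a hypothetical name, and the linger.ms value in the example is arbitrary.

```kotlin
// Hypothetical helper for the documented constraint:
// delivery.timeout.ms >= request.timeout.ms + linger.ms
fun isValidDeliveryTimeout(deliveryTimeoutMs: Long, requestTimeoutMs: Long, lingerMs: Long): Boolean =
    deliveryTimeoutMs >= requestTimeoutMs + lingerMs

fun main() {
    // 120000 ms is the delivery.timeout.ms default and 30000 ms the request.timeout.ms default;
    // lingerMs = 5 is just an example value.
    println(isValidDeliveryTimeout(deliveryTimeoutMs = 120_000, requestTimeoutMs = 30_000, lingerMs = 5)) // true
    // Shrinking delivery.timeout.ms below request.timeout.ms violates the rule.
    println(isValidDeliveryTimeout(deliveryTimeoutMs = 20_000, requestTimeoutMs = 30_000, lingerMs = 0)) // false
}
```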
enable.idempotence
KIP-679: Producer will enable the strongest delivery guarantee by default
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.
Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'.
Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown.
- Ensures that exactly one copy of each message is written, even when the producer retries
- Requires max.in.flight.requests.per.connection to be 5 or less, retries to be greater than 0, and acks to be 'all'
- Type: boolean
- Default: true (since Kafka 3.0, per KIP-679)
- Name: ENABLE_IDEMPOTENCE_CONFIG
Since the default is true (idempotence guaranteed), this setting rarely needs customizing as long as max.in.flight.requests.per.connection and retries (covered below) are left at their defaults.
With enable.idempotence turned on, Kafka filters out duplicate messages using the Producer ID plus a sequence number.
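A simplified model of that Producer ID + sequence number check is sketched below. Assumptions: a single partition, one sequence number per batch (real batches carry a base sequence for their records), and no producer epochs or batch-metadata cache, which the real broker also tracks. `Batch` and `PartitionLog` are hypothetical names.

```kotlin
// One produced batch tagged with the producer's ID and a sequence number.
data class Batch(val producerId: Long, val sequence: Int, val payload: String)

// Simplified broker-side partition log that drops duplicate batches.
class PartitionLog {
    private val lastSequence = mutableMapOf<Long, Int>()
    val records = mutableListOf<String>()

    // Returns true if appended, false if recognized as a duplicate of an already written batch.
    fun append(batch: Batch): Boolean {
        val last = lastSequence[batch.producerId] ?: -1
        return when {
            batch.sequence <= last -> false // a retry of a batch that was already written
            batch.sequence == last + 1 -> {
                records += batch.payload
                lastSequence[batch.producerId] = batch.sequence
                true
            }
            else -> throw IllegalStateException("Out-of-order sequence ${batch.sequence}, expected ${last + 1}")
        }
    }
}

fun main() {
    val log = PartitionLog()
    log.append(Batch(producerId = 1L, sequence = 0, payload = "A"))
    // The ack for A was lost, so the producer retries the same batch; the broker drops it.
    log.append(Batch(producerId = 1L, sequence = 0, payload = "A"))
    log.append(Batch(producerId = 1L, sequence = 1, payload = "B"))
    println(log.records)
}
```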
acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
- acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
- acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
- acks=all (Default) This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.
Note that enabling idempotence requires this config value to be 'all'. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
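- The number of acknowledgments the leader must have received before the producer considers a request complete; this controls the durability of the records being sent.
- acks=0 : the producer does not wait for any acknowledgment from the server and treats the record as sent immediately, so there is no guarantee the server received it; since the client generally cannot learn about failures, retries do not take effect
- acks=1 : the leader writes the record to its local log and responds without waiting for the followers to acknowledge it; if the leader fails before the followers replicate the record, it is lost
- acks=all : (default) the leader waits until the full set of in-sync replicas has acknowledged the record; the record is not lost as long as at least one in-sync replica stays alive (equivalent to acks=-1)
- Type: String
- Default: all (valid values: 0, 1, all, and -1, which equals all)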
- Name: ACKS_CONFIG
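The interplay between acks, retries, max.in.flight.requests.per.connection, and enable.idempotence follows the rules quoted from the docs above. The sketch below encodes those documented rules only; it is not Kafka's actual validation code, `resolveIdempotence` is a hypothetical name, and `require` (throwing IllegalArgumentException) stands in for Kafka's ConfigException.

```kotlin
// Sketch of the documented enable.idempotence rules:
// - idempotence needs acks=all (-1), retries > 0, and max.in.flight.requests.per.connection <= 5
// - not set explicitly + conflicting value present -> idempotence is silently disabled
// - explicitly set to true + conflicting value present -> configuration error
fun resolveIdempotence(
    explicitIdempotence: Boolean?, // null = not set by the user (default is true)
    acks: String = "all",
    retries: Int = Int.MAX_VALUE,
    maxInFlight: Int = 5,
): Boolean {
    val conflicts = buildList {
        if (acks != "all" && acks != "-1") add("acks=$acks")
        if (retries <= 0) add("retries=$retries")
        if (maxInFlight > 5) add("max.in.flight.requests.per.connection=$maxInFlight")
    }
    return when (explicitIdempotence) {
        false -> false
        true -> {
            require(conflicts.isEmpty()) { "Idempotence requires compatible settings, but found: $conflicts" }
            true
        }
        null -> conflicts.isEmpty()
    }
}

fun main() {
    println(resolveIdempotence(null))              // defaults: idempotence on
    println(resolveIdempotence(null, acks = "1"))  // conflict, not explicit: silently off
}
```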
max.in.flight.requests.per.connection
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this configuration is set to be greater than 1 and enable.idempotence is set to false, there is a risk of message reordering after a failed send due to retries (i.e., if retries are enabled); if retries are disabled or if enable.idempotence is set to true, ordering will be preserved.
Additionally, enabling idempotence requires the value of this configuration to be less than or equal to 5, because broker only retains at most 5 batches for each producer. If the value is more than 5, previous batches may be removed on broker side.
- Maximum number of unacknowledged requests the producer will send on a single connection before blocking
- Type: int
- Default: 5
- Name: MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
if this configuration is set to be greater than 1 and enable.idempotence is set to false, there is a risk of message reordering after a failed send due to retries
If this value is greater than 1 and enable.idempotence is false, retries can change the order of messages: a batch that failed and was retried may end up written after a later batch that succeeded.
However, setting idempotence to true or disabling retries prevents this problem.

With idempotence enabled, the broker retains at most 5 batches per producer, so max.in.flight.requests.per.connection must be 5 or less.
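The reordering scenario can be sketched as a tiny simulation: batches A and B for the same partition are in flight at the same time, A fails with a transient error and is retried, and B succeeds on its first try. This is a deliberately simplified model (the name `deliveredOrder` and the single-failure setup are assumptions); it only shows why, with idempotence, the broker's sequence check keeps B from landing before A.

```kotlin
// Simplified model (assumption): two in-flight batches, A fails once and is retried, B succeeds.
fun deliveredOrder(idempotent: Boolean): List<String> {
    val log = mutableListOf<String>()
    val sent = listOf("A", "B")     // send order; both batches are in flight together
    val failedOnce = setOf("A")     // A hits a transient error on its first attempt
    val retryQueue = mutableListOf<String>()
    for (batch in sent) {
        if (batch in failedOnce) { retryQueue += batch; continue }
        if (idempotent && retryQueue.isNotEmpty()) {
            // With idempotence, B's sequence number arrives with a gap (A is missing),
            // so the broker rejects B and the producer resends it after A.
            retryQueue += batch
            continue
        }
        log += batch
    }
    log += retryQueue // retried batches are written after everything that already succeeded
    return log
}

fun main() {
    println(deliveredOrder(idempotent = false)) // B ends up before A
    println(deliveredOrder(idempotent = true))  // original order is kept
}
```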
Other settings worth looking into
- linger.ms, batch.size (batching)
- metrics
- Authentication (IAM, SASL/SCRAM, SSL, etc.)
To access Amazon MSK from an application, there are several authentication methods such as IAM, SASL/SCRAM, and TLS; the broker port number and the required client settings differ depending on the method.

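For reference, IAM authentication uses port 9098 (for access from within the VPC), and roughly the following settings are needed. The IAMLoginModule and IAMClientCallbackHandler classes come from the aws-msk-iam-auth library, which must be on the classpath.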
```kotlin
@Bean
fun producerFactory(): ProducerFactory<String, Any> {
    val configProps =
        mutableMapOf<String, Any>().apply {
            this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
            // ...
            // MSK IAM authentication settings
            this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
            this[SaslConfigs.SASL_MECHANISM] = "AWS_MSK_IAM"
            this[SaslConfigs.SASL_JAAS_CONFIG] = "software.amazon.msk.auth.iam.IAMLoginModule required;"
            this[SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS] = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        }
    return DefaultKafkaProducerFactory(configProps)
}
```
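Since the port and client settings change with the authentication method, here is a hypothetical lookup that summarizes the common MSK combinations for private (in-VPC) connectivity. The port numbers and mechanism names reflect MSK's documented defaults as I understand them (public access uses different ports), so please verify them against the MSK documentation for your cluster. `MskAuth` and `mskClientProps` are made-up names, JAAS and callback-handler settings are intentionally omitted, and the hostnames in the example are placeholders.

```kotlin
// Hypothetical summary of MSK auth methods: broker port, security.protocol, and sasl.mechanism.
enum class MskAuth(val port: Int, val securityProtocol: String, val saslMechanism: String?) {
    PLAINTEXT(9092, "PLAINTEXT", null),
    TLS(9094, "SSL", null),
    SASL_SCRAM(9096, "SASL_SSL", "SCRAM-SHA-512"),
    IAM(9098, "SASL_SSL", "AWS_MSK_IAM")
}

// Builds the basic client properties for an auth method (JAAS/callback settings omitted).
fun mskClientProps(auth: MskAuth, hosts: List<String>): Map<String, String> = buildMap {
    put("bootstrap.servers", hosts.joinToString(",") { "$it:${auth.port}" })
    put("security.protocol", auth.securityProtocol)
    auth.saslMechanism?.let { put("sasl.mechanism", it) }
}

fun main() {
    println(mskClientProps(MskAuth.IAM, listOf("b-1.example", "b-2.example")))
}
```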
Example producer configuration code
```kotlin
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer

@Configuration
@EnableKafka
class KafkaProducerConfig(
    @Value("\${spring.kafka.bootstrap-servers}") private val bootstrapServers: List<String>,
) {
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> = KafkaTemplate(producerFactory())

    /**
     * https://kafka.apache.org/documentation.html#producerconfigs
     */
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val configProps =
            mutableMapOf<String, Any>().apply {
                this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
                this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
                this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
                // The settings below all match the defaults (Kafka 3.0+)
                this[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = true
                this[ProducerConfig.ACKS_CONFIG] = "all"
                this[ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG] = 120000 // 2 minutes
                this[ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = 5
                // MSK IAM authentication settings
                this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
                this[SaslConfigs.SASL_MECHANISM] = "AWS_MSK_IAM"
                this[SaslConfigs.SASL_JAAS_CONFIG] = "software.amazon.msk.auth.iam.IAMLoginModule required;"
                this[SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS] = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
            }
        return DefaultKafkaProducerFactory(configProps)
    }
}
```