1. Overview
The NB Kafka adapter allows publishing messages to, or consuming messages from, a Kafka cluster.
At a high level, this adapter supports the following Kafka functionality:
- Publishing messages to one Kafka topic, with sync. or async. message-send acknowledgements (from brokers)
- Subscribing to messages from one or multiple Kafka topics, with sync. or async. message-receive acknowledgements to brokers (aka message commits)
  - auto message commit
  - manual message commit, with a configurable number of message commits in one batch
- Kafka transaction support
1.1. Example NB Yaml
2. Usage
## Kafka Producer
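Below is a minimal invocation sketch for a producer run. `<nb_cmd>` stands for the NB executable; the workload file name `kafka_producer.yaml` and the `bootstrap_server` parameter are illustrative placeholders and may differ in your setup.

```bash
# Producer sketch: 2 producer threads publishing to one topic.
# For producer workloads, threads and num_clnt must be the same.
<nb_cmd> run driver=kafka cycles=10000 threads=2 num_clnt=2 \
  yaml=kafka_producer.yaml \
  bootstrap_server=PLAINTEXT://localhost:9092
```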
## Kafka Consumer
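A corresponding sketch for a consumer run, assuming a 2-partition topic consumed by 2 consumer groups; again, `kafka_consumer.yaml` and `bootstrap_server` are illustrative placeholders.

```bash
# Consumer sketch: a 2-partition topic (num_clnt=2) read by 2 consumer groups.
# threads must equal num_clnt * num_cons_grp = 2 * 2 = 4.
<nb_cmd> run driver=kafka cycles=10000 threads=4 num_clnt=2 num_cons_grp=2 \
  yaml=kafka_consumer.yaml \
  bootstrap_server=PLAINTEXT://localhost:9092
```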
2.1. NB Kafka adapter-specific CLI parameters
- `num_clnt`: the number of Kafka client instances used to publish messages or to receive messages
  - For producer workloads, this is the number of producer threads publishing to the same topic
    - Multiple producer threads may publish to the same topic/partition (`KafkaProducer` is thread-safe)
    - The `threads` and `num_clnt` values MUST be the same
  - For consumer workloads, this is the number of partitions of the topic to consume from
    - A consumer workload can subscribe to multiple topics; in that case, all topics must have the same number of partitions
    - Only one consumer thread is allowed per topic/partition (`KafkaConsumer` is NOT thread-safe)
    - `threads` MUST be equal to `num_clnt` * `num_cons_grp` (see the example after this list)
- `num_cons_grp`: the number of consumer groups
  - Only relevant for consumer workloads
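For example, to consume from a topic with 4 partitions using 2 consumer groups, set `num_clnt=4` and `num_cons_grp=2`, and launch the run with `threads=8` (4 * 2).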
For the Kafka NB adapter, document-level parameters can only be statically bound. Currently, the following document-level configuration parameters are supported (a minimal YAML sketch showing how they are bound appears at the end of this list):
- `async_api` (boolean):
  - When true, use the async Kafka client API.
- `seq_tracking` (boolean):
  - When true, a sequence number is created as part of each message's properties.
  - This parameter is used in conjunction with `seqerr_simu` in order to simulate abnormal message processing errors and then detect such errors reliably.
- `seqerr_simu`:
  - A list of error simulation types, separated by commas (,)
  - Valid error simulation types:
    - `out_of_order`: simulate messages delivered out of sequence
    - `msg_loss`: simulate message loss
    - `msg_dup`: simulate message duplication
  - This parameter should be used only for testing the error-detection behavior; it is not recommended in actual benchmarking environments.
- `e2e_starting_time_source`:
  - The starting timestamp for end-to-end operation. When specified, the `e2e_msg_latency` histogram is updated with the calculated end-to-end latency, which is obtained by subtracting the starting time from the current time. The starting time is determined from the configured starting time source and is expressed in milliseconds since the epoch.
  - Possible values for `e2e_starting_time_source`:
    - `message_publish_time`: uses the message publishing timestamp as the starting time. The message publishing time, in this case, is set by the Kafka client on record generation, since `CreateTime` is the default.
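To show how these document-level parameters are statically bound, here is a minimal workload sketch. It assumes NB's usual workload-template layout, where document-level parameters sit under a top-level `params` block; the op templates under `blocks` are omitted because their fields are workload-specific.

```yaml
description: example Kafka workload illustrating document-level parameters

params:
  # Document-level parameters are statically bound (literal values only).
  async_api: "true"
  seq_tracking: "true"
  # seqerr_simu is only for verifying error detection; not for real benchmarking runs.
  # seqerr_simu: "out_of_order,msg_loss"
  e2e_starting_time_source: "message_publish_time"

blocks:
  # ... producer/consumer op templates go here ...
```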