Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread William Lee
Hi Greg,
Thanks for replying.

> From your description, it sounds like you want the success/failure of
> a callback to influence whether that record (and later records) are
> present in the topic. Is this correct?
Yes

> The solution that you posted does actually write a record that has an
> erroneous callback, is that desirable, or would you want that record
> to also be rejected?
The source code demonstrated the original problem, not the solution.
What I want is: once an exception shows up in the producer callback, I
assume that record was not delivered to the broker, and I want all
subsequent records to be withheld from the broker, so that after the
producer is closed, the records already delivered to the broker are
strictly ordered.
In the source code demo, the earlier batch hit an exception, but the later
batch was still sent successfully, even though I initiated the producer
close immediately when I detected the exception. That is not what I want.
As for "does actually write a record that has an erroneous callback" you
said, I noticed this problem, but this is not the key point of my problem
so I did not mention it.

As for transactional producers: they are not suited to my use case. My data
is database CDC (change data capture) data, which must maintain strict
ordering (similar to the Debezium project). There is no need for me to use
a transactional producer.

> I think you should carefully consider throwing delivery-critical
> errors from the callback, as that is not a common workflow. Could
> those errors be moved to a different part of the pipeline, such as the
> consumer application?
The problem is not related to the consumer side. I do want to achieve
Exactly Once delivery. I previously thought the idempotent producer could be
a solution, but I later found that the idempotent producer only guarantees
ordering when Kafka retries a producer batch internally.
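For context, this is the kind of idempotent-producer configuration I mean (a
sketch only; the constants are the actual ProducerConfig names, and the
values are the usual ones for enabling idempotence):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Idempotence preserves ordering across the producer's internal retries
// (for up to 5 in-flight requests), but it does not stop records that were
// queued after a failed record from being sent.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");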

Thanks and regards,
William

Greg Harris wrote on Tue, Mar 12, 2024 at 00:50:

> Hi William,
>
> From your description, it sounds like you want the success/failure of
> a callback to influence whether that record (and later records) are
> present in the topic. Is this correct?
> The solution that you posted does actually write a record that has an
> erroneous callback, is that desirable, or would you want that record
> to also be rejected?
>
> This sounds like a use-case for transactional producers [1] utilizing
> Exactly Once delivery. You can start a transaction, emit records, have
> them persisted in Kafka, perform some computation afterwards, and then
> decide whether to commit or abort the transaction based on the result
> of that computation.
>
> There is also a performance penalty to transactional producers, but it
> is different from the max.in.flight.requests.per.connection bottleneck
> and not directly comparable.
> I think you should carefully consider throwing delivery-critical
> errors from the callback, as that is not a common workflow. Could
> those errors be moved to a different part of the pipeline, such as the
> consumer application?
>
> And since you're performance sensitive, please be aware that
> performance (availability) nearly always comes at the cost of delivery
> guarantees (consistency) [2]. If you're not willing to pay the
> performance cost of max.in.flight.requests.per.connection=1, then you
> may need to make a compromise on the consistency and find a way to
> manage the extra data.
>
> Thanks,
> Greg Harris
>
> [1]
> https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> [2] https://en.wikipedia.org/wiki/CAP_theorem
>
> On Mon, Mar 11, 2024 at 7:32 AM William Lee 
> wrote:
> >
> > Hi Haruki,
> > Thanks for your answer.
> > > I still don't get why you need this behavior though
> > The reason is I have to ensure message ordering per partition strictly.
> > Once there is an exception in the producer callback, it indicates that
> > the exception is not a retryable exception (from kafka producer's
> > perspective).
> > There must be something wrong, so I have to stop sending records and
> > resolve the underlying issue.
> >
> > As for the performance problem, I found a kafka wiki which investigated
> > the impact of max.in.flight.requests.per.connection: An analysis of the
> > impact of max.in.flight.requests.per.connection and acks on Producer
> > performance - Apache Kafka - Apache Software Foundation
> > <https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance>
> > From the wiki, max.in.flight.requests.per.connection is better set to 2
> > or more.
> >
> > By setting max.in.flight.requests.per.connection to 1, I'm concerned that
> > this could become a performance bottleneck
>


Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-03-11 Thread Matthias J. Sax

Without detailed logs (maybe even DEBUG), it's hard to say.

But from what you describe, it could be a metadata issue? Why are you setting

METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make rebalances rare)

Refreshing metadata has nothing to do with rebalances, and a metadata refresh does not trigger a rebalance.

-Matthias


On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote:

Hi all,

A Kafka Streams application sometimes stops consuming events during load 
testing. Please find below the details:

Details of the app:


   *   Kafka Streams Version: 3.5.1
   *   Kafka: AWS MSK v3.6.0
   *   Consumes events from 6 topics
   *   Calls APIs to enrich events
   *   Sometimes joins two streams
   *   Produces enriched events in output topics

Runs on AWS ECS:

   *   Each task has 10 streaming threads
   *   Autoscaling based on offset lags and a maximum of 6 ECS tasks
   *   Input topics have 60 partitions each to match 6 tasks * 10 threads
   *   Fairly good spread of events across all topic partitions using 
partitioning keys

Settings and configuration:


   *   At least once semantics
   *   MAX_POLL_RECORDS_CONFIG: 10
   *   APPLICATION_ID_CONFIG

The following are set to make rebalances rare and prevent stop-the-world rebalances:
   *   Static membership (using GROUP_INSTANCE_ID_CONFIG)
   *   METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to 
make rebalances rare)
   *   MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
   *   SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

State store related settings:

   *   TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE
   *   STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L
   *   NUM_STANDBY_REPLICAS_CONFIG: 1


Symptoms:
The symptoms mentioned below occur during load tests:

Scenario# 1:
Steady input event stream

Observations:

   *   Gradually increasing offset lags, which shouldn't happen normally as the streaming app is quite fast
   *   Events get processed

Scenario# 2:
No input events after the load test stops producing events

Observations:

   *   Offset lag stuck at ~5k
   *   Stable consumer group
   *   No events processed
   *   No errors or messages in the logs


Scenario# 3:
Restart the app when it stops processing events although offset lags are not 
zero

Observations:

   *   Offset lags start reducing and events start getting processed

Scenario# 4:
Transient errors occur while processing events


   *   A custom exception handler that implements StreamsUncaughtExceptionHandler returns StreamThreadExceptionResponse.REPLACE_THREAD in the handle method (a sketch of such a handler follows this list)
   *   If transient errors keep occurring occasionally and threads get 
replaced, the problem of the app stalling disappears.
   *   But if transient errors don't occur, the app tends to stall and I need 
to manually restart it
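
A minimal sketch of such a handler, purely for illustration (the class name and log message are made up; StreamsUncaughtExceptionHandler, StreamThreadExceptionResponse.REPLACE_THREAD, and KafkaStreams#setUncaughtExceptionHandler are the actual Kafka Streams API):

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// Illustrative handler: replace the failed stream thread instead of shutting down.
public class ReplaceThreadExceptionHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        // A real handler might inspect the exception type before deciding.
        System.err.println("Stream thread failed, replacing it: " + exception);
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}

// Registered on the application's KafkaStreams instance:
// kafkaStreams.setUncaughtExceptionHandler(new ReplaceThreadExceptionHandler());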


Summary:

   *   It appears that some streaming threads stall after processing for a 
while.
   *   It is difficult to change the log level for Kafka Streams from ERROR to INFO, as it starts producing a lot of log messages, especially during load tests.
   *   I haven't yet managed to push Kafka Streams metrics into the AWS OTEL collector to get more insights.

Can you please let me know if any Kafka Streams config settings need changing?
Should I reduce the values of any of these settings to help trigger rebalancing
earlier and hence assign partitions to members that are active (see the sketch
after this list):


   *   METADATA_MAX_AGE_CONFIG: 5 hours in millis (to make rebalances rare)
   *   MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
   *   SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis
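
For illustration, a sketch of how these three values might be expressed as properties (the constant names are from the Kafka ConsumerConfig class; the values are the ones listed above):

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties streamsProps = new Properties();
// 5 hours before client metadata is forcibly refreshed
streamsProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, (int) Duration.ofHours(5).toMillis());
// up to 20 minutes between poll() calls before the member is considered failed
streamsProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) Duration.ofMinutes(20).toMillis());
// 2 minutes without heartbeats before the broker evicts the member
streamsProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, (int) Duration.ofMinutes(2).toMillis());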

Should I get rid of static membership? This may increase rebalancing, but may
be okay if it can prevent stalled threads from appearing as active members.

Should I try upgrading Kafka Streams to v3.6.0 or v3.7.0? I'm hoping that v3.7.0
will be compatible with AWS MSK v3.6.0.


Thank you very much.

Kind regards,
Venkatesh




Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread Greg Harris
Hi William,

From your description, it sounds like you want the success/failure of
a callback to influence whether that record (and later records) are
present in the topic. Is this correct?
The solution that you posted does actually write a record that has an
erroneous callback, is that desirable, or would you want that record
to also be rejected?

This sounds like a use-case for transactional producers [1] utilizing
Exactly Once delivery. You can start a transaction, emit records, have
them persisted in Kafka, perform some computation afterwards, and then
decide whether to commit or abort the transaction based on the result
of that computation.
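
A minimal sketch of that pattern, purely for illustration (the topic name, transactional.id, bootstrap servers, and the validation step are placeholders; initTransactions/beginTransaction/commitTransaction/abortTransaction are the actual producer API):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-producer");  // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                for (int i = 0; i < 100; i++) {
                    producer.send(new ProducerRecord<>("demo-topic", Integer.toString(i), "value-" + i));
                }
                // ... perform the computation that decides the outcome ...
                producer.commitTransaction();  // records become visible to read_committed consumers
            } catch (Exception e) {
                producer.abortTransaction();   // records are discarded for read_committed consumers
            }
        }
    }
}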

There is also a performance penalty to transactional producers, but it
is different from the max.in.flight.requests.per.connection bottleneck
and not directly comparable.
I think you should carefully consider throwing delivery-critical
errors from the callback, as that is not a common workflow. Could
those errors be moved to a different part of the pipeline, such as the
consumer application?

And since you're performance sensitive, please be aware that
performance (availability) nearly always comes at the cost of delivery
guarantees (consistency) [2]. If you're not willing to pay the
performance cost of max.in.flight.requests.per.connection=1, then you
may need to make a compromise on the consistency and find a way to
manage the extra data.

Thanks,
Greg Harris

[1] 
https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
[2] https://en.wikipedia.org/wiki/CAP_theorem

On Mon, Mar 11, 2024 at 7:32 AM William Lee  wrote:
>
> Hi Haruki,
> Thanks for your answer.
> > I still don't get why you need this behavior though
> The reason is I have to ensure message ordering per partition strictly.
> Once there is an exception in the producer callback, it indicates that the
> exception is not a retryable exception(from kafka producer's perspective).
> There must be something wrong, so I have to stop sending records and
> resolve the underlying issue.
>
> As for the performance problem, I found a kafka wiki which investigated the
> impact of max.in.flight.requests.per.connection: An analysis of the impact
> of max.in.flight.requests.per.connection and acks on Producer performance -
> Apache Kafka - Apache Software Foundation
> <https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance>
> From the wiki, max.in.flight.requests.per.connection is better set to 2 or
> more.
>
> By setting max.in.flight.requests.per.connection to 1, I'm concerned that
> this could become a performance bottleneck


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread William Lee
Hi Haruki,
Thanks for your answer.
> I still don't get why you need this behavior though
The reason is that I have to ensure strict message ordering per partition.
Once there is an exception in the producer callback, it indicates that the
exception is not a retryable exception (from the kafka producer's
perspective). There must be something wrong, so I have to stop sending
records and resolve the underlying issue.

As for the performance problem, I found a kafka wiki page which investigated
the impact of max.in.flight.requests.per.connection: An analysis of the impact
of max.in.flight.requests.per.connection and acks on Producer performance -
Apache Kafka - Apache Software Foundation
<https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance>

From the wiki, max.in.flight.requests.per.connection is better set to 2 or
more.

By setting max.in.flight.requests.per.connection to 1, I'm concerned that
this could become a performance bottleneck


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread Haruki Okada
Hi.

> I immediately stop sending more new records and stop the kafka
> producer, but some extra records were still sent

I still don't get why you need this behavior, though. As long as you set
max.in.flight.requests.per.connection to greater than 1, it's impossible to
avoid this, because KafkaProducer can do nothing about requests that have
already been sent out.

By the way, with appropriate batch.size and linger.ms configuration, you
can achieve high throughput even with
max.in.flight.requests.per.connection=1, which wouldn't be a problem unless
you have to send large volumes of data over a slow network.
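
For illustration, a sketch of that kind of tuning (the numbers are arbitrary examples, not recommendations; the config constants are the actual ProducerConfig names):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// Strict ordering: only one in-flight request per connection
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// Larger batches so each single in-flight request carries more records (example value)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(256 * 1024));
// Allow the producer to wait briefly to fill batches (example value)
props.put(ProducerConfig.LINGER_MS_CONFIG, "20");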

On Mon, Mar 11, 2024 at 22:55, William Lee wrote:

> Hi all,
> I am facing a problem when I detect an exception in kafka producer
> callback, I immediately stop sending more new records and stop the kafka
> producer, but some extra records were still sent.
>
> I found a way to resolve this issue: setting
> max.in.flight.requests.per.connection to 1 and closing kafka producer when
> encountering an exception in kafka producer callback.
> set max.in.flight.requests.per.connection to 1 will make sure only one
> request will be inflight for one partition, and closing kafka producer in
> producer callback will make Sender in "forceClose" state thus avoiding
> sending extra records.
>
> But, as far as I know, setting max.in.flight.requests.per.connection to 1
> will decrease the performance of kafka producer. I would like to know, is
> there any other way to work around this issue without setting
> max.in.flight.requests.per.connection to 1 so that I can ensure performance
> of kafka producer?
>
> here is my demo source code, you can also find it on Github Gist:
> https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576
>
> public class KafkaProducerProblemDemo {
>
>     public static void main(String[] args) {
>         Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
>         rootLogger.setLevel(Level.INFO);
>
>         String topicName = "test_topic_202403112035";
>         Map<String, String> kafkaTopicConfigs = new HashMap<>();
>         Properties props = new Properties();
>         props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
>         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
>         CreateTopicsResult createTopicsResult;
>         try (AdminClient adminClient = AdminClient.create(props)) {
>             NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
>             newTopic.configs(kafkaTopicConfigs);
>             kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");
>             kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG, "1048576");
>             kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "8640");
>             createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
>             System.out.println(createTopicsResult.all().get());
>         } catch (Exception e) {
>             rootLogger.error("create topic error", e);
>         }
>
>         // adjust requestTimeout to ensure the request timeout is enough
>         long requestTimeout = 2000;
>         Properties kafkaProps = new Properties();
>         kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
>         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
>         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
>         kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
>         kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
>         kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
>         // force one batch per record
>         kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
>         kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>         kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>
>         try (KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
>             AtomicBoolean isFirstRecord = new AtomicBoolean(true);
>             AtomicReference<Exception> sendException = new AtomicReference<>();
>
>             for (int i = 0; i < 2048; i++) {
>                 String content = String.valueOf(i);
>                 ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, content.getBytes());
>
>                 if (sendException.get() != null) {
>                     // once found exception in callback, stop sending more records
>                     kafkaProducer.close();
>                     break;
>                 }
>
>                 kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
>                     if (isFirstRecord.getAndSet(false)) {
>                         try {
>

Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread William Lee
Hi all,
I am facing a problem: when I detect an exception in the kafka producer
callback, I immediately stop sending new records and stop the kafka
producer, but some extra records are still sent.

I found a way to resolve this issue: set
max.in.flight.requests.per.connection to 1 and close the kafka producer when
encountering an exception in the kafka producer callback.
Setting max.in.flight.requests.per.connection to 1 makes sure only one
request is in flight per partition, and closing the kafka producer in the
producer callback puts the Sender into the "forceClose" state, thus avoiding
sending extra records.

But, as far as I know, setting max.in.flight.requests.per.connection to 1
will decrease the performance of the kafka producer. I would like to know:
is there any other way to work around this issue without setting
max.in.flight.requests.per.connection to 1, so that I can ensure the
performance of the kafka producer?

Here is my demo source code; you can also find it on GitHub Gist:
https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576

public class KafkaProducerProblemDemo {

    public static void main(String[] args) {
        Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
        rootLogger.setLevel(Level.INFO);

        String topicName = "test_topic_202403112035";
        Map<String, String> kafkaTopicConfigs = new HashMap<>();
        Properties props = new Properties();
        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
        CreateTopicsResult createTopicsResult;
        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
            newTopic.configs(kafkaTopicConfigs);
            kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");
            kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG, "1048576");
            kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "8640");
            createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
            System.out.println(createTopicsResult.all().get());
        } catch (Exception e) {
            rootLogger.error("create topic error", e);
        }

        // adjust requestTimeout to ensure the request timeout is enough
        long requestTimeout = 2000;
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
        kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
        // force one batch per record
        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
        kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

        try (KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
            AtomicBoolean isFirstRecord = new AtomicBoolean(true);
            AtomicReference<Exception> sendException = new AtomicReference<>();

            for (int i = 0; i < 2048; i++) {
                String content = String.valueOf(i);
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, content.getBytes());

                if (sendException.get() != null) {
                    // once found exception in callback, stop sending more records
                    kafkaProducer.close();
                    break;
                }

                kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
                    if (isFirstRecord.getAndSet(false)) {
                        try {
                            // sleep more than twice the DELIVERY_TIMEOUT_MS_CONFIG to make the waiting batch expire
                            // simulate spending too much time in the kafka callback
                            Thread.sleep(requestTimeout * 2 + 1000);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }

                    if (exception != null) {
                        rootLogger.error("send data failed, record content: {}, reason: {}", content, exception.toString());
                        sendException.compareAndSet(null, exception);
                    } else {
                        rootLogger.info("send data success, offset: {}, record content: {}", metadata.offset(), content);
                    }
                });
            }
        }
    }
}

Issues while setting up RBAC for Apache Kafka using Ranger

2024-03-11 Thread Karthik Suvarnasa
Hi All,

I'm working on setting up RBAC for Apache Kafka using Ranger. Right now,
I'm facing an authorization issue while testing the console producer script
in Kafka. I need help in properly configuring Kafka with Ranger. Below are
the steps I performed.


   - I successfully installed the Ranger service.
   - Integrated Ranger with AD using UserSync.
   - Installed the Ranger Kafka plugin on Kafka and made the following change
     to the Kafka server.properties file:
       - authorizer.class.name=org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer
   - Created a Kafka service in Ranger Admin.
   - Created a policy in Ranger Admin to restrict access to the topic named
     test for everyone except one user.

I'm using PLAINTEXT://HOSTIP:PORT for listeners.

Now, when I try to write to that topic using ./kafka-console-producer.sh
--broker-list hostip:port --topic test

I'm unable to produce to it, and I'm getting authorization error messages,
which seems okay. But I don't know how to produce to the topic as an
authorized user. I tried using a producer config file with the below config:


client.id=testuser
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="testuser" password="testpass";

Below is the output of:
./kafka-console-producer.sh --broker-list hostip:port --topic test --producer.config producer.properties

[2024-03-08 16:54:09,034] WARN The configuration 'sasl.jaas.config' was
supplied but isn't a known config.
(org.apache.kafka.clients.producer.ProducerConfig)
>hi
[2024-03-08 16:54:15,309] WARN [Producer clientId= testuser] Error while
fetching metadata with correlation id 3 : {test=TOPIC_AUTHORIZATION_FAILED}
(org.apache.kafka.clients.NetworkClient)
[2024-03-08 16:54:15,321] ERROR [Producer clientId= testuser] Topic
authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
[2024-03-08 16:54:15,325] ERROR Error when sending message to topic test
with key: null, value: 2 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized
to access topics: [test]

Please provide steps to connect and produce to the topic with the test user
(this user is from AD).

Regards,
Karthik Suvarnasa