Re: Ensuring that the message is persisted after acknowledgement

2021-08-24 Thread Peter Bukowinski
Kunal,

I recommend looking at the broker and topic parameters that include the term
“flush”, such as
https://kafka.apache.org/documentation/#topicconfigs_flush.messages


Kafka lets you configure how often log messages are flushed to disk, either per 
topic or globally. The default settings leave the flushing completely to the 
OS. Kafka was designed to take full advantage of the OS page cache because it 
significantly improves performance for both producers and consumers, allowing 
them to write to and read from memory.

If your application requires absolute disk persistence and you are willing to 
take a significant performance hit, you can set the topic property 
flush.messages to 1 for any topic that requires this guarantee.
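
For reference, a minimal sketch of making that change with the Java Admin client
(untested; the topic name and bootstrap address below are placeholders, and the
same change can be made with the kafka-configs.sh tool):

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class FlushEveryMessageSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "durable-topic");
            // flush.messages=1 forces an fsync after every appended message on this topic
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("flush.messages", "1"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update = new HashMap<>();
            update.put(topic, Collections.singletonList(op));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}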

—
Peter

> On Aug 24, 2021, at 10:31 PM, Kunal Goyal  wrote:
> 
> Hi Sunil
> 
> The article that you shared talks about acks. But even if the message is
> received by all in-sync replicas and Kafka sends a response back to the
> producer, it is possible that none of the replicas flushed the
> messages to disk. So, if all the replicas crash for some reason, the
> messages would be lost. For our application, we require some way to
> guarantee that the messages are persisted to disk.
> 
> Regards,
> Kunal
> 
> On Tue, Aug 24, 2021 at 8:40 PM Vairavanathan Emalayan <
> vairavanathan.emala...@cohesity.com> wrote:
> 
>> 
>> 
>> -- Forwarded message -
>> From: sunil chaudhari 
>> Date: Fri, Aug 20, 2021 at 8:00 AM
>> Subject: Re: Ensuring that the message is persisted after acknowledgement
>> To: 
>> Cc: Vairavanathan Emalayan 
>> 
>> 
>> Hi Kunal,
>> This article may help you.
>> 
>> https://betterprogramming.pub/kafka-acks-explained-c0515b3b707e
>> 
>> 
>> Cheers,
>> Sunil.
>> 
>> On Fri, 20 Aug 2021 at 8:11 PM, Kunal Goyal 
>> wrote:
>> 
>>> Hello,
>>> 
>>> We are exploring using Kafka for our application. Our requirement is that
>>> once we write some messages to Kafka, it should be guaranteed that the
>>> messages are persisted to disk.
>>> We found this
>>> <https://www.quora.com/Does-Kafka-sync-data-to-disk-asynchronously-like-Redis-does>
>>> article which says that a Kafka broker acknowledges a record after it has
>>> written the record to the buffer of the I/O device; it does not issue an
>>> explicit fsync operation nor does it wait for the OS to confirm that the
>>> data has been written. Is this statement true for the current
>>> implementation? If so, is there any way in which we can ensure fsync is
>>> called before acknowledgement of messages?
>>> Any help would be appreciated.
>>> 
>>> --
>>> 
>>> Thanks & Regards
>>> 
>>> Kunal Goyal
>>> 
>> 



Re: Ensuring that the message is persisted after acknowledgement

2021-08-24 Thread Kunal Goyal
Hi Sunil

The article that you shared talks about acks. But even if the message is
received by all in-sync replicas and Kafka sends a response back to the
producer, it is possible that none of the replicas flushed the
messages to disk. So, if all the replicas crash for some reason, the
messages would be lost. For our application, we require some way to
guarantee that the messages are persisted to disk.
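
For context, a simplified sketch of the producer settings in question (the
bootstrap address and topic name are placeholders): as we understand it,
acks=all only guarantees that the in-sync replicas have received the record,
not that any of them has fsynced it.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksAllProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");                // wait for all in-sync replicas to receive the record
        props.put("enable.idempotence", "true"); // avoid duplicates on producer retries
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("durable-topic", "key", "value"));
            // flush() blocks until the brokers have acknowledged the send,
            // not until the data has been fsynced to disk on any replica.
            producer.flush();
        }
    }
}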

Regards,
Kunal

On Tue, Aug 24, 2021 at 8:40 PM Vairavanathan Emalayan <
vairavanathan.emala...@cohesity.com> wrote:

>
>
> -- Forwarded message -
> From: sunil chaudhari 
> Date: Fri, Aug 20, 2021 at 8:00 AM
> Subject: Re: Ensuring that the message is persisted after acknowledgement
> To: 
> Cc: Vairavanathan Emalayan 
>
>
> Hi Kunal,
> This article may help you.
>
> https://betterprogramming.pub/kafka-acks-explained-c0515b3b707e
>
>
> Cheers,
> Sunil.
>
> On Fri, 20 Aug 2021 at 8:11 PM, Kunal Goyal 
> wrote:
>
>> Hello,
>>
>> We are exploring using Kafka for our application. Our requirement is that
>> once we write some messages to Kafka, it should be guaranteed that the
>> messages are persisted to disk.
>> We found this
>> <https://www.quora.com/Does-Kafka-sync-data-to-disk-asynchronously-like-Redis-does>
>> article which says that a Kafka broker acknowledges a record after it has
>> written the record to the buffer of the I/O device; it does not issue an
>> explicit fsync operation nor does it wait for the OS to confirm that the
>> data has been written. Is this statement true for the current
>> implementation? If so, is there any way in which we can ensure fsync is
>> called before acknowledgement of messages?
>> Any help would be appreciated.
>>
>> --
>>
>> Thanks & Regards
>>
>> Kunal Goyal
>>
>


How to use CRL (Certificate Revocation List) with Kafka

2021-08-24 Thread Darshan
Hi
We have a private CA, and our Kafka brokers' certificates are signed by it.
A bunch of external clients connect to our brokers; before connecting, they
download the private CA's cert and add it to their truststore. Everything works
fine.

On the Kafka broker side, we want to check a CRL before we authenticate any
client. How can we use a CRL or OCSP (Online Certificate Status Protocol) with
Kafka? I couldn't find any documentation around it, so I thought of asking the
community.
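
For reference, the standard JSSE/PKIX revocation knobs look roughly like the
sketch below. These are plain Java settings, not Kafka configs, and whether the
broker's SSL engine honors them (e.g. when passed as -D flags via KAFKA_OPTS) is
exactly what we would like to confirm.

import java.security.Security;

public class RevocationFlagsSketch {
    public static void main(String[] args) {
        // Ask the JSSE/PKIX trust manager to perform revocation checks at all.
        System.setProperty("com.sun.net.ssl.checkRevocation", "true");
        // Allow CRLs to be fetched from the CRL distribution points in the certificate.
        System.setProperty("com.sun.security.enableCRLDP", "true");
        // Enable OCSP checking where the certificate carries an OCSP responder URL.
        Security.setProperty("ocsp.enable", "true");
    }
}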

Any help would be appreciated.

Thanks.
--Darshan


Conditional Produce in Kafka (KAFKA-2260)

2021-08-24 Thread Niket Goel
Hi all,

I am new to the Kafka project and, while having a discussion with a colleague, I
was pointed to the JIRA KAFKA-2260 [1], which talks about adding the ability to
perform "conditional produce" requests to Kafka. This idea sounded very exciting
to me, and I can see it drawing parallels with Optimistic Concurrency Control in
traditional databases, wherein Kafka could be used as a "system of record", or
as an arbiter for disparate systems working on the same data. The natural
scale-out architecture of Kafka makes this ability even more interesting than in
the more monolithic databases that generally offer it. I found a system, Waltz
[2], that is already doing something like this today.
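
For illustration only, the semantics I have in mind could be sketched as below.
This is purely hypothetical: no such API exists in Kafka today, and the class is
just an in-memory stand-in for a partition log, mirroring the KIP-27 idea that a
produce succeeds only if the caller's expected end offset still matches the
partition's actual end offset.

import java.util.ArrayList;
import java.util.List;

public class ConditionalLogSketch {
    private final List<String> log = new ArrayList<>();

    /** Appends the record only if expectedEndOffset equals the current end offset. */
    public synchronized boolean conditionalAppend(String record, long expectedEndOffset) {
        if (expectedEndOffset != log.size()) {
            return false; // another writer got there first; caller should re-read and retry
        }
        log.add(record);
        return true;
    }

    public static void main(String[] args) {
        ConditionalLogSketch partition = new ConditionalLogSketch();
        System.out.println(partition.conditionalAppend("v1", 0)); // true  -- log was empty
        System.out.println(partition.conditionalAppend("v2", 0)); // false -- stale expectation
    }
}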

Following the discussion in the JIRA [1] itself and then on KIP-27 [3], it seems
like there was a lot of interest in this proposal, but it never materialized.
Also, IIUC, with the introduction of KIP-98 [4], some of the use cases that were
motivating the discussion seem to have been addressed (e.g. the idempotent
producer).

In theory, introducing this ability to Kafka seems pretty great to me, but 
being new to Kafka I am not able to articulate real-life use cases where users 
of Kafka would utilize such an ability. This is where I would like help.

My questions to the group here today are:
1. Is there a use case that you are using, or want to use, Kafka for where
having the ability to "conditionally produce" or "optimistically lock a key"
would simplify your life?
2. If there are such scenarios today, how are people making them work? Is it
through other Kafka features, or through application-side logic?

Please feel free to also add your thoughts on this feature’s usefulness, or even
point me towards more suggested reading if I am missing things. :)


Thanks
Niket Goel

[1] https://issues.apache.org/jira/browse/KAFKA-2260
[2] https://wepay.github.io/waltz/docs/introduction
[3] https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
[4] https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging



Re: Kafka Streams Handling uncaught exceptions REPLACE_THREAD

2021-08-24 Thread Yoda Jedi Master
Thank you for your help, I will check it and try it :-)
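
For reference, a rough sketch of the setup described in the quoted replies below
(untested; assumes Kafka Streams 2.8+ for the uncaught-exception handler, and the
application id, bootstrap address, and topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

public class SkipBadRecordsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "skip-bad-records-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // 1. Skip records that cannot be deserialized instead of killing the thread.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        // 2. Skip records whose timestamps cannot be extracted.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  LogAndSkipOnInvalidTimestamp.class);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Replace a dying stream thread; note the record that triggered the exception
        // is NOT skipped and may be re-processed (possible duplicates).
        streams.setUncaughtExceptionHandler(
            exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
        streams.start();
    }
}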

On Mon, Aug 16, 2021 at 11:45 AM Bruno Cadonna  wrote:

> Hi Yoda,
>
> for certain cases, Kafka Streams allows you to specify handlers that
> skip the problematic record. Those handlers are:
>
> 1. deserialization exception handler configured in
> default.deserialization.exception.handler
> 2. time extractor set in default.timestamp.extractor and in the Consumed
> object
> 3. production exception handler configured in
> default.production.exception.handler
>
> Kafka Streams provides implementations of handlers 1 and 2 that skip the
> problematic records, namely LogAndContinueExceptionHandler and
> LogAndSkipOnInvalidTimestamp, respectively.
>
> For some more details have a look at
>
> https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling
>
> If problematic records cause an exception in user code, the user code
> needs to provide functionality to skip the problematic record.
>
> Best,
> Bruno
>
> On 10.08.21 13:26, Yoda Jedi Master wrote:
> > Hi Bruno, thank you for your answer.
> > I mean that the message that caused the exception was consumed, and the
> > replacement thread will continue from the next message. How then does it
> > handle uncaught exceptions, if it will fail again?
> >
> >
> > On Tue, Aug 10, 2021 at 12:33 PM Bruno Cadonna 
> wrote:
> >
> >> Hi Yoda,
> >>
> >> What do you mean exactly with "skipping that failed message"?
> >>
> >> Do you mean a record consumed from a topic that caused an exception that
> >> killed the stream thread?
> >>
> >> If the record killed the stream thread due to an exception, for example,
> >> a deserialization exception, it will probably also kill the next stream
> >> thread that will read that record. Replacing a stream thread does not
> >> skip records but it can result in duplicate records depending on the
> >> application’s processing mode determined by the
> >> PROCESSING_GUARANTEE_CONFIG value as stated in the docs you cited.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >> On 10.08.21 11:15, Luke Chen wrote:
> >>> Hi Yoda,
> >>> For your question:
>  If an application gets an uncaught exception, then the failed thread will
>  be replaced with another thread and it will continue processing messages,
>  skipping that failed message?
> >>>
> >>> --> Yes, if everything goes well after `replace thread`, you can ignore
> >>> this failed message. Just one reminder that you should check the failed
> >>> message to avoid this `uncaught exception` thrown again, because if
> this
> >>> happens frequently, it'll impact application performance.
> >>>
> >>> Thank you.
> >>> Luke
> >>>
> >>> On Tue, Aug 10, 2021 at 4:25 PM Yoda Jedi Master 
> >> wrote:
> >>>
>  "REPLACE_THREAD - Replaces the thread receiving the exception and
>  processing continues with the same number of configured threads. (Note:
>  this can result in duplicate records depending on the application’s
>  processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)"
> 
>  If an application gets an uncaught exception, then the failed thread will
>  be replaced with another thread and it will continue processing messages,
>  skipping that failed message?
> 
> >>>
> >>
> >
>


Re: Kafka attempt rediscovery never happen

2021-08-24 Thread Luke Chen
Hi Antonio,
What you can do is check the broker-side log to see if there's any
info. If not, you can also enable DEBUG logging to get more info for
troubleshooting.

Thank you.
Luke

On Mon, Aug 23, 2021 at 11:27 PM Antonio Pires 
wrote:

> Hello fellow Kafka users,
>
> I've been trying to understand what happened to one of my consumers, with
> no luck so far; maybe someone could help me with some insight.
>
> I got these lines in the logs:
>
> │ 20:43:43,909 INFO  com.mms.logic [] -
> Successfully consumed message.
> correlationId=ce62d95e-09e4-4f8f-b166-5193d582733b topic=nj_topic
> partition=2 offset=371360
> │ 20:43:55,281 ERROR
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] -
> [Consumer clientId=consumer-mms-1, groupId=mms] Offset commit failed on
> partition nj_topic-2 at offset 371361: The coordinator is not aware of this
> member.
> │ 20:43:55,282 INFO
>  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] -
> [Consumer clientId=consumer-mms-1, groupId=mms] OffsetCommit failed with
> Generation{generationId=17,
> memberId='consumer-mms-1-c69e919b-2719-42cf-bf76-687fb1b0ea18',
> protocol='range'}: The coordinator is not aware of this member.
> │ 20:43:55,281 INFO
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] -
> [Consumer clientId=consumer-mms-1, groupId=mms] Group coordinator
> njss1.env.net:9092 (id: 2147483646 rack: null) is unavailable or invalid,
> will attempt rediscovery
>
> After this, nothing else is logged; no actual attempt to rediscover the
> coordinator and rejoin is made. I still get a response from the webping
> endpoint of the service that is running the consumer, meaning the service
> did not go down, but the consumer for some reason did not retry committing
> the offset.
>
> Any ideas?
>
> Regards
> Antonio
>