Re: UnknownProducerIdException every few minutes

2019-10-10 Thread Matthias J. Sax
Maybe related to https://issues.apache.org/jira/browse/KAFKA-7190

It's fixed in the upcoming 2.4 release.


-Matthias

On 9/25/19 3:08 PM, Alessandro Tagliapietra wrote:
> Hello everyone,
> 
> I have another problem, unrelated to the previous one, so I'm creating another
> thread.
> We have a streams application that reads from a topic, reads from/writes to 3
> different stores and writes the output to another topic, all with the exactly-once
> processing guarantee enabled.
> 
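For reference, a minimal sketch of how exactly-once processing is typically enabled in a Streams application; the application id is taken from the log below, the bootstrap server value is a placeholder and the topology itself is omitted:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-orders-pipeline");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
// Exactly-once processing, as described in the post; requires brokers >= 0.11.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// ... build the topology and pass these props to new KafkaStreams(topology, props)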
> Due to a bug in the producer that sends messages to the topic our stream reads
> from, the same data was being sent over and over with increasing timestamps.
> For example, it would send data with timestamps from today at 12:00 up to 15:00
> and then start again at 12:00, over and over, creating a cycle in the timestamps.
> 
> In our streams configuration we're using a timestamp extractor that reads the
> timestamp from the message's JSON payload.
> 
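For illustration only, a minimal sketch of such an extractor, assuming String-serialized JSON values with a top-level "timestamp" field holding epoch milliseconds and Jackson on the classpath (all assumptions, not the poster's actual code):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class JsonFieldTimestampExtractor implements TimestampExtractor {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        try {
            // Assumes the record value is a JSON string with a top-level "timestamp" field.
            return MAPPER.readTree((String) record.value()).get("timestamp").asLong();
        } catch (Exception e) {
            // Fall back to the record's own timestamp if the payload can't be parsed.
            return record.timestamp();
        }
    }
}

Such a class would be registered via StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG.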
> Since this buggy producer was sending data in batches every few minutes, our
> stream was dying every few minutes with this error:
> 
> [kafka-producer-network-thread |
> my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1-0_0-producer]
> ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
> task [0_0] Error sending record to topic
> my-app-orders-pipeline-order-unique-emitter-changelog due to This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.; No more records will be sent and no
> more offsets will be recorded for this task. Enable TRACE logging to view
> failed record key and value.
> org.apache.kafka.common.errors.UnknownProducerIdException: This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
> [my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
> stream-thread
> [my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
> Failed to commit stream task 0_0 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending
> since an error caught with a previous record (key 59 value [B@16e88816
> timestamp 1568779124999) to topic
> my-app-orders-pipeline-order-unique-emitter-changelog due to
> org.apache.kafka.common.errors.UnknownProducerIdException: This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:138)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:201)
> at
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1318)
> at
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:720)
> at
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:706)
> at
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:663)
> at
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:585)
> at
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:73)
> at
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:789)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331)
> at 

Re: OutOfOrderSequenceException

2019-10-10 Thread Matthias J. Sax
An OutOfOrderSequenceException is a severe error indicating potential
data loss.

It can happen if there was a problem broker side and data was truncated
because of a leader change, even after the data had been acknowledged to the
producer as successfully written.

What is your topic/brokers configs?

You should use `replication.factor=3`, `min.insync.replicas=2` and
`unclean.leader.election.enable=false`.
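A minimal sketch of applying those settings when creating a topic with the Java AdminClient (topic name, partition count and bootstrap server are placeholders):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateDurableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");
            configs.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
            // replication.factor=3 is the third constructor argument.
            NewTopic topic = new NewTopic("my-topic", 6, (short) 3).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}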


-Matthias




On 10/7/19 2:41 AM, Jose Manuel Vega Monroy wrote:
> Hi there,
> 
>  
> 
> Finally we upgraded our producer configuration to ensure message order:
> 
>  
> 
>     retries = 1
> 
> # note: to ensure ordering, enable.idempotence=true, which forces
> acks=all and max.in.flight.requests.per.connection<=5
> 
>     enable.idempotence = true
> 
>     max.in.flight.requests.per.connection = 4
> 
>     acks = "all"
> 
>  
> 
> However, recently we faced this exception:
> 
>  
> 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker
> received an out of order sequence number..
> 
>  
> 
> Any idea why this happened? Is that expected?
> 
>  
> 
> Could it be related to the retries configuration? Is that configuration
> properly set?
> 
>  
> 
> From the official documentation we found a *recommendation to leave retries
> unset*, since its default value is Integer.MAX_VALUE:
> 
>  
> 
> “To take advantage of the idempotent producer, it is imperative to avoid
> application level re-sends since these cannot be de-duplicated. As such,
> if an application enables idempotence, it is recommended to leave the
> retries config unset, as it will be defaulted to Integer.MAX_VALUE.
> Additionally, if a send(ProducerRecord) returns an error even with
> infinite retries (for instance if the message expires in the buffer
> before being sent), then it is recommended to shut down the producer and
> check the contents of the last produced message to ensure that it is not
> duplicated. Finally, the producer can only guarantee idempotence for
> messages sent within a single session.”
> 
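Following that quoted advice, a hedged sketch of a producer configuration (serializers and bootstrap server are placeholders; note that retries is deliberately not set):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Idempotence implies acks=all and a bounded max.in.flight; retries is left unset
// so it keeps its Integer.MAX_VALUE default, as the documentation recommends.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);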
>  
> 
> Thanks
> 
>  
> 
>   
> 
> *Jose Manuel Vega Monroy **
> **Java Developer / Software Developer Engineer in Test*
> 
> Direct: +*0035 0 2008038 (Ext. 8038)*
> Email: jose.mon...@williamhill.com 
> 
> William Hill | 6/1 Waterport Place | Gibraltar | GX11 1AA
> 
>  
> 
>  
> 
>  
> 





Re: Corrupted RocksDB KeyValueStore.get() returns null?

2019-10-10 Thread Matthias J. Sax
Hard to say.

Are you sure you query the correct instance? As you have 16 partitions,
you need to ensure that the instance you query actually hosts the key.
Or do you have only one instance running?
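If multiple instances are running, a hedged sketch of checking which instance hosts a key before querying, using the interactive-queries API available in 2.2 (store name, serdes and host handling are assumptions):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

public class KeyLookup {
    // Returns the value if this instance hosts the key, otherwise null
    // (the caller would forward the request to the owning host).
    static String lookup(KafkaStreams streams, HostInfo self, String storeName, String key) {
        StreamsMetadata owner = streams.metadataForKey(storeName, key, Serdes.String().serializer());
        if (owner.hostInfo().equals(self)) {
            ReadOnlyKeyValueStore<String, String> store =
                streams.store(storeName, QueryableStoreTypes.keyValueStore());
            return store.get(key);
        }
        // Key lives on owner.host():owner.port(); forward the query there.
        return null;
    }
}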

How do you know that there should be a value for the key? Note that
everything is async and continuously changing, so it can be hard to
reason about the system state.


-Matthias


On 10/8/19 7:53 AM, Stefan Hoffmeister wrote:
> Hello everybody,
> 
> we seem to be experiencing very unexpected behaviour from the Kafka Streams 2.2.0 
> framework, where KeyValueStore.get() returns a value of null, although it 
> really should return some other value.
> 
> This seems to be content corruption somewhere at / around the cache layer in 
> Kafka Streams, with RocksDB delivering the persistent cache.
> 
> On the implementation side, we simply
> 
> 
> streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("my-topic"),
> 
> then retrieve and cache the store in init() of a Transformer, then use the 
> store in transform().
> 
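A minimal sketch of the described pattern (store name taken from the snippet above; String types are an assumption, since the original addStateStore call is truncated):

import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyStoreTransformer implements Transformer<String, String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Retrieve and cache the store added via streamsBuilder.addStateStore(...).
        store = (KeyValueStore<String, String>) context.getStateStore("my-topic");
    }

    @Override
    public String transform(String key, String value) {
        String previous = store.get(key);  // the call that occasionally returned null
        store.put(key, value);
        return previous;
    }

    @Override
    public void close() {}
}

The transformer would be wired in with something like stream.transform(MyStoreTransformer::new, "my-topic") so the store is connected to it.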
> We see that *very rarely* KeyValueStore.get() returns null, although the 
> (changelog) topic (on the Kafka broker) has clear evidence that 
> KeyValueStore.get() should have returned a value.
> 
> This was seen at exactly the same time for at least two different topics, 
> upon starting the streaming application,
> 
> a) (changelog) topic with exactly one partition, guaranteed to contain 
> exactly one key - had reliably returned data for four weeks
> 
> b) (changelog) topic over 16 partitions, where for about 30 calls to get(), a 
> total of 20 returned data as expected, and about 10 returned an incorrect 
> value of null.
> 
> Has anyone seen something like that before, any pointers to root cause?
> 
> The following items have made it onto our review agenda:
> 
> a) virtual network instability, possibly leading to interesting RocksDB 
> startup behaviour 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader#initialize)
> 
> b) on-disk data corruption (Linux -> NFS as persistent cache storage for 
> RocksDB)
> 
> 
> Any insight or pointers greatly appreciated!
> 
> Many thanks!
> 
> Stefan
> 





Re: Is it possible to know the number of producers for a topic?

2019-10-10 Thread Jörn Franke
Not sure what your exact use case is. Exactly-once processing?
Can you describe the full pipeline?
For what you have described, you could use a producer id together with the
message. However, your description opens new questions: what do you plan to
do if this happens? Depending on that, other solutions could make sense.
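A minimal sketch of that suggestion, tagging each record with a producer identity in a header so a consumer can detect more than one active producer (header name and id scheme are assumptions):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TaggedSender {
    static void send(KafkaProducer<String, String> producer,
                     String producerId, String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        // Tag each message with the sender's identity; consumers can alert if they
        // see more than one distinct id on the topic.
        record.headers().add("producer-id", producerId.getBytes(StandardCharsets.UTF_8));
        producer.send(record);
    }
}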

> Am 11.10.2019 um 08:56 schrieb Dylan Martin :
> 
> I have a situation where I should always have exactly one producer per 
> topic, but under certain rare circumstances I can have more than one, and 
> that's really bad when it happens, so I want to detect it.
> 
> Can I know how many producers a topic has?  Can I identify them?
> 
> Thanks!
> 
> 
> -Dylan
> 


Re: Add me to the contributors list...

2019-10-10 Thread Jun Rao
Hi, Senthil,

Thanks for your interest. Just added you to the contributors list and gave
you the wiki permissions.

Jun

On Thu, Oct 10, 2019 at 5:26 PM Senthilnathan Muthusamy
 wrote:

> Hi,
>
> I am Senthil from Microsoft Azure Compute and will be contributing to the
> KIP-280. Can you please add me to the contributors list and provide access
> to the KIP-280, JIRA & the repo.
>
> My details:
> Name: Senthilnathan Muthusamy
> Username: senthilm-ms
> Email: senth...@microsoft.com
>
> Thanks,
> Senthil
>


Add me to the contributors list...

2019-10-10 Thread Senthilnathan Muthusamy
Hi,

I am Senthil from Microsoft Azure Compute and will be contributing to the 
KIP-280. Can you please add me to the contributors list and provide access to 
the KIP-280, JIRA & the repo.

My details:
Name: Senthilnathan Muthusamy
Username: senthilm-ms
Email: senth...@microsoft.com

Thanks,
Senthil


Is it possible to know the number of producers for a topic?

2019-10-10 Thread Dylan Martin
I have a situation where I should always have exactly one producer per topic, 
but under certain rare circumstances I can have more than one, and that's 
really bad when it happens, so I want to detect it.

Can I know how many producers a topic has?  Can I identify them?

Thanks!


-Dylan

The information contained in this email message, and any attachment thereto, is 
confidential and may not be disclosed without the sender's express permission. 
If you are not the intended recipient or an employee or agent responsible for 
delivering this message to the intended recipient, you are hereby notified that 
you have received this message in error and that any review, dissemination, 
distribution or copying of this message, or any attachment thereto, in whole or 
in part, is strictly prohibited. If you have received this message in error, 
please immediately notify the sender by telephone, fax or email and delete the 
message and all of its attachments. Thank you.


Records with null value (tombstone) are not removed during compaction

2019-10-10 Thread Uma Maheswari
I have created a topic with cleanup.policy set to compact. segment.ms and 
delete.retention.ms are also configured for the topic. Compaction is happening, 
but records with a null value are not removed. However, when segment.bytes is 
configured, records with a null value are removed. Please help.

Thanks,
Uma
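For reference, a sketch of the topic configuration being described, using the TopicConfig constants from the Java client (the values shown are placeholders, not the reporter's actual settings):

Map<String, String> configs = new HashMap<>();
configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
configs.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");           // roll a new segment at least every 10 minutes
configs.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "60000");   // keep tombstones 1 minute after compaction
// The observation above: tombstones only disappeared once segment.bytes was also set.
// Note that the active (open) segment is never compacted, so a segment must roll first.
configs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");       // 1 MiB, placeholder
NewTopic topic = new NewTopic("compacted-topic", 1, (short) 1).configs(configs);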


Records with null value (tombstone) not removed during compaction

2019-10-10 Thread Uma Maheswari
I have a topic with cleanup.policy set to compact. segment.ms and 
delete.retention.ms are configured for the topic. Compaction is happening, but 
records with a null value (tombstones) are not removed. Whereas when segment.bytes 
is configured for the topic, records with null values are removed. Please help 
me.

Thanks,
Uma


Re: Long delay between incoming and outgoing messages using kafka streams

2019-10-10 Thread Petter Arvidsson
Hi Sophie,

Thank you for your response.

I tested the proposed setting for CACHE_MAX_BYTES_BUFFERING_CONFIG and it
does not seem to significantly change the behavior of the application. The
latency remains very similar. The documentation states the following
regarding CACHE_MAX_BYTES_BUFFERING_CONFIG and COMMIT_INTERVAL_MS_CONFIG:

---
To enable caching but still have an upper bound on how long records will be
cached, you can set the commit interval. In this example, it is set to 1000
milliseconds:

Properties props = new Properties();
// Enable record cache of size 10 MB.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 *
1024L);
// Set commit interval to 1 second.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
---

This made me believe that COMMIT_INTERVAL_MS_CONFIG would already
"override" CACHE_MAX_BYTES_BUFFERING_CONFIG and provide an upper bound
on the latency of 1s per processing step by flushing buffers every second.
Is this the case, or do these two configuration values interact in some
other way?

We are using 256 partitions for all our topics. Is this to be considered a
very high partition count? Do you think we might be affected by the bug in
2.3.0?

Thank you for the help!

Best regards,
Petter

On Wed, Oct 9, 2019 at 7:33 PM Sophie Blee-Goldman 
wrote:

> Hi Petter,
>
> I'd recommend turning off caching by setting
> p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> 2.3.0 also has some known performance issues that are fixed in 2.3.1, which
> should be released soon for you to upgrade to, but they shouldn't be noticeable
> if you turn caching off and aren't reading/writing to topics with a very high
> partition count. The caching is likely the main reason for the latency you see.
>
> I'd also note that Streams, and Kafka in general, is typically tuned for
> high
> throughput rather than low latency, so I wouldn't be too concerned about
> a large latency unless that is a specific requirement.
>
> Cheers,
> Sophie
>
> On Wed, Oct 9, 2019 at 6:05 AM Petter Arvidsson <
> petter.arvids...@relayr.io>
> wrote:
>
> > Hi,
> >
> > I have a fairly simple kafka streams application that read messages from
> > two topics. The problem I am facing is that the delay between sending
> > events to the streams application and it producing results is very high (as
> > in several minutes). My question is: how can I make this latency smaller?
> >
> > The streams is doing the following:
> > ktable1 = topic1
> >   -> (filter out messages using flatMap)
> >   -> groupBy (with new key, adds internal rekeying topic)
> >   -> aggregate (in memory store backed by internal compacted topic)
> >
> > ktabe2 = topic2
> >   -> (rekey to same key as ktable1 over internal topic)
> >   -> join (with ktable1)
> >   -> aggregate (in memory store backed by internal compacted topic)
> >
> > ktable2.toStream.to(topic2)
> >
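For readers following along, a rough sketch of such a topology in the Java DSL (the original uses kafka-streams-scala; topic names, serdes, re-keying logic and aggregation functions are placeholders, not the poster's actual code):

StreamsBuilder builder = new StreamsBuilder();

// ktable1: topic1 -> filter -> groupBy (new key, adds a repartition topic) -> aggregate
KTable<String, String> ktable1 = builder
    .stream("topic1", Consumed.with(Serdes.String(), Serdes.String()))
    .filter((key, value) -> value != null)                                    // stand-in for the flatMap filter
    .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        () -> "",
        (key, value, agg) -> agg + value,
        Materialized.<String, String>as(Stores.inMemoryKeyValueStore("agg1-store"))
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()));

// ktable2: topic2 -> rekey -> join with ktable1 -> aggregate -> back out to topic2
builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()))
    .selectKey((key, value) -> value)                                         // rekey to match ktable1
    .join(ktable1, (v2, v1) -> v2 + "|" + v1)
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        () -> "",
        (key, value, agg) -> agg + value,
        Materialized.<String, String>as(Stores.inMemoryKeyValueStore("agg2-store"))
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()))
    .toStream()
    .to("topic2", Produced.with(Serdes.String(), Serdes.String()));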
> > KTable1 keeps configuration that allows messages to pass through and be
> > aggregated into ktable2. KTable2 keeps aggregates based on messages on
> > topic2. ktable2.toStream is then used to put the aggregated messages back
> > out on topic2. The "problem" (or misunderstanding as to how Kafka Streams is
> > processing messages) is that the delay from sending a message on topic1 to
> > the point where messages received on topic2 are passing the join is several
> > minutes. With the settings I have (see below) on a not-that-heavily-loaded
> > system, I would assume the latency would be a couple of seconds (based on
> > the COMMIT_INTERVAL_MS_CONFIG).
> >
> > I use the following settings (as well as settings for bootstrap servers,
> > application id and so forth):
> > p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
> > p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> > p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> >
> > The store used for the KTables is the one returned by
> > "Stores.inMemoryKeyValueStore()".
> >
> > Kafka libraries use version "2.3.0" and the "kafka-streams-scala"
> scaladsl
> > is used to build the streams. The broker is using version "1.1.0".
> >
> > Best regards,
> > Petter
> >
>


Upgrading from 0.11.0.2 to 2.2.1

2019-10-10 Thread Vincent Rischmann
Hello,

I have a cluster still on 0.11.0.2 that we're planning to upgrade to 2.2.1 
eventually (I don't want to upgrade to 2.3.0 just yet).

I'm aware of the documentation at 
https://kafka.apache.org/22/documentation.html#upgrade and I plan to follow 
each step.

I'm just wondering whether folks around here recommend upgrading directly from 
0.11.0.2 to 2.2.1 or instead doing something like 0.11.0.2 -> 1.1.1 -> 2.2.1.
Also, if anyone has made the jump from 0.11.0.2 to 2.2.1 in production, I'd love to 
hear your feedback, especially if things didn't work properly.

Thanks !