Re: Shutting down a Streams job

2017-02-08 Thread Elias Levy
It is certainly possible, but when you have dozens of workers that would
take a very long time, especially if you have a lot of state, as partitions
get reassigned and state is moved about.  In fact, it is likely to fail at
some point: local state that fits when spread across many nodes may no
longer fit on each node as the number of nodes shrinks.

On Wed, Feb 8, 2017 at 12:34 PM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> Can you take them down sequentially? Like, say, with a Kubernetes
> StatefulSet
> <https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#ordered-pod-termination>
> .
>
> On Wed, Feb 8, 2017 at 2:15 PM, Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
> > What are folks doing to cleanly shut down a Streams job composed of
> > multiple workers?
> >
> > Right now I am doing sys.addShutdownHook(streams.close()) but that is not
> > working well for shutting down a fleet of workers.  When I signal the fleet
> > to shut down by sending them all a SIGTERM, some of them will shut down,
> > but some will persist.  It appears that there is a race condition between
> > the shutdown signal and a rebalancing occurring as a result of other
> > workers shutting down.  If a worker has not started shutting down before
> > the rebalancing starts, the rebalancing will cause the worker not to shut
> > down.
> >
> > Others seeing the same thing?
> >
>


Shutting down a Streams job

2017-02-08 Thread Elias Levy
What are folks doing to cleanly shut down a Streams job composed of
multiple workers?

Right now I am doing sys.addShutdownHook(streams.close()) but that is not
working well for shutting down a fleet of workers.  When I signal the fleet
to shut down by sending them all a SIGTERM, some of them will shut down,
but some will persist.  It appears that there is a race condition between
the shutdown signal and a rebalancing occurring as a result of other
workers shutting down.  If a worker has not started shutting down before
the rebalancing starts, the rebalancing will cause the worker not to shut
down.

Others seeing the same thing?
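
For reference, the hook amounts to the following minimal Java sketch (the
application id, broker address, and placeholder topology are illustrative,
assuming the 0.10.x Streams API):

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class GracefulShutdown {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");                    // placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch stopped = new CountDownLatch(1);

        // Run close() from a shutdown hook so SIGTERM triggers a clean stop.
        // close() blocks until all stream threads have shut down.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                streams.close();
                stopped.countDown();
            }
        }));

        streams.start();
        stopped.await();   // keep main() alive until the hook has finished closing
    }
}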


Re: Fetch offset out of range errors while testing Streams application

2017-01-23 Thread Elias Levy
Guozhang,

Thanks for the reply.  I figured it out after a while.  Indeed, the global
default time-based retention was tripping me.  I was using older data for
testing and publishing messages with explicit timestamps.  It took me a
while to figure out what was happening because kafka-topics.sh does not
display the parameters of a topic that have a default value.  Thus, I was
under the impression that the topic's data was being kept indefinitely.

It would be best if kafka-topics.sh displayed all configuration values for
a topic, or at least something as important as the retention period, even
if the value comes from a global default.

On Sun, Jan 22, 2017 at 3:29 PM, Guozhang Wang  wrote:

> Note that for log retention, Kafka brokers have a global config that can
> be applied to any topic, and topics themselves have a per-topic config
> that can override the broker-level global default.  You may want to check
> both the broker configs and the topic configs (e.g. with the
> kafka-topics command tool) to make sure that time-based retention is
> properly set at both levels.
>


Re: Fetch offset out of range errors while testing Streams application

2017-01-20 Thread Elias Levy
Suggestions?

On Thu, Jan 19, 2017 at 6:23 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> In the process of testing a Kafka Streams application I've come across a
> few issues that are baffling me.
>
> For testing I am executing a job on 20 nodes with four cores per node,
> each instance configured to use 4 threads, against a 5 node broker cluster
> running 0.10.1.1.
>
> > Before execution, kafka-streams-application-reset.sh is run to reset the
> > offsets of the input topics to zero.  The app calls KafkaStreams.cleanUp() on
> startup to clean up the local state stores.  All workers are started
> simultaneously. All topics have 100 partitions. The main input topic has
> 1TB of data 3x replicated.  min.insync.replicas is set to 3.
>
> The application consumes from the main input topic, transforms the input,
> and repartitions it using KStream.through() to write to another topic.  It
> reads from the repartitioned topic and continues processing.
>
> In the brokers we are seeing errors such as:
>
> [2017-01-19 21:24:41,298] WARN [ReplicaFetcherThread-3-1009], Replica 1010
> for partition some_topic-91 reset its fetch offset from 424762 to current
> leader 1009's start offset 424779 (kafka.server.ReplicaFetcherThread)
> [2017-01-19 21:24:41,350] WARN [ReplicaFetcherThread-2-1009], Replica 1010
> for partition some_topic-66 reset its fetch offset from 401243 to current
> leader 1009's start offset 401376 (kafka.server.ReplicaFetcherThread)
> [2017-01-19 21:24:41,381] ERROR [ReplicaFetcherThread-3-1009], Current
> offset 424762 for partition [some_topic,91] out of range; reset offset to
> 424779 (kafka.server.ReplicaFetcherThread)
> [2017-01-19 21:24:41,399] WARN [ReplicaFetcherThread-3-1009], Replica 1010
> for partition some_topic-71 reset its fetch offset from 456158 to current
> leader 1009's start offset 456189 (kafka.server.ReplicaFetcherThread)
> [2017-01-19 21:24:41,400] WARN [ReplicaFetcherThread-0-1007], Replica 1010
> for partition some_topic-84 reset its fetch offset from 399325 to current
> leader 1007's start offset 399327 (kafka.server.ReplicaFetcherThread)
> [2017-01-19 21:24:41,446] ERROR [ReplicaFetcherThread-2-1009], Current
> offset 401243 for partition [some_topic,66] out of range; reset offset to
> 401376 (kafka.server.ReplicaFetcherThread)
>
> If I understand these errors correctly, they are saying that the broker's
> replica fetcher thread for these partitions failed to fetch at its current
> offset because the leader's start offset is higher.  It basically says the
> leader no longer has the messages at the offset requested.  That makes no
> sense, as the topic is not configured to delete any messages.  I observed
> these errors 512 times in total across all brokers while executing the
> application.
>
> From there it seems to cascade to the Streams application:
>
> INFO  2017-01-19 21:24:41,417 [StreamThread-4][Fetcher.java:714] : Fetch
> offset 1051824 is out of range for partition some_topic-14, resetting offset
> ERROR 2017-01-19 21:24:41,421 [StreamThread-4][StreamThread.java:249] :
> stream-thread [StreamThread-4] Streams application error during processing:
> java.lang.NullPointerException
> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1018)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> INFO  2017-01-19 21:24:41,425 [StreamThread-4][StreamThread.java:268] :
> stream-thread [StreamThread-4] Shutting down
>
> Saw this error 731 times across the workers.
>
> If we look at just one partition across brokers and workers and we group
> the logs by time, we see this:
>
> worker-3 INFO  2017-01-19 21:*24:41*,806 [StreamThread-3][Fetcher.java:714]
> : Fetch offset 429851 is out of range for partition some_topic-94,
> resetting offset
>
> worker-3 INFO  2017-01-19 21:*29:41*,496 [StreamThread-1][Fetcher.java:714]
> : Fetch offset 1317721 is out of range for partition some_topic-94,
> resetting offset
>
> worker-1 INFO  2017-01-19 21:*34:41*,977 [StreamThread-2][Fetcher.java:714]
> : Fetch offset 2014017 is out of range for partition some_topic-94,
> resetting offset
>
> worker-1 INFO  2017-01-19 21:*39:41*,425 [StreamThre

Fetch offset out of range errors while testing Streams application

2017-01-19 Thread Elias Levy
In the process of testing a Kafka Streams application I've come across a
few issues that are baffling me.

For testing I am executing a job on 20 nodes with four cores per node, each
instance configured to use 4 threads, against a 5 node broker cluster
running 0.10.1.1.

Before execution, kafka-streams-application-reset.sh is run to reset the
offsets of the input topics to zero.  The app calls KafkaStreams.cleanUp() on startup
to clean up the local state stores.  All workers are started
simultaneously. All topics have 100 partitions. The main input topic has
1TB of data 3x replicated.  min.insync.replicas is set to 3.

The application consumes from the main input topic, transforms the input,
and repartitions it using KStream.through() to write to another topic.  It
reads from the repartitioned topic and continues processing.
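
Roughly, the job looks like the following Java sketch (topic names, serdes,
the re-keying logic, and the application id are illustrative placeholders,
not the actual code):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RepartitioningJob {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");          // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");  // illustrative
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);               // 4 threads per instance
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input = builder.stream("main-input-topic");  // illustrative name

        // Transform the input, then repartition it by writing through an
        // intermediate topic and continue processing on the re-keyed stream.
        KStream<String, String> rekeyed =
                input.map((key, value) -> KeyValue.pair(newKey(value), value));
        KStream<String, String> repartitioned = rekeyed.through("repartition-topic");
        repartitioned.to("output-topic");                                     // illustrative downstream step

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.cleanUp();   // wipe the local state stores before reprocessing from offset zero
        streams.start();
    }

    private static String newKey(String value) {
        return value;   // placeholder re-keying logic
    }
}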

In the brokers we are seeing errors such as:

[2017-01-19 21:24:41,298] WARN [ReplicaFetcherThread-3-1009], Replica 1010
for partition some_topic-91 reset its fetch offset from 424762 to current
leader 1009's start offset 424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,350] WARN [ReplicaFetcherThread-2-1009], Replica 1010
for partition some_topic-66 reset its fetch offset from 401243 to current
leader 1009's start offset 401376 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,381] ERROR [ReplicaFetcherThread-3-1009], Current
offset 424762 for partition [some_topic,91] out of range; reset offset to
424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,399] WARN [ReplicaFetcherThread-3-1009], Replica 1010
for partition some_topic-71 reset its fetch offset from 456158 to current
leader 1009's start offset 456189 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,400] WARN [ReplicaFetcherThread-0-1007], Replica 1010
for partition some_topic-84 reset its fetch offset from 399325 to current
leader 1007's start offset 399327 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,446] ERROR [ReplicaFetcherThread-2-1009], Current
offset 401243 for partition [some_topic,66] out of range; reset offset to
401376 (kafka.server.ReplicaFetcherThread)

If I understand these errors correctly, they are saying that the broker's
replica fetcher thread for these partitions failed to fetch at its current
offset because the leader's start offset is higher.  It basically says the
leader no longer has the messages at the offset requested.  That makes no
sense, as the topic is not configured to delete any messages.  I observed
these errors 512 times in total across all brokers while executing the
application.

From there it seems to cascade to the Streams application:

INFO  2017-01-19 21:24:41,417 [StreamThread-4][Fetcher.java:714] : Fetch
offset 1051824 is out of range for partition some_topic-14, resetting offset
ERROR 2017-01-19 21:24:41,421 [StreamThread-4][StreamThread.java:249] :
stream-thread [StreamThread-4] Streams application error during processing:
java.lang.NullPointerException
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1018)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
INFO  2017-01-19 21:24:41,425 [StreamThread-4][StreamThread.java:268] :
stream-thread [StreamThread-4] Shutting down

Saw this error 731 times across the workers.

If we look at just one partition across brokers and workers and we group
the logs by time, we see this:

worker-3 INFO  2017-01-19 21:*24:41*,806 [StreamThread-3][Fetcher.java:714]
: Fetch offset 429851 is out of range for partition some_topic-94,
resetting offset

worker-3 INFO  2017-01-19 21:*29:41*,496 [StreamThread-1][Fetcher.java:714]
: Fetch offset 1317721 is out of range for partition some_topic-94,
resetting offset

worker-1 INFO  2017-01-19 21:*34:41*,977 [StreamThread-2][Fetcher.java:714]
: Fetch offset 2014017 is out of range for partition some_topic-94,
resetting offset

worker-1 INFO  2017-01-19 21:*39:41*,425 [StreamThread-3][Fetcher.java:714]
: Fetch offset 2588834 is out of range for partition some_topic-94,
resetting offset

broker-3 [2017-01-19 21:*44:41*,595] WARN [ReplicaFetcherThread-2-1007],
Replica 1008 for partition some_topic-94 reset its fetch offset from
3093739 to current leader 1007's start offset 3093742
(kafka.server.ReplicaFetcherThread)
broker-3 [2017-01-19 21:*44:41*,642] ERROR [ReplicaFetcherThread-2-1007],
Current offset 3093739 for partition [some_topic,94] out of range; reset
offset to 3093742 (kafka.server.ReplicaFetcherThread)

Convert a KStream to KTable

2016-10-07 Thread Elias Levy
Am I correct in assuming there is no way to convert a KStream into a
KTable, similar to KTable.toStream() but in the reverse direction, other
than using KStream.reduceByKey and a Reducer or looping back through Kafka
and using KStreamBuilder.table?
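
For reference, the reduceByKey approach amounts to something like the
following Java sketch (this assumes the 0.10.0 API, where
KStream.reduceByKey(reducer, keySerde, valueSerde, storeName) is still
available; topic and store names are illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;

public class StreamToTable {
    public static KTable<String, String> toTable(KStreamBuilder builder) {
        KStream<String, String> stream =
                builder.stream(Serdes.String(), Serdes.String(), "some-topic");  // illustrative topic

        // Keep only the latest value seen per key, which gives the changelog
        // semantics a KTable provides.
        return stream.reduceByKey(
                new Reducer<String>() {
                    @Override
                    public String apply(String oldValue, String newValue) {
                        return newValue;
                    }
                },
                Serdes.String(), Serdes.String(),
                "stream-to-table-store");   // illustrative state store name
    }
}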


Re: Time of derived records in Kafka Streams

2016-09-16 Thread Elias Levy
On Sat, Sep 10, 2016 at 9:17 AM, Eno Thereska 
wrote:

>
> For aggregations, the timestamp will be that of the latest record being
> aggregated.
>

How does that account for out of order records?

What about kstream-kstream joins?  The output from the join could be
triggered by a record received from either stream depending on the order
they are received and processed.  If the timestamp of the output is just
the timestamp of the latest received record, then it seems that the
timestamp could be that of either record.  Although I suppose that the
best-effort stream synchronization that Kafka Streams attempts means that
usually the timestamp will be that of the later record.


Re: Unexpected KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy
https://issues.apache.org/jira/browse/KAFKA-4153
https://github.com/apache/kafka/pull/1846

On Mon, Sep 12, 2016 at 7:00 AM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> Any ideas?
>
>
> On Sunday, September 11, 2016, Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
>> Using Kafka 0.10.0.1, I am joining records in two streams separated by
>> some time, but only when records from one stream are newer than records
>> from the other.
>>
>> I.e. I am doing:
>>
>>   stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))
>>
>> I would expect that the following would be equivalent:
>>
>>   stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))
>>
>> Alas, I find that this is not the case.  To generate the same output as
>> the first example I must do:
>>
>>   stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))
>>
>> What am I missing?
>>
>>
>>


Re: Unexpected KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy
Any ideas?

On Sunday, September 11, 2016, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> Using Kafka 0.10.0.1, I am joining records in two streams separated by
> some time, but only when records from one stream are newer than records
> from the other.
>
> I.e. I am doing:
>
>   stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))
>
> I would expect that the following would be equivalent:
>
>   stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))
>
> Alas, I find that this is not the case.  To generate the same output as
> the first example I must do:
>
>   stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))
>
> What am I missing?
>
>
>


Unexpected KStream-KStream join behavior with asymmetric time window

2016-09-11 Thread Elias Levy
Using Kafka 0.10.0.1, I am joining records in two streams separated by some
time, but only when records from one stream are newer than records from the
other.

I.e. I am doing:

  stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))

I would expect that the following would be equivalent:

  stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))

Alas, I find that this is not the case.  To generate the same output as the
first example I must do:

  stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))

What am I missing?


Time of derived records in Kafka Streams

2016-09-09 Thread Elias Levy
The Kafka Streams documentation discusses how to assign timestamps to
records received from a source topic via a TimestampExtractor.  But neither
the Kafka nor the Confluent documentation on Kafka Streams explains what
timestamp is associated with a record that has been transformed.

What timestamp is associated with records that are output by stateless
transformations like map or flatMap?

What timestamp is associated with records that are output by stateful
transformations like aggregations or joins?

What about transformations on windows?

What timestamp does the Kafka publisher use, if any, when writing to an
intermediate topic via through() or a sink via to()?


Re: Heartbeating during long processing times

2016-07-06 Thread Elias Levy
Shikhar,

Thanks for pointing me to KIP-62.  Once implemented, it will make workers
that take a long time processing messages a lot simpler to implement.
Until then, we have to continue using the pause/poll/resume pattern.  That
said, as far as I can tell, this pattern has not been well documented.

It appears the issue I observed is the result of consumer rebalancing. When
a consumer with paused partitions calls poll to trigger a heartbeat, the
client will process any pending consumer rebalances.  The rebalance will
potentially result in the addition of newly assigned unpaused partitions.
Worse, partitions that were already assigned and paused, and that continue
to be assigned to the client after the rebalance, will become unpaused. I
consider this a bug in the client.  Paused partitions
should not be unpaused during a rebalance if they continue to be assigned
to the client.  So pause/poll/resume is not sufficient for a worker that
handles messages with long processing times.  One must also implement a
ConsumerRebalanceListener that pauses all assigned partitions if the
consumer is in the middle of processing a message.
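
A minimal Java sketch of that workaround (names are illustrative; this
assumes the 0.10.x consumer API, where pause() takes a collection of
partitions):

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausingRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final AtomicBoolean processing;   // set to true while a record is being worked on

    public PausingRebalanceListener(KafkaConsumer<?, ?> consumer, AtomicBoolean processing) {
        this.consumer = consumer;
        this.processing = processing;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do: revoked partitions are no longer ours to pause or resume.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // A rebalance silently un-pauses partitions we still own, so re-pause
        // everything while a long-running record is still being processed.
        if (processing.get()) {
            consumer.pause(partitions);
        }
    }
}

The listener would be registered via consumer.subscribe(topics, listener),
with the processing flag set before starting work on a record and cleared
after committing it.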



On Fri, Jul 1, 2016 at 11:52 AM, Shikhar Bhushan <shik...@confluent.io>
wrote:

> Hi Elias,
>
> KIP-62
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> >
> has a discussion of current options, and the improvements that are coming.
>
> Best,
>
> Shikhar
>
> On Thu, Jun 30, 2016 at 6:02 PM Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
> > What is the officially recommended method to heartbeat using the new Java
> > consumer during long message processing times?
> >
> > I thought I could accomplish this by setting max.poll.records to 1 in the
> > client, calling consumer.pause(consumer.assignment()) when starting to
> > process a record, calling consumer.resume(consumer.paused()) when done
> > processing a record and committing its offset, and calling
> consumer.poll(0)
> > intermittently while processing the record.
> >
> > The testing shows that consumer.poll(0) will return records, rather than
> > returning nil or an empty ConsumerRecords.
> >
>


Heartbeating during long processing times

2016-06-30 Thread Elias Levy
What is the officially recommended method to heartbeat using the new Java
consumer during long message processing times?

I thought I could accomplish this by setting max.poll.records to 1 in the
client, calling consumer.pause(consumer.assignment()) when starting to
process a record, calling consumer.resume(consumer.paused()) when done
processing a record and committing its offset, and calling consumer.poll(0)
intermittently while processing the record.

The testing shows that consumer.poll(0) will return records, rather than
returning nil or an empty ConsumerRecords.
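
For reference, the pattern I am describing amounts to the following Java
sketch (topic name, group id, and the processing step are illustrative
placeholders; the crux of the question is that the intermittent poll(0)
calls return records instead of an empty set):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowWorker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // illustrative
        props.put("group.id", "slow-workers");              // illustrative
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "1");                 // fetch one record at a time
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("work-topic"));   // illustrative topic

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                consumer.pause(consumer.assignment());   // stop fetching while we work
                while (!process(record)) {               // long-running work, done in slices
                    consumer.poll(0);                    // heartbeat; expected to return nothing
                }
                consumer.commitSync();
                consumer.resume(consumer.assignment());  // start fetching again
            }
        }
    }

    private static boolean process(ConsumerRecord<String, String> record) {
        return true;   // placeholder: return true once the record is fully processed
    }
}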


Consumer pause/resume & partition assignment race condition

2016-06-24 Thread Elias Levy
While performing some prototyping on 0.10.0.0 using the new client API I
noticed that some clients fail to drain their topic partitions.

The Kafka cluster is comprised of 3 nodes.  The topic in question has been
preloaded with messages.  The topic has 50 partitions.  The messages were
loaded without a key, so they should be spread in a round robin fashion.
The kafka-consumer-groups command shows that each partition has a
log-end-offset of 137, except for one partition at 136.

The worker is a simple single threaded client.  As mentioned, it uses the
new consumer API.  The consumer is configured to fetch a single record at a
time by setting the max.poll.records config property to 1.  The worker
handles commits and sets enable.auto.commit to false.

The worker can take substantial time processing the messages.  To avoid
timing out the Kafka connection, the worker calls consumer.pause() with the
results of consumer.assignment() when it starts processing the message,
calls consumer.poll(0) at regular intervals while processing the message to
trigger heartbeats to Kafka, and calls consumer.resume() with the result of
a call to consumer.assignment() when it is done processing the message and
it has committed the offset for the message using consumer.commitSync().

Note that when calling consumer.resume() I pass in the results of a fresh
call to consumer.assignment().  Passing in the results of the previous call
to consumer.assignment(), the ones used when calling consumer.pause(),
would result in an exception if partitions were reassigned while the worker
was processing the message, as may happen when workers join the consumer
group.  I presume this means the call to assignment() generates a request
to the consumer coordinator in the cluster to obtain the latest assignments
rather than returning a locally cached copy.

The test used four worker nodes running four workers each, for sixteen
total workers.

kafka-consumer-groups.sh shows that all partitions have been assigned to a
worker, and that the workers successfully processed most partitions, 29 out
of 50, to completion (lag is 0).

5 partitions appear not to have been processed at all, with unknown shown
for current-offset and lag, and 16 partitions have processed some messages
but not all.  In either case, the workers believe there are no more
messages to fetch.  When they call poll with a timeout, it eventually
returns with no messages.  The workers show no errors and continue to run.

That indicates to me that the workers and cluster disagree on partition
assignment.  Thus, the consumer is not asking for messages on partitions
the broker has assigned to it, and messages on those partitions are not
processed.

My guess is that partition assignments are being changed after my calls to
consumer.assignment() and consumer.resume().

Presumably I can solve this issue by implementing a
ConsumerRebalanceListener and updating the assignment I call resume() with
whenever onPartitionsRevoked and onPartitionsAssigned are called.

Ideally, the Consumer interface would allow you to call pause() and
resume() without a list of topic partitions, which would pause and resume
fetching from all assigned partitions, which the client already keeps
track of.

Thoughts?  Suggestions?


Re: KAFKA-1499 compression.type

2016-01-15 Thread Elias Levy
Anyone?

On Thu, Jan 14, 2016 at 8:42 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> The description of the compression.type config property in the
> documentation is somewhat confusing.  It begins with "Specify the final
> compression type for a given topic.", yet it is defined as a broker
> configuration property and it is not listed under topic-level configuration
> properties.
>
> Reading the discussion in KAFKA-1499 leads me to believe that the broker
> level property is a default that can be overridden by using the same
> property at the topic level.
>
> Is this correct?
>
> If so, it would be best to make the documentation clearer and to add the
> property to the topic-level config properties section in addition to the
> broker level config section.
>


KAFKA-1499 compression.type

2016-01-14 Thread Elias Levy
The description of the compression.type config property in the
documentation is somewhat confusing.  It begins with "Specify the final
compression type for a given topic.", yet it is defined as a broker
configuration property and it is not listed under topic-level configuration
properties.

Reading the discussion in KAFKA-1499 leads me to believe that the broker
level property is a default that can be overridden by using the same
property at the topic level.

Is this correct?

If so, it would be best to make the documentation clearer and to add the
property to the topic-level config properties section in addition to the
broker level config section.
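
For what it is worth, my current understanding of the levels at which
compression.type can appear, as a hedged Java/config sketch (values are
illustrative; the broker and topic settings are shown as comments since
they are not client code):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class CompressionConfigLevels {
    public static Properties producerProps() {
        Properties props = new Properties();
        // Producer-level: compress batches before they are sent.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");   // illustrative choice
        return props;
    }

    // Broker-level default (server.properties), applied when no topic override exists:
    //   compression.type=producer   # "producer" keeps whatever codec the producer used
    //
    // Per-topic override, which takes precedence over the broker default:
    //   compression.type=gzip       # set via the topic-level config tooling
}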


Kafka and Btrfs

2015-10-27 Thread Elias Levy
Anyone using Kafka with Btrfs successfully? Any recommendations against
taking that path?

Elias


Kafka on EC2 with ephemeral storage mirrored to EBS

2015-10-21 Thread Elias Levy
I am curious if anyone has attempted to run Kafka on EC2 using ephemeral
storage for the logs (I am looking to use the I2 or D2 instance types), but
actively copying the logs into an EBS volume to aid in bringing a dead
broker back to life faster, so that it doesn't have to replicate all
messages from peers when it restarts (there would be a script during
restore that would copy the EBS backup into the local storage).


Re: Kafka availability guarantee

2015-10-11 Thread Elias Levy
On Sun, Oct 11, 2015 at 2:34 PM, Todd Palino  wrote:

> To answer the question, yes, it is incorrect. There are a few things you
> can do to minimize problems. One is to disable unclean leader election, use
> acks=-1 on the producers, have an RF of 3 or greater, and set the min ISR
> to 2. This means that the topic will only be available if there are at
> least 2 replicas in sync, your producers will all wait for acknowledgements
> from all in sync replicas (therefore, at least 2) before considering
> produce requests to be complete, and if you get in a situation where all
> three replicas go down, the cluster will not perform an unclean leader
> election (which can lose messages).
>
> Basically, you have to trade availability for correctness here. You get to
> pick one.
>

Thanks.  I figured as much, but its good to have official confirmation.

It may be good to clarify the section of the documentation I quoted lest
folks get the wrong impression, as the reality is that regardless of how
high the replication factor is Kafka can lose messages with a single node
failure if the in sync replica set is allowed to shrink to a single member.
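
For concreteness, the combination Todd describes amounts to something like
this sketch, with the producer setting in Java and the topic/broker
settings as comments (values are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DurableProducerConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");  // illustrative
        // Wait for acknowledgement from all in-sync replicas before a write is
        // considered successful (equivalent to acks=-1).
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }

    // Topic/broker side (not producer code):
    //   replication factor >= 3 when creating the topic
    //   min.insync.replicas=2                  # refuse writes if fewer than 2 replicas are in sync
    //   unclean.leader.election.enable=false   # never elect an out-of-sync replica as leader
}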

Cheers.


Kafka availability guarantee

2015-10-11 Thread Elias Levy
Reading through the Kafka documentation for statements regarding Kafka's
availability guarantees one comes across this statement:

*With this ISR model and f+1 replicas, a Kafka topic can tolerate f
failures without losing committed messages.*

In my opinion, this appears incorrect, or at best misleading.  Consider a
partition with a replication factor of 3.  If one of the replicas lags, but
does not fail, the ISR will shrink to a set of 2 replicas, the leader and
one follower.  The leader will consider a message committed when both it
and the in sync follower have written the message to their respective
logs.  If a concurrent failure of 2 nodes occurs, specifically the failure
of the leader and the in sync follower, there won't be any remaining in
sync replica to take over as leader without potential message loss.
Therefore Kafka cannot tolerate every failure of *f* nodes, where *f* is
N - 1 and N is the replication factor.  Kafka can only tolerate a failure
of *f* if we take N to be the ISR set size, which is a dynamic value and
not a topic configuration parameter that can be set a priori.  Kafka can
tolerate some failures of *f* replicas when N is the replication factor, so
long as at least one in sync replica survives, but it cannot tolerate all
such failures.

Am I wrong?