Re: Shutting down a Streams job
It is certainly possible, but when you have dozens of workers that would take a very long time, especially if you have a lot of state, as partitions get reassigned and state is moved about. In fact, it is likely to fail at some point: state that fits when spread across many nodes may no longer fit locally on each node as the number of nodes shrinks.

On Wed, Feb 8, 2017 at 12:34 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
> Can you take them down sequentially? Like, say, with a Kubernetes StatefulSet
> <https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#ordered-pod-termination>.
>
> On Wed, Feb 8, 2017 at 2:15 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>
> > What are folks doing to cleanly shutdown a Streams job comprised of
> > multiple workers?
> >
> > Right now I am doing sys.addShutdownHook(streams.close()) but that is not
> > working well to shutdown a fleet of workers. When I signal the fleet to
> > shutdown by sending them all a SIGTERM, some of them will shutdown, but
> > some will persist. It appears that there is a race condition between the
> > shutdown signal and a rebalancing occurring as a result of other workers
> > shutting down. If a worker has not started shutting down before the
> > rebalancing starts, the rebalancing will cause the worker to not shutdown.
> >
> > Others seeing the same thing?
Shutting down a Streams job
What are folks doing to cleanly shut down a Streams job comprised of multiple workers?

Right now I am doing sys.addShutdownHook(streams.close()), but that is not working well for shutting down a fleet of workers. When I signal the fleet to shut down by sending them all a SIGTERM, some of them will shut down, but some will persist. It appears that there is a race condition between the shutdown signal and a rebalance triggered by other workers shutting down. If a worker has not started shutting down before the rebalance starts, the rebalance will cause the worker to not shut down.

Others seeing the same thing?
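For reference, the wiring is roughly the following (a minimal Java sketch of what the Scala sys.addShutdownHook call does; the application id, bootstrap servers, and the topology are placeholders, not the actual job):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class Worker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-job");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder

        KStreamBuilder builder = new KStreamBuilder();
        // ... topology definition elided ...

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // On SIGTERM, close the instance so it commits and leaves the consumer group.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}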
Re: Fetch offset out of range errors while testing Streams application
Guozhang,

Thanks for the reply. I figured it out after a while. Indeed, the global default time-based retention was tripping me up. I was using older data for testing and publishing messages with explicit timestamps. It took me a while to figure out what was happening because kafka-topics.sh does not display a topic's parameters when they are set to their default values. Thus, I was under the impression that the topic's data was being kept indefinitely.

It would be best if kafka-topics.sh displayed all of a topic's configuration, or at least something as important as the retention period, even if the value comes from a global default.

On Sun, Jan 22, 2017 at 3:29 PM, Guozhang Wang wrote:
> Note that for log retention, Kafka brokers have a global config that could
> be applied to any topics, and topics themselves have a per-topic config
> that can override the broker-level global config, you may want to check
> both the broker configs as well as the topic configs (e.g. with the
> kafka-topics command tool) to make sure that time-based retention is
> properly set on both levels.
>
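For anyone who hits the same thing, a command-line sketch of the checks I have in mind (the ZooKeeper address, topic name, and retention value are placeholders for your own setup):

# Shows only explicit topic-level overrides; a topic relying on the broker
# default will not list retention.ms here
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic some_topic

# Making the retention an explicit topic-level override means --describe
# will actually display it (e.g. 7 days)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic some_topic --config retention.ms=604800000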
Re: Fetch offset out of range errors while testing Streams application
Suggestions? On Thu, Jan 19, 2017 at 6:23 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote: > In the process of testing a Kafka Streams application I've come across a > few issues that are baffling me. > > For testing I am executing a job on 20 nodes with four cores per node, > each instance configured to use 4 threads, against a 5 node broker cluster > running 0.10.1.1. > > Before execution kafka-streams-application-reset.sh is ran to reset > offset of input topics to zero. The app calls KafkaStreams.cleanUp() on > startup to clean up the local state stores. All workers are started > simultaneously. All topics have 100 partitions. The main input topic has > 1TB of data 3x replicated. min.insync.replicas is set to 3. > > The application consumes from the main input topic, transforms the input, > and repartitions it using KStream.through() to write to another topic. It > reads from the repartitioned topic and continues processing. > > In the brokers we are seeing errors such as: > > [2017-01-19 21:24:41,298] WARN [ReplicaFetcherThread-3-1009], Replica 1010 > for partition some_topic-91 reset its fetch offset from 424762 to current > leader 1009's start offset 424779 (kafka.server.ReplicaFetcherThread) > [2017-01-19 21:24:41,350] WARN [ReplicaFetcherThread-2-1009], Replica 1010 > for partition some_topic-66 reset its fetch offset from 401243 to current > leader 1009's start offset 401376 (kafka.server.ReplicaFetcherThread) > [2017-01-19 21:24:41,381] ERROR [ReplicaFetcherThread-3-1009], Current > offset 424762 for partition [some_topic,91] out of range; reset offset to > 424779 (kafka.server.ReplicaFetcherThread) > [2017-01-19 21:24:41,399] WARN [ReplicaFetcherThread-3-1009], Replica 1010 > for partition some_topic-71 reset its fetch offset from 456158 to current > leader 1009's start offset 456189 (kafka.server.ReplicaFetcherThread) > [2017-01-19 21:24:41,400] WARN [ReplicaFetcherThread-0-1007], Replica 1010 > for partition some_topic-84 reset its fetch offset from 399325 to current > leader 1007's start offset 399327 (kafka.server.ReplicaFetcherThread) > [2017-01-19 21:24:41,446] ERROR [ReplicaFetcherThread-2-1009], Current > offset 401243 for partition [some_topic,66] out of range; reset offset to > 401376 (kafka.server.ReplicaFetcherThread) > > If I understand these errors correctly, they are saying that the broker's > replica fetcher thread for these partitions failed to fetch at its current > offset because the leader's start offset is higher. It basically says the > leader no longer has the messages at the offset requested. That makes no > sense, as the topic is not configured to delete any messages. I observed > these errors 512 times in total across all brokers while executing the > application. > > From there is seems to cascade to the Streams application: > > INFO 2017-01-19 21:24:41,417 [StreamThread-4][Fetcher.java:714] : Fetch > offset 1051824 is out of range for partition some_topic-14, resetting offset > ERROR 2017-01-19 21:24:41,421 [StreamThread-4][StreamThread.java:249] : > stream-thread [StreamThread-4] Streams application error during processing: > java.lang.NullPointerException > at org.apache.kafka.clients.consumer.internals.Fetcher. > resetOffset(Fetcher.java:341) > at org.apache.kafka.clients.consumer.internals.Fetcher. > resetOffsetsIfNeeded(Fetcher.java:197) > at org.apache.kafka.clients.consumer.KafkaConsumer. > updateFetchPositions(KafkaConsumer.java:1524) > at org.apache.kafka.clients.consumer.KafkaConsumer. 
> pollOnce(KafkaConsumer.java:1018) > at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > KafkaConsumer.java:979) > at org.apache.kafka.streams.processor.internals. > StreamThread.runLoop(StreamThread.java:407) > at org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:242) > INFO 2017-01-19 21:24:41,425 [StreamThread-4][StreamThread.java:268] : > stream-thread [StreamThread-4] Shutting down > > Saw this error 731 times across the workers. > > If we look at just one partition across brokers and workers and we group > the logs by time, we see this: > > worker-3 INFO 2017-01-19 21:*24:41*,806 [StreamThread-3][Fetcher.java:714] > : Fetch offset 429851 is out of range for partition some_topic-94, > resetting offset > > worker-3 INFO 2017-01-19 21:*29:41*,496 [StreamThread-1][Fetcher.java:714] > : Fetch offset 1317721 is out of range for partition some_topic-94, > resetting offset > > worker-1 INFO 2017-01-19 21:*34:41*,977 [StreamThread-2][Fetcher.java:714] > : Fetch offset 2014017 is out of range for partition some_topic-94, > resetting offset > > worker-1 INFO 2017-01-19 21:*39:41*,425 [StreamThre
Fetch offset out of range errors while testing Streams application
In the process of testing a Kafka Streams application I've come across a few issues that are baffling me.

For testing I am executing a job on 20 nodes with four cores per node, each instance configured to use 4 threads, against a 5 node broker cluster running 0.10.1.1.

Before execution kafka-streams-application-reset.sh is run to reset the offsets of the input topics to zero. The app calls KafkaStreams.cleanUp() on startup to clean up the local state stores. All workers are started simultaneously. All topics have 100 partitions. The main input topic has 1TB of data, 3x replicated. min.insync.replicas is set to 3.

The application consumes from the main input topic, transforms the input, and repartitions it using KStream.through() to write to another topic. It reads from the repartitioned topic and continues processing.

On the brokers we are seeing errors such as:

[2017-01-19 21:24:41,298] WARN [ReplicaFetcherThread-3-1009], Replica 1010 for partition some_topic-91 reset its fetch offset from 424762 to current leader 1009's start offset 424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,350] WARN [ReplicaFetcherThread-2-1009], Replica 1010 for partition some_topic-66 reset its fetch offset from 401243 to current leader 1009's start offset 401376 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,381] ERROR [ReplicaFetcherThread-3-1009], Current offset 424762 for partition [some_topic,91] out of range; reset offset to 424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,399] WARN [ReplicaFetcherThread-3-1009], Replica 1010 for partition some_topic-71 reset its fetch offset from 456158 to current leader 1009's start offset 456189 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,400] WARN [ReplicaFetcherThread-0-1007], Replica 1010 for partition some_topic-84 reset its fetch offset from 399325 to current leader 1007's start offset 399327 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,446] ERROR [ReplicaFetcherThread-2-1009], Current offset 401243 for partition [some_topic,66] out of range; reset offset to 401376 (kafka.server.ReplicaFetcherThread)

If I understand these errors correctly, they are saying that the broker's replica fetcher thread for these partitions failed to fetch at its current offset because the leader's start offset is higher. It basically says the leader no longer has the messages at the requested offset. That makes no sense, as the topic is not configured to delete any messages. I observed these errors 512 times in total across all brokers while executing the application.
From there it seems to cascade to the Streams application:

INFO 2017-01-19 21:24:41,417 [StreamThread-4][Fetcher.java:714] : Fetch offset 1051824 is out of range for partition some_topic-14, resetting offset
ERROR 2017-01-19 21:24:41,421 [StreamThread-4][StreamThread.java:249] : stream-thread [StreamThread-4] Streams application error during processing:
java.lang.NullPointerException
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1018)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
INFO 2017-01-19 21:24:41,425 [StreamThread-4][StreamThread.java:268] : stream-thread [StreamThread-4] Shutting down

I saw this error 731 times across the workers.

If we look at just one partition across brokers and workers and group the logs by time, we see this:

worker-3 INFO 2017-01-19 21:*24:41*,806 [StreamThread-3][Fetcher.java:714] : Fetch offset 429851 is out of range for partition some_topic-94, resetting offset

worker-3 INFO 2017-01-19 21:*29:41*,496 [StreamThread-1][Fetcher.java:714] : Fetch offset 1317721 is out of range for partition some_topic-94, resetting offset

worker-1 INFO 2017-01-19 21:*34:41*,977 [StreamThread-2][Fetcher.java:714] : Fetch offset 2014017 is out of range for partition some_topic-94, resetting offset

worker-1 INFO 2017-01-19 21:*39:41*,425 [StreamThread-3][Fetcher.java:714] : Fetch offset 2588834 is out of range for partition some_topic-94, resetting offset

broker-3 [2017-01-19 21:*44:41*,595] WARN [ReplicaFetcherThread-2-1007], Replica 1008 for partition some_topic-94 reset its fetch offset from 3093739 to current leader 1007's start offset 3093742 (kafka.server.ReplicaFetcherThread)

broker-3 [2017-01-19 21:*44:41*,642] ERROR [ReplicaFetcherThread-2-1007], Current offset 3093739 for partition [some_topic,94] out of range; reset offset to 3093742
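For context, the topology under test has roughly this shape (a Java sketch, not the actual code; the topic names and the key transformation are placeholders):

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> input = builder.stream("main-input-topic");      // the 1TB input topic

KStream<String, String> repartitioned = input
    .map((k, v) -> KeyValue.pair(newKey(v), v))   // hypothetical key change
    .through("repartition-topic");                // write out and read back on the new key

// ... further processing of `repartitioned` elided ...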
Convert a KStream to KTable
Am I correct in assuming there is no way to convert a KStream into a KTable, similar to KTable.toStream() but in the reverse direction, other than using KStream.reduceByKey with a Reducer or looping back through Kafka and using KStreamBuilder.table?
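For reference, the two workarounds I have in mind look roughly like this (a Java sketch against the 0.10.0 API; it assumes `stream` is the KStream in question and `builder` is the KStreamBuilder used to build the topology, and the topic and store names are placeholders):

// 1. Reduce that simply keeps the latest value per key
KTable<String, String> latest =
    stream.reduceByKey((aggValue, newValue) -> newValue, "stream-to-table-store");

// 2. Loop back through Kafka and re-read the topic as a table
stream.to("stream-as-changelog");
KTable<String, String> table = builder.table("stream-as-changelog");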
Re: Time of derived records in Kafka Streams
On Sat, Sep 10, 2016 at 9:17 AM, Eno Thereska wrote:
>
> For aggregations, the timestamp will be that of the latest record being
> aggregated.
>

How does that account for out-of-order records?

What about KStream-KStream joins? The output from the join could be triggered by a record received from either stream, depending on the order in which they are received and processed. If the timestamp of the output is just the timestamp of the latest received record, then it seems the timestamp could be that of either record. Although I suppose the best-effort stream synchronization that Kafka Streams attempts means that usually the timestamp will be that of the later record.
Re: Unexpected KStream-KStream join behavior with asymmetric time window
https://issues.apache.org/jira/browse/KAFKA-4153 https://github.com/apache/kafka/pull/1846 On Mon, Sep 12, 2016 at 7:00 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote: > Any ideas? > > > On Sunday, September 11, 2016, Elias Levy <fearsome.lucid...@gmail.com> > wrote: > >> Using Kafka 0.10.0.1, I am joining records in two streams separated by >> some time, but only when records from one stream are newer than records >> from the other. >> >> I.e. I am doing: >> >> stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1)) >> >> I would expect that the following would be equivalent: >> >> stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1)) >> >> Alas, I find that this is not the case. To generate the same output as >> the first example I must do: >> >> stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1)) >> >> What am I missing? >> >> >>
Re: Unexpected KStream-KStream join behavior with asymmetric time window
Any ideas? On Sunday, September 11, 2016, Elias Levy <fearsome.lucid...@gmail.com> wrote: > Using Kafka 0.10.0.1, I am joining records in two streams separated by > some time, but only when records from one stream are newer than records > from the other. > > I.e. I am doing: > > stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1)) > > I would expect that the following would be equivalent: > > stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1)) > > Alas, I find that this is not the case. To generate the same output as > the first example I must do: > > stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1)) > > What am I missing? > > >
Unexpected KStream-KStream join behavior with asymmetric time window
Using Kafka 0.10.0.1, I am joining records in two streams separated by some time, but only when records from one stream are newer than records from the other. I.e. I am doing:

stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))

I would expect that the following would be equivalent:

stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))

Alas, I find that this is not the case. To generate the same output as the first example I must do:

stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))

What am I missing?
Time of derived records in Kafka Streams
The Kafka Streams documentation discusses how to assign timestamps to records consumed from a source topic via a TimestampExtractor. But neither the Kafka nor the Confluent documentation on Kafka Streams explains what timestamp is associated with a record that has been transformed.

What timestamp is associated with records output by stateless transformations like map or flatMap? What timestamp is associated with records output by stateful transformations like aggregations or joins? What about transformations on windows? And what timestamp does the Kafka producer use, if any, when writing to an intermediate topic via through() or a sink via to()?
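For the source-topic case, this is the kind of extractor I mean (a Java sketch against the 0.10 API, where extract() takes only the record; the Event type and its accessor are hypothetical). It would be wired in via the StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG property; my questions above are about everything downstream of this point.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EventTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof Event) {              // hypothetical payload type
            return ((Event) value).eventTimeMs();  // hypothetical embedded event time
        }
        // Fall back to the timestamp set by the producer or broker.
        return record.timestamp();
    }
}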
Re: Heartbeating during long processing times
Shikhar,

Thanks for pointing me to KIP-62. Once implemented, it will make workers that take a long time processing messages a lot simpler to write. Until then, we have to continue using the pause/poll/resume pattern. That said, as far as I can tell, this pattern has not been well documented.

It appears the issue I observed is the result of consumer rebalancing. When a consumer with paused partitions calls poll to trigger a heartbeat, the client will process any pending consumer rebalances. The rebalance may result in newly assigned partitions that are unpaused. Worse, already assigned partitions that were paused, and that continue to be assigned to the client after the rebalance, will become unpaused. I consider this a bug in the client: paused partitions should not be unpaused during a rebalance if they continue to be assigned to the client.

So pause/poll/resume is not sufficient for a worker that handles messages with long processing times. One must also implement a ConsumerRebalanceListener that re-pauses all assigned partitions if the consumer is in the middle of processing a message (a rough sketch follows after the quoted message below).

On Fri, Jul 1, 2016 at 11:52 AM, Shikhar Bhushan <shik...@confluent.io> wrote:
> Hi Elias,
>
> KIP-62
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> >
> has a discussion of current options, and the improvements that are coming.
>
> Best,
>
> Shikhar
>
> On Thu, Jun 30, 2016 at 6:02 PM Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
> > What is the officially recommended method to heartbeat using the new Java
> > consumer during long message processing times?
> >
> > I thought I could accomplish this by setting max.poll.records to 1 in the
> > client, calling consumer.pause(consumer.assignment()) when starting to
> > process a record, calling consumer.resume(consumer.paused()) when done
> > processing a record and committing its offset, and calling consumer.poll(0)
> > intermittently while processing the record.
> >
> > The testing shows that consumer.poll(0) will return records, rather than
> > returning nil or an empty ConsumerRecords.
> >
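The listener I have in mind looks roughly like this (a Java sketch; the `processing` flag is assumed to be maintained by the polling thread, and the listener would be registered via consumer.subscribe(topics, listener)):

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class PauseWhileProcessingListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final AtomicBoolean processing;   // true while a record is being worked on

    public PauseWhileProcessingListener(Consumer<?, ?> consumer, AtomicBoolean processing) {
        this.consumer = consumer;
        this.processing = processing;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do here; the pause flags are lost along with the revoked partitions.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Partitions handed back by a rebalance come back unpaused, so re-pause
        // them if we are still in the middle of processing a record.
        if (processing.get()) {
            consumer.pause(consumer.assignment());
        }
    }
}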
Heartbeating during long processing times
What is the officially recommended method to heartbeat using the new Java consumer during long message processing times?

I thought I could accomplish this by setting max.poll.records to 1 in the client, calling consumer.pause(consumer.assignment()) when starting to process a record, calling consumer.resume(consumer.paused()) when done processing a record and committing its offset, and calling consumer.poll(0) intermittently while processing the record.

However, testing shows that consumer.poll(0) will return records, rather than returning nil or an empty ConsumerRecords.
Consumer pause/resume & partition assignment race condition
While performing some prototyping on 0.10.0.0 using the new client API, I noticed that some clients fail to drain their topic partitions.

The Kafka cluster is comprised of 3 nodes. The topic in question has been preloaded with messages. The topic has 50 partitions. The messages were loaded without a key, so they should be spread in a round-robin fashion. The kafka-consumer-groups command shows that each partition has a log-end-offset of 137, except for one partition at 136.

The worker is a simple single-threaded client. As mentioned, it uses the new consumer API. The consumer is configured to fetch a single record at a time by setting the max.poll.records config property to 1. The worker handles commits itself and sets enable.auto.commit to false.

The worker can take substantial time processing a message. To avoid timing out the Kafka connection, the worker calls consumer.pause() with the results of consumer.assignment() when it starts processing the message, calls consumer.poll(0) at regular intervals while processing the message to trigger heartbeats to Kafka, and calls consumer.resume() with the result of a call to consumer.assignment() when it is done processing the message and has committed the offset for the message using consumer.commitSync().

Note that when calling consumer.resume() I pass in the results of a fresh call to consumer.assignment(). Passing in the results of the previous call to consumer.assignment(), the one used when calling consumer.pause(), would result in an exception if partitions were reassigned while the worker was processing the message, as may happen when workers join the consumer group. I presume this means each call to assignment() generates a call to the consumer coordinator in the cluster to obtain the latest assignments, rather than returning a locally cached copy of assignments.

The test used four worker nodes running four workers each, for sixteen total workers. kafka-consumer-groups.sh shows that all partitions have been assigned to a worker, and that the workers successfully processed most partitions, 29 out of 50, to completion (lag is 0). 5 partitions appear to not have been processed at all, with unknown shown for current-offset and lag, and 16 partitions have processed some messages but not all. In either case, the workers believe there are no more messages to fetch. When they call poll with a timeout, it eventually returns with no messages. The workers show no errors and continue to run.

That indicates to me that the workers and the cluster disagree on partition assignment. Thus, a consumer is not asking for messages on partitions the broker has assigned to it, and messages on those partitions are not processed. My guess is that partition assignments are being changed after my calls to consumer.assignment() and consumer.resume().

Presumably I can solve this issue by implementing a ConsumerRebalanceListener and updating the assignment I call resume() with whenever onPartitionsRevoked and onPartitionsAssigned are called. Ideally, the Consumer interface would allow you to call pause() and resume() without a list of topic partitions, which would pause and resume fetching from all assigned partitions, which the client is already keeping track of.

Thoughts? Suggestions?
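For clarity, the worker loop is roughly the following (a Java sketch of the pattern described above, not the actual code; the topic name, group id, and the resumable Work/startWork() pieces are hypothetical placeholders, and error handling is elided):

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");   // placeholder
props.put("group.id", "prototype-workers");      // placeholder
props.put("enable.auto.commit", "false");
props.put("max.poll.records", "1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("work-topic"));   // placeholder topic

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        consumer.pause(consumer.assignment());      // stop fetching while we work

        Work work = startWork(record);              // hypothetical long-running, resumable task
        while (!work.isDone()) {
            work.step();                            // do a slice of the processing
            consumer.poll(0);                       // heartbeat between slices
        }

        consumer.commitSync();                      // commit the record's offset
        consumer.resume(consumer.assignment());     // fresh assignment, as noted above
    }
}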
Re: KAFKA-1499 compression.type
Anyone? On Thu, Jan 14, 2016 at 8:42 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote: > The description of the compression.type config property in the > documentation is somewhat confusing. It begins with "Specify the final > compression type for a given topic.", yet it is defined as a broker > configuration property and it is not listed under topic-level configuration > properties. > > Reading the discussion in KAFKA-1499 leads me to believe that the broker > level property it a default that can be overridden by using the same > property at the topic level. > > Is this correct? > > If so, it would be best to make the documentation clearer and to add the > property to the topic-level config properties section in addition to the > broker level config section. >
KAFKA-1499 compression.type
The description of the compression.type config property in the documentation is somewhat confusing. It begins with "Specify the final compression type for a given topic.", yet it is defined as a broker configuration property and is not listed under the topic-level configuration properties.

Reading the discussion in KAFKA-1499 leads me to believe that the broker-level property is a default that can be overridden by using the same property at the topic level.

Is this correct?

If so, it would be best to make the documentation clearer and to add the property to the topic-level config section in addition to the broker-level config section.
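If that reading is right, the override would presumably be a one-liner along these lines (command-line sketch; the ZooKeeper address, topic name, and codec are placeholders):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config compression.type=lz4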
Kafka and Btrfs
Anyone using Kafka with Btrfs successfully? Any recommendations against taking that path?

Elias
Kafka on EC2 with ephemeral storage mirrored to EBS
I am curious if anyone has attempted to run Kafka on EC2 using ephemeral storage for the logs (I am looking at the I2 or D2 instance types) while actively copying the logs to an EBS volume. The aim is to bring a dead broker back to life faster: a restore script would copy the EBS backup into local storage on restart, so the broker doesn't have to replicate all messages from its peers.
Re: Kafka availability guarantee
On Sun, Oct 11, 2015 at 2:34 PM, Todd Palino wrote:
> To answer the question, yes, it is incorrect. There are a few things you
> can do to minimize problems. One is to disable unclean leader election, use
> acks=-1 on the producers, have an RF of 3 or greater, and set the min ISR
> to 2. This means that the topic will only be available if there are at
> least 2 replicas in sync, your producers will all wait for acknowledgements
> from all in sync replicas (therefore, at least 2) before considering
> produce requests to be complete, and if you get in a situation where all
> three replicas go down, the cluster will not perform an unclean leader
> election (which can lose messages).
>
> Basically, you have to trade availability for correctness here. You get to
> pick one.
>

Thanks. I figured as much, but it's good to have official confirmation.

It may be good to clarify the section of the documentation I quoted, lest folks get the wrong impression, as the reality is that regardless of how high the replication factor is, Kafka can lose messages with a single node failure if the in-sync replica set is allowed to shrink to a single member.

Cheers.
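For anyone following along, the combination Todd describes translates to something like this on the producer side (a Java sketch; the bootstrap servers and serializers are placeholders), with replication factor 3, min.insync.replicas=2, and unclean.leader.election.enable=false set on the brokers or as topic-level overrides:

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");   // placeholder
props.put("acks", "all");                        // same as acks=-1: wait for all in-sync replicas
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);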
Kafka availability guarantee
Reading through the Kafka documentation for statements regarding Kafka's availability guarantees, one comes across this statement:

*With this ISR model and f+1 replicas, a Kafka topic can tolerate f failures without losing committed messages.*

In my opinion, this appears incorrect, or at best misleading. Consider a partition with a replication factor of 3. If one of the replicas lags but does not fail, the ISR will shrink to a set of 2 replicas: the leader and one follower. The leader will consider a message committed when it and the in-sync follower have written the message to their respective logs. If a concurrent failure of those 2 nodes occurs, specifically the failure of the leader and the in-sync follower, there won't be any remaining in-sync replica to take over as leader without potential message loss.

Therefore Kafka cannot tolerate an arbitrary failure of *f* nodes, where *f* is N - 1 and N is the replication factor. Kafka can only tolerate a failure of *f* nodes if we take N to be the ISR set size, which is a dynamic value and not a topic configuration parameter that can be set a priori. Kafka can tolerate some failures of *f* replicas when N is the replication factor, so long as at least one in-sync replica survives, but it cannot tolerate all such failures.

Am I wrong?