Hi everyone,
my team is building a real-time system using Samza (version 0.11.0) and we are 
facing some issues with data loss, so we would like to hear your thoughts.

Because of some additional tools we run for monitoring and alerting, we exceeded 
the allowed number of open files, and the resulting TooManyOpenFiles exceptions 
caused our brokers to fail.
After we fixed this, the failed brokers and all Samza jobs were restarted. The 
original issue was gone, but since this incident it seems like we are constantly 
losing almost half of the messages from some of our topics.
To keep things as simple as possible, I will focus on just a small part of the 
pipeline. The picture below shows two topics, both with 80 partitions, which are 
the input and output of one of our Samza jobs. The number of messages in those 
topics should be the same, but the output topic contains almost half as many 
messages as the input one. There is no bottleneck, so messages are not kept in 
Kafka for too long and they are not deleted by log retention before processing.
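For reference, here is a minimal sketch of how one could sanity-check those 
counts outside of the job. It assumes a Kafka 0.10.1 Java client on the 
classpath (beginningOffsets/endOffsets do not exist in the 0.8.2.1 client our 
jobs use); the broker address and topic names are placeholders:

import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicMessageCount {
    // Rough message count = sum over partitions of (end offset - beginning offset).
    static long count(KafkaConsumer<byte[], byte[]> consumer, String topic) {
        List<TopicPartition> tps = new ArrayList<>();
        for (PartitionInfo pi : consumer.partitionsFor(topic)) {
            tps.add(new TopicPartition(topic, pi.partition()));
        }
        Map<TopicPartition, Long> begin = consumer.beginningOffsets(tps);
        Map<TopicPartition, Long> end = consumer.endOffsets(tps);
        long total = 0;
        for (TopicPartition tp : tps) {
            total += end.get(tp) - begin.get(tp);
        }
        return total;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            System.out.println("input  : " + count(consumer, "input_topic"));   // placeholder topic names
            System.out.println("output : " + count(consumer, "output_topic"));
        }
    }
}

Summing end minus beginning offset per partition is only an approximation, 
because retention moves the beginning offsets forward, but it is enough to 
compare the two topics.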

http://ibb.co/iJrDbF

Another strange thing is that some old messages appear again after a day or two. 
All of this leads us to the conclusion that Samza's consumers are somehow not 
aware of all of the partitions. Is it possible that the consumers are not aware 
of the new partition leaders (leader elections took place after the broker 
failures) and are still trying to fetch from the old replicas, which are no 
longer leaders and have lower offsets, so that newer messages are skipped? Is 
there some kind of topic metadata caching that could lead to this situation? 
While debugging we came across a KafkaSystemConsumer exception saying there is 
no leader for a partition, yet according to Kafka Manager all partitions have 
their leaders.
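To rule out stale metadata on our side, one thing that can be done is to ask a 
brand-new client for its view of partition leadership and compare it with Kafka 
Manager and with what the job reports. A minimal sketch (same 0.10.1 Java client 
assumption as above; broker address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LeaderCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor() triggers a fresh metadata request, so this shows the
            // leadership view a new client gets, independent of anything cached
            // inside the long-running Samza containers.
            for (PartitionInfo pi : consumer.partitionsFor("topic_name")) {  // placeholder topic
                System.out.printf("partition %d leader %s in-sync replicas %d%n",
                        pi.partition(),
                        pi.leader() == null ? "NONE" : pi.leader().toString(),
                        pi.inSyncReplicas().length);
            }
        }
    }
}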


Here are some additional details that might be useful.

Our Samza jobs are built on top of Samza 0.11.0.
The jobs use Kafka 0.8.2.1 consumers/producers.

Kafka cluster:
- 8 brokers
- Kafka version 0.10.1
- unclean.leader.election.enable false
- replica.lag.time.max.ms 10000
- log.flush.interval.ms 7200000

Input topic:
- segment.bytes 2147483647
- retention.ms 172800000

Some warnings from logs:
Error processing fetch operation on partition [topic_name,35], offset 232841013 
(kafka.server.ReplicaManager)
java.lang.IllegalStateException: Failed to read complete buffer for 
targetOffset 241769924 startPosition 2147479938 in

BrokerProxy [WARN] Got non-recoverable error codes during multifetch. Throwing 
an exception to trigger reconnect. Errors: 
Error([topic_name,47],-1,kafka.common.UnknownException

BrokerProxy [WARN] It appears that we received an invalid or empty offset 
Some(366399914) for [topic_name,60]. Attempting to use Kafka's 
auto.offset.reset setting. This can result in data loss if processing continues.
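To understand that last warning, one way to check whether the rejected offset is 
actually outside the partition's current valid range is something like the 
sketch below (again assuming a 0.10.1 Java client; the broker address and topic 
name are placeholders, the partition and offset are taken from the warning 
above):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetRangeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        TopicPartition tp = new TopicPartition("topic_name", 60);  // partition from the warning
        long rejected = 366399914L;                                // offset from the warning
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            long earliest = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
            long latest = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            System.out.printf("valid range [%d, %d), rejected offset %d -> %s%n",
                    earliest, latest, rejected,
                    (rejected >= earliest && rejected < latest) ? "inside range" : "outside range");
        }
    }
}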

Any help and suggestions will be appreciated.

Thanks,
Aleksandar Bircakovic
