Hi everyone, my team is building a real-time system using Samza (version 0.11.0) and we are facing issues with data loss, so we would like to hear your thoughts.
Due to some additional tools we use for monitoring and alerting, we exceeded the allowed number of open files, and the resulting TooManyOpenFiles exception caused our brokers to fail. After fixing this issue, the failed brokers and all Samza jobs were restarted. The original problem was gone, but since the incident we seem to be constantly losing almost half of the messages from some of our topics.

To keep things as simple as possible, I will focus on just a small part of the pipeline. The picture below shows two topics, both with 80 partitions, that are the input and output of one of our Samza jobs. The number of messages in these topics should be the same, but the output topic contains almost two times fewer messages than the input one. There is no bottleneck, so messages are not kept in Kafka for too long, and they are not deleted by log retention before processing. http://ibb.co/iJrDbF

Another strange thing is that some old messages reappear after a day or two. All of this leads us to the conclusion that Samza's consumers are somehow not aware of all of the partitions. Is it possible that the consumers are not aware of the new partition leaders (new leader election occurred after the broker failures) and are still trying to fetch from the old replicas that are no longer leaders and have lower offsets, so that new messages are skipped? Is there some kind of topic metadata caching that could lead to this situation?

While debugging we discovered a KafkaSystemConsumer exception saying there is no leader for a partition, yet in Kafka Manager all partitions have their leaders.

Here are some additional details that might be useful. Our Samza jobs are built on top of Samza v0.11.0, and the jobs use Kafka 0.8.2.1 consumers/producers.
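To make the suspected failure mode concrete, here is a minimal sketch of what we think may be happening (hypothetical numbers and helper functions, not Samza's actual code): a checkpointed offset that is perfectly valid on the real leader looks out of range on a stale replica, so the consumer falls back to auto.offset.reset and messages are skipped or re-read:

```python
# Hypothetical illustration of when a checkpointed offset looks "invalid"
# and auto.offset.reset kicks in. Not Samza's actual code.

def offset_valid(checkpoint, earliest, latest):
    """An offset is fetchable only inside the broker's retained range."""
    return earliest <= checkpoint <= latest

def resolve_offset(checkpoint, earliest, latest, auto_offset_reset="largest"):
    """Mimic the reset decision: keep a valid offset, otherwise jump to one
    end of the log per auto.offset.reset. Either way, everything between
    the checkpoint and the reset point is skipped or re-processed."""
    if offset_valid(checkpoint, earliest, latest):
        return checkpoint
    return latest if auto_offset_reset == "largest" else earliest

# A replica lagging behind the real leader reports a lower log-end offset,
# so a perfectly good checkpoint can look out of range there.
earliest, stale_latest = 300_000_000, 360_000_000
checkpoint = 366_399_914  # the offset from one of our warnings

print(offset_valid(checkpoint, earliest, stale_latest))   # → False
print(resolve_offset(checkpoint, earliest, stale_latest)) # → 360000000
```

If this matches what Samza's BrokerProxy does after a leader change, it would explain both the loss (reset to largest) and the old messages reappearing later (reset to smallest on other partitions).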
Kafka cluster:
- 8 brokers
- Kafka version 0.10.1
- unclean.leader.election.enable false
- replica.lag.time.max.ms 10000
- log.flush.interval.ms 7200000

Input topic:
- segment.bytes 2147483647
- retention.ms 172800000

Some warnings from the logs:

Error processing fetch operation on partition [topic_name,35], offset 232841013 (kafka.server.ReplicaManager)
java.lang.IllegalStateException: Failed to read complete buffer for targetOffset 241769924 startPosition 2147479938

BrokerProxy [WARN] Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: Error([topic_name,47],-1,kafka.common.UnknownException

BrokerProxy [WARN] It appears that we received an invalid or empty offset Some(366399914) for [topic_name,60]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.

Any help and suggestions will be appreciated.

Thanks,
Aleksandar Bircakovic
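P.S. In case it helps anyone reproduce our measurement: this is roughly how we compare retained message counts per partition on the input and output topics. The offset values below are made up for illustration; in practice we dump earliest/latest offsets per partition with Kafka's kafka.tools.GetOffsetShell (--time -2 for earliest, --time -1 for latest).

```python
# Rough sanity check for the input/output message-count gap.
# The offset dicts are hypothetical; in practice fill them from
# GetOffsetShell output for each of the 80 partitions.

def messages_available(earliest, latest):
    """Messages currently retained per partition = latest - earliest."""
    return {p: latest[p] - earliest[p] for p in latest}

# Hypothetical offsets for three partitions of the input topic ...
input_earliest = {0: 100, 1: 200, 2: 150}
input_latest   = {0: 1100, 1: 1200, 2: 1150}
# ... and the corresponding output-topic partitions.
output_earliest = {0: 100, 1: 200, 2: 150}
output_latest   = {0: 550, 1: 650, 2: 1150}

in_counts  = messages_available(input_earliest, input_latest)
out_counts = messages_available(output_earliest, output_latest)

total_in  = sum(in_counts.values())
total_out = sum(out_counts.values())
print(f"input={total_in} output={total_out} ratio={total_out / total_in:.2f}")
# → input=3000 output=1900 ratio=0.63

# Partitions where the output lags far behind the input point at
# consumers that are stuck or fetching from a stale replica.
lagging = [p for p in in_counts if out_counts[p] < in_counts[p] // 2]
print("suspicious partitions:", lagging)
# → suspicious partitions: [0, 1]
```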