[ https://issues.apache.org/jira/browse/KAFKA-1657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps resolved KAFKA-1657.
------------------------------
    Resolution: Won't Fix

I think the ultimate solution here is to move to the new consumer, which should fix all this.

> Fetch request using Simple consumer fails due to failed due to Leader not local for partition
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1657
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1657
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>            Reporter: aarti gupta
>
> I have a three-node Kafka cluster running on the same physical machine (on different ports), with replication factor = 3 and a single topic with 3 partitions.
> Multiple producers write to the topic, and a custom partitioner is used to direct messages to a given partition.
> I use the simple consumer to read from a given partition of the topic, and have three instances of my consumer running.
> The code snippet for the simple consumer suggests that having any node in the cluster (not necessarily the leader for that partition) is sufficient to find the leader for the partition. However, when I run it with a node other than the leader, a null pointer exception is thrown, and the logs show the error:
> [2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id 0 from client testClient on partition [VCCTask,1] failed due to Leader not local for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask
> Topic:VCCTask    PartitionCount:3    ReplicationFactor:3    Configs:
>     Topic: VCCTask    Partition: 0    Leader: 1    Replicas: 2,3,1    Isr: 1,2,3
>     Topic: VCCTask    Partition: 1    Leader: 1    Replicas: 3,1,2    Isr: 1,2,3
>     Topic: VCCTask    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
> If I specify the leader for the partition instead of an arbitrary node in the cluster, everything works great, but this is an operational nightmare.
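The lookup the SimpleConsumer example relies on is a two-step process: any live broker can answer a topic-metadata request, but the fetch itself has to go to the partition's current leader. Note that in `findLeaderAndUpdateSelfPointers` as quoted below, the metadata request is sent through the local variable `consumer`, which is never assigned (the new `SimpleConsumer` is stored in `this.consumer` instead). `consumer.send(req)` would therefore throw a NullPointerException and leave `this.consumer` connected to the seed broker rather than the leader, which may account for both the NPE and the "Leader not local" warning. A minimal sketch of the intended control flow in plain Java follows; `BrokerAddress`, `MetadataSource`, and `findLeader` are hypothetical stand-ins for `kafka.cluster.Broker`, a `TopicMetadataRequest` round trip, and the leader search, not Kafka APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LeaderLookupSketch {

    /** Hypothetical broker address; stands in for kafka.cluster.Broker. */
    record BrokerAddress(String host, int port) {}

    /**
     * Hypothetical metadata call: asks one seed broker for the
     * partition-to-leader map of a topic (in Kafka 0.8 this would be a
     * TopicMetadataRequest followed by PartitionMetadata.leader()).
     */
    interface MetadataSource {
        Map<Integer, BrokerAddress> leadersFor(BrokerAddress seed, String topic) throws Exception;
    }

    /** Step 1: ask each seed in turn until one reports a leader for the partition. */
    static Optional<BrokerAddress> findLeader(List<BrokerAddress> seeds, String topic,
                                              int partition, MetadataSource source) {
        for (BrokerAddress seed : seeds) {
            try {
                // Any live broker can answer; it does not need to be the leader.
                BrokerAddress leader = source.leadersFor(seed, topic).get(partition);
                if (leader != null) {
                    return Optional.of(leader);
                }
            } catch (Exception e) {
                // Seed unreachable or request failed: fall through to the next seed.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<BrokerAddress> seeds = List.of(
                new BrokerAddress("localhost", 9092),
                new BrokerAddress("localhost", 9093));
        // Fake metadata: the first seed is down, the second reports port 9094 as leader.
        MetadataSource fake = (seed, topic) -> {
            if (seed.port() == 9092) {
                throw new Exception("connection refused");
            }
            return Map.of(1, new BrokerAddress("localhost", 9094));
        };
        // Step 2: open the fetch connection to the leader, not to the seed that answered.
        BrokerAddress leader = findLeader(seeds, "VCCTask", 1, fake).orElseThrow();
        System.out.println("fetch from " + leader.host() + ":" + leader.port());
    }
}
```

The design point is that the connection used for the metadata request and the connection used for fetching are separate objects, and only the one pointing at the leader is kept for the fetch loop.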
> I was able to reproduce this using a simple test, where a producer writes the numbers 1 to 999999 and the consumers consume from a specific partition.
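The callback in the snippet decodes each payload with `ByteBuffer.wrap(result).getLong()`, and offsets are stored in ZooKeeper the same way, so the test presumably relies on each number being written as an 8-byte big-endian long. The producer code is not shown, so that encoding is an assumption. A minimal encode/decode pair in plain Java (the class and method names are hypothetical):

```java
import java.nio.ByteBuffer;

public class LongPayloadSketch {

    /** Encode a number as the 8-byte big-endian payload the test producer is assumed to send. */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decode a payload back into a number, as the consumer's onSuccess callback does. */
    static long decode(byte[] payload) {
        if (payload.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + payload.length);
        }
        return ByteBuffer.wrap(payload).getLong();
    }

    public static void main(String[] args) {
        for (long n = 1; n <= 5; n++) {
            System.out.println(n + " -> " + decode(encode(n)));
        }
    }
}
```

`ByteBuffer` defaults to big-endian order, so both sides agree without setting a byte order explicitly; the length check turns a payload written in some other format into an explicit error instead of a silently wrong number.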
> Here are the code snippets:
> public class TestConsumerStoreOffsetZookeeper {
>     public static void main(String[] args) throws JSONException {
>         TestConsumerStoreOffsetZookeeper testConsumer = new TestConsumerStoreOffsetZookeeper();
>         JSONObject jsonObject = new JSONObject();
>         jsonObject.put("topicName", "VCCTask");
>         jsonObject.put("clientName", "testClient");
>         jsonObject.put("partition", args[0]);
>         jsonObject.put("hostPort", "172.16.78.171");
>         jsonObject.put("znodeName", "VCCTask");
>         jsonObject.put("port", args[1]);
>         testConsumer.initialize(jsonObject);
>         final long startTime = System.currentTimeMillis();
>         testConsumer.startReceiving(new FutureCallback<byte[]>() {
>             int noOfMessagesConsumed= 0;
>             @Override
>             public void onSuccess(byte[] result) {
>                 LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
>                 ++noOfMessagesConsumed;
>                 LOG.info("# Messages consumed " + noOfMessagesConsumed + " Time elapsed" + (System.currentTimeMillis() - startTime) / 1000 + " seconds");
>             }
>             @Override
>             public void onFailure(Throwable t) {
>                 LOG.info("NO!! " + t.fillInStackTrace().getMessage());
>             }
>         });
>     }
>     private String topicToRead;
>     private static Logger LOG = Logger.getLogger("TestConsumerStoreOffsetZookeeper");
>     List<String> seedBrokers = Lists.newArrayList("localhost");
>     private int port;
>     private SimpleConsumer consumer;
>     Integer partition;
>     String clientName;
>     private Broker currentLeader;
>     private String counter;
>     CuratorFramework zooKeeper;
>     public void startReceiving(final FutureCallback<byte[]> futureCallback) {
>         findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
>         LOG.info("Kafka consumer delegate listening on topic " + topicToRead + " and partition " + partition);
>         int numErrors = 0;
>         while (true) {
>             long latestOffset = 0;
>             Stat stat = null;
>             final String path = "/" + topicToRead + "/"+partition;
>             try {
>                 //************************Read top of the
>                 stat = zooKeeper.checkExists().forPath(path);
>                 if (stat == null) {
>                     latestOffset = getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition, OffsetRequest.EarliestTime(), clientName);
>                     byte b[] = new byte[8];
>                     ByteBuffer byteBuffer = ByteBuffer.wrap(b);
>                     byteBuffer.putLong(latestOffset);
>                     final String s = zooKeeper.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
>                     LOG.info(" Zookeeper create string is "+ s);
>                     stat = zooKeeper.checkExists().forPath(path);
>                     if (stat == null) {
>                         LOG.info("Stat was null");
>                         throw new RuntimeException("Stat in zookeeper was null, cannot continue as message stream cannot be persisted");
>                     }
>                 } else {
>                     final byte[] data = zooKeeper.getData().storingStatIn(stat).forPath(path);
>                     if (data.length > 0) {
>                         latestOffset = ByteBuffer.wrap(data).getLong();
>                     } else {
>                         latestOffset = getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition, OffsetRequest.EarliestTime(), clientName);
>                     }
>                 }
>             } catch (Exception e) {
>                 throw new RuntimeException(e.fillInStackTrace().getMessage());
>             }
>             LOG.info("Topic name is " + topicToRead);
>             LOG.info("Last offset is " + latestOffset);
>             LOG.info("Constructing new fetch request on  " + topicToRead + " from offset" + latestOffset);
>             FetchRequest request = new FetchRequestBuilder().clientId(clientName).addFetch(topicToRead, partition, latestOffset, 100000).build();
>             FetchResponse fetchResponse = consumer.fetch(request);
>             if (fetchResponse.hasError()) {
>                 numErrors++;
>                 final short code = fetchResponse.errorCode(topicToRead, partition);
>                 LOG.info("Error fetching data from broker: " + consumer.host() + " Reason " + code);
>                 if (code == ErrorMapping.OffsetOutOfRangeCode()) {
>                     LOG.info("Offset out of range error: calculating offset again");
>                     throw new RuntimeException("Offset is out of range, multiple consumers are not allowed, this consumer will exit");
>                 }
>                 if (numErrors > 5 && code!=3) {
>                     consumer.close();
>                     consumer = null;
>                     findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
>                     numErrors = 0;
>                 }
>                 continue;
>             }
>             final ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topicToRead, partition);
>             final int validBytes = messageAndOffsets.validBytes();
>             LOG.info("Received fetch response on topic  " + topicToRead + " from offset" + latestOffset + " fetch response valid bytes is " + validBytes);
>             try {
>                 if (validBytes == 0) {
>                     LOG.info("No message received");
>                     //Don't keep hammering Kafka
>                     Thread.sleep(1000);
>                     continue;
>                 }
>                 for (MessageAndOffset messageAndOffset : messageAndOffsets) {
>                     LOG.info("Processing offset");
>                     final long currentOffset = messageAndOffset.offset();
>                     LOG.info("Processing offset " + currentOffset);
>                     //in case of compression entire block may be received
>                     if (currentOffset < latestOffset) {
>                         LOG.info("Found an old offset: " + currentOffset + "Expecting:" + latestOffset);
>                         continue;
>                     }
>                     final ByteBuffer payload = messageAndOffset.message().payload();
>                     byte[] bytes = new byte[payload.limit()];
>                     payload.get(bytes);
>                     LOG.info(this.getClass().getName() + " Received message from offset" + String.valueOf(latestOffset) + new String(bytes, "UTF-8"));
>                     LOG.info(this.getClass().getName() + " Executing future callback");
>                     //TODO ***************this should be atomic with writing job in db***********************
>                     futureCallback.onSuccess(bytes);
>                     try {
>                         long nextOffset = messageAndOffset.nextOffset();
>                         incrementOffset(nextOffset, stat, path);
>                     } catch (KeeperException | InterruptedException e) {
>                         LOG.info("Encountered exception in writing to" + e.fillInStackTrace().getMessage());
>                     }
>                     //****************************************************************************************
>                 }
>                 LOG.info("Outside for loop");
>             } catch (Exception e1) {
>                 LOG.info("Error in processing message or running callback " + e1.getMessage());
>                 futureCallback.onFailure(e1);
>                 throw new RuntimeException(e1);
>             }
>         }
>     }
>     private void incrementOffset(long nextOffset, Stat stat, String path) throws Exception {
>         if (stat == null) {
>             throw new RuntimeException("Given stat was null");
>         }
>         byte b[] = new byte[8];
>         ByteBuffer byteBuffer = ByteBuffer.wrap(b);
>         byteBuffer.putLong(nextOffset);
>         LOG.info("Offset consumed successfully: Setting offset in zookeeper as next offset: " + nextOffset);
>         final Stat statWrite = zooKeeper.setData().forPath(path, b);
>         if(statWrite.getDataLength() ==0){
>             throw new RuntimeException("Unable to save offset in zookeeper");
>         }
>     }
>     //TODO: agupta adapters should not have an initialize method, rename and merge with startListening
>     public void initialize(JSONObject configData) {
>         try {
>             final String hostPort = configData.getString("hostPort");
>             zooKeeper = CuratorFrameworkFactory.newClient(hostPort, new ExponentialBackoffRetry(10, 3000));
>             zooKeeper.start();
>             this.counter = configData.getString("znodeName");
>             this.topicToRead = configData.getString("topicName");
>             LOG.info("Topic name is " + topicToRead);
>             //TODO: agupta: read seedbrokers from zookeeper
>             //*ZkClient zkClient = new ZkClient("localhost:2108", 4000, 6000, new BytesPushThroughSerializer());
>             //List<String> brokerList = zkClient.getChildren("/brokers/ips");
>             List<String> seedBrokers = Lists.newArrayList("localhost");
>             this.seedBrokers = seedBrokers;
>             this.port = configData.getInt("port");
>             this.partition= configData.getInt("partition");
>             this.clientName = configData.getString("clientName");
>             LOG.info("Finding leader with for partition " + partition + " clientName " + clientName);
>         } catch (JSONException | IOException e) {
>             e.printStackTrace();
>             LOG.info("Error parsing configuration" + e.getMessage());
>         } catch (Exception e) {
>             LOG.info("Error starting zookeeper" + e.getMessage());
>         }
>     }
>     /**
>      * Find last offset to define where to start reading if this is the first read
>      *
>      * @param consumer
>      * @param topic
>      * @param partition
>      * @param whichTime
>      * @param clientName
>      * @return
>      */
>     public static long getLastOffsetFromBeginningOfStream(SimpleConsumer consumer, String topic, int partition,
>                                                           long whichTime, String clientName) {
>         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
>         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
>         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
>         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
>         OffsetResponse response = consumer.getOffsetsBefore(request);
>         if (response.hasError()) {
>             System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
>             return 0;
>         }
>         long[] offsets = response.offsets(topic, partition);
>         return offsets[0];
>     }
>     /**
>      * Return the lead broker for a given topic and partition
>      *
>      * @param seedBrokers
>      * @param port
>      * @param topic
>      * @param partition
>      * @return
>      */
>     private PartitionMetadata findLeaderAndUpdateSelfPointers(List<String> seedBrokers, int port, String topic, int partition) {
>         PartitionMetadata returnMetaData = null;
>         loop:
>         for (String seed : seedBrokers) {
>             SimpleConsumer consumer = null;
>             try {
>                 this.consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
>                 List<String> topics = Collections.singletonList(topic);
>                 TopicMetadataRequest req = new TopicMetadataRequest(topics);
>                 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
>                 List<TopicMetadata> metaData = resp.topicsMetadata();
>                 for (TopicMetadata item : metaData) {
>                     for (PartitionMetadata part : item.partitionsMetadata()) {
>                         if (part.partitionId() == partition) {
>                             returnMetaData = part;
>                             LOG.info("Found leader " + returnMetaData.leader().host());
>                             break loop;
>                         }
>                     }
>                 }
>             } catch (Exception e) {
>                 LOG.info("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
>                         + ", " + partition + "] Reason: " + e);
>             } finally {
>                 if (consumer != null) consumer.close();
>             }
>         }
>         LOG.info("KafkaConsumerDelegate initializing self pointers ");
>         if (returnMetaData != null) {
>             currentLeader = returnMetaData.leader();
>             if (currentLeader != null) {
>                 this.consumer = new SimpleConsumer(currentLeader.host(), currentLeader.port(), 100000, 64 * 1024, clientName);
>             }
>         }
>         LOG.info("KafkaConsumerDelegate: returning metadata");
>         return returnMetaData;
>     }
> }
> *******************************
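In the fetch loop above, an error response only triggers a new leader lookup after more than five consecutive errors (and never for code 3). The "Leader not local" warning most likely reaches the client as NotLeaderForPartition (code 6), so a sketch of an alternative policy that re-resolves the leader as soon as the broker reports it cannot serve the partition may help. The numeric values are the Kafka protocol error codes (1 = OffsetOutOfRange, 5 = LeaderNotAvailable, 6 = NotLeaderForPartition), which 0.8 code would normally read from `kafka.common.ErrorMapping`; `FetchAction` and `onFetchError` are hypothetical names, and resetting the offset on OffsetOutOfRange is one possible choice (the quoted code exits instead).

```java
public class FetchErrorPolicySketch {

    // Kafka protocol error codes (exposed in 0.8 via kafka.common.ErrorMapping).
    static final short OFFSET_OUT_OF_RANGE = 1;
    static final short LEADER_NOT_AVAILABLE = 5;
    static final short NOT_LEADER_FOR_PARTITION = 6;

    /** Hypothetical reactions available to the fetch loop. */
    enum FetchAction { REFRESH_LEADER, RESET_OFFSET, RETRY_AFTER_BACKOFF, GIVE_UP }

    /** Decide what to do after a fetch response carrying the given error code. */
    static FetchAction onFetchError(short code, int consecutiveErrors, int maxErrors) {
        if (code == NOT_LEADER_FOR_PARTITION || code == LEADER_NOT_AVAILABLE) {
            // The connected broker is not (or no longer) the leader, or an election
            // is in progress: look the leader up again right away.
            return FetchAction.REFRESH_LEADER;
        }
        if (code == OFFSET_OUT_OF_RANGE) {
            // The requested offset is not available on the broker; pick a new start offset.
            return FetchAction.RESET_OFFSET;
        }
        if (consecutiveErrors >= maxErrors) {
            // Any other persistent error: stop instead of looping forever.
            return FetchAction.GIVE_UP;
        }
        return FetchAction.RETRY_AFTER_BACKOFF;
    }

    public static void main(String[] args) {
        System.out.println(onFetchError(NOT_LEADER_FOR_PARTITION, 1, 5)); // REFRESH_LEADER
        System.out.println(onFetchError(OFFSET_OUT_OF_RANGE, 1, 5));      // RESET_OFFSET
        System.out.println(onFetchError((short) 7, 5, 5));                // GIVE_UP
    }
}
```

A caller would typically reset its error counter after `REFRESH_LEADER` and sleep briefly before the next fetch, since a leader election can take a moment to complete.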



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
