[ https://issues.apache.org/jira/browse/KAFKA-1657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jay Kreps resolved KAFKA-1657. ------------------------------ Resolution: Won't Fix I think the ultimate solution here is to move to the new consumer which should fix all this. > Fetch request using Simple consumer fails due to failed due to Leader not > local for partition > --------------------------------------------------------------------------------------------- > > Key: KAFKA-1657 > URL: https://issues.apache.org/jira/browse/KAFKA-1657 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8.1.1 > Reporter: aarti gupta > > I have a three node Kafka cluster, running on the same physical machine, (on > different ports) > with replication factor = 3, and a single topic with 3 partitions. > Multiple producers write to the topic, and a custom partitioner is used to > direct messages to a given partition. > I use the simple consumer to read from a given partition of the topic, and > have three instances of my consumer running > The code snippet for the simple consumer suggests, that having any node in > the cluster, (not necessarily the leader for that partition) is sufficient to > find the leader for the partition, however, on running this, I find, that > given a different node in the cluster, a null pointer exception is thrown, > and the logs show the error > [2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id > 0 from client testClient on partition [VCCTask,1] failed due to Leader not > local for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis) > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask > Topic:VCCTask PartitionCount:3 ReplicationFactor:3 Configs: > Topic: VCCTask Partition: 0 Leader: 1 Replicas: 2,3,1 Isr: > 1,2,3 > Topic: VCCTask Partition: 1 Leader: 1 Replicas: 3,1,2 Isr: > 1,2,3 > Topic: VCCTask Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: > 1,2,3 > If i specify the leader for the partition, instead of any node in the > cluster, everything works great, but this is an operational nightmare. > I was able to reproduce this using a simple test, where a producer writes > numbers from 1 to 999999, and the consumers, consume from a specific > partition. > Here are the code snippets > public class TestConsumerStoreOffsetZookeeper { > public static void main(String[] args) throws JSONException { > TestConsumerStoreOffsetZookeeper testConsumer = new > TestConsumerStoreOffsetZookeeper(); > JSONObject jsonObject = new JSONObject(); > jsonObject.put("topicName", "VCCTask"); > jsonObject.put("clientName", "testClient"); > jsonObject.put("partition", args[0]); > jsonObject.put("hostPort", "172.16.78.171"); > jsonObject.put("znodeName", "VCCTask"); > jsonObject.put("port", args[1]); > testConsumer.initialize(jsonObject); > final long startTime = System.currentTimeMillis(); > testConsumer.startReceiving(new FutureCallback<byte[]>() { > int noOfMessagesConsumed= 0; > @Override > public void onSuccess(byte[] result) { > LOG.info("YES!! " + ByteBuffer.wrap(result).getLong()); > ++noOfMessagesConsumed; > LOG.info("# Messages consumed "+ noOfMessagesConsumed +" Time > elapsed"+ (System.currentTimeMillis()-startTime )/1000 +" seconds"); > } > @Override > public void onFailure(Throwable t) { > LOG.info("NO!! " + t.fillInStackTrace().getMessage()); > } > }); > } > private String topicToRead; > private static Logger LOG = > Logger.getLogger("TestConsumerStoreOffsetZookeeper"); > List<String> seedBrokers = Lists.newArrayList("localhost"); > private int port; > private SimpleConsumer consumer; > Integer partition; > String clientName; > private Broker currentLeader; > private String counter; > CuratorFramework zooKeeper; > public void startReceiving(final FutureCallback<byte[]> futureCallback) { > findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, > partition); > LOG.info("Kafka consumer delegate listening on topic " + topicToRead > + " and partition " + partition); > int numErrors = 0; > while (true) { > long latestOffset = 0; > Stat stat = null; > final String path = "/" + topicToRead + "/"+partition; > try { > //************************Read top of the > stat = zooKeeper.checkExists().forPath(path); > if (stat == null) { > latestOffset = > getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition, > OffsetRequest.EarliestTime(), clientName); > byte b[] = new byte[8]; > ByteBuffer byteBuffer = ByteBuffer.wrap(b); > byteBuffer.putLong(latestOffset); > final String s = > zooKeeper.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); > LOG.info(" Zookeeper create string is "+ s); > stat = zooKeeper.checkExists().forPath(path); > if (stat == null) { > LOG.info("Stat was null"); > throw new RuntimeException("Stat in zookeeper was > null, cannot continue as message stream cannot be persisted"); > } > } else { > final byte[] data = > zooKeeper.getData().storingStatIn(stat).forPath(path); > if(data.length>0){ > latestOffset = ByteBuffer.wrap(data).getLong(); > }else { > latestOffset = > getLastOffsetFromBeginningOfStream(this.consumer,topicToRead,partition,OffsetRequest.EarliestTime(),clientName); > } > } > } catch (Exception e) { > throw new RuntimeException(e.fillInStackTrace().getMessage()); > } > LOG.info("Topic name is " + topicToRead); > LOG.info("Last offset is " + latestOffset); > LOG.info("Constructing new fetch request on " + topicToRead + " > from offset" + latestOffset); > FetchRequest request = new > FetchRequestBuilder().clientId(clientName).addFetch(topicToRead, partition, > latestOffset, 100000).build(); > FetchResponse fetchResponse = consumer.fetch(request); > if (fetchResponse.hasError()) { > numErrors++; > final short code = fetchResponse.errorCode(topicToRead, > partition); > LOG.info("Error fetching data from broker: " + > consumer.host() + " Reason " + code); > if (code == ErrorMapping.OffsetOutOfRangeCode()) { > LOG.info("Offset out of range error: calculating offset > again"); > throw new RuntimeException("Offset is out of range, > multiple consumers are not allowed, this consumer will exit"); > } > if (numErrors > 5 && code!=3) { > consumer.close(); > consumer = null; > findLeaderAndUpdateSelfPointers(seedBrokers, port, > topicToRead, partition); > numErrors = 0; > } > continue; > } > final ByteBufferMessageSet messageAndOffsets = > fetchResponse.messageSet(topicToRead, partition); > final int validBytes = messageAndOffsets.validBytes(); > LOG.info("Received fetch response on topic " + topicToRead + " > from offset" + latestOffset + " fetch response valid bytes is " + validBytes); > try { > if (validBytes == 0) { > LOG.info("No message received"); > //Don't keep hammering Kafka > Thread.sleep(1000); > continue; > } > for (MessageAndOffset messageAndOffset : messageAndOffsets) { > LOG.info("Processing offset"); > final long currentOffset = messageAndOffset.offset(); > LOG.info("Processing offset " + currentOffset); > //in case of compression entire block may be received > if (currentOffset < latestOffset) { > LOG.info("Found an old offset: " + currentOffset + > "Expecting:" + latestOffset); > continue; > } > final ByteBuffer payload = > messageAndOffset.message().payload(); > byte[] bytes = new byte[payload.limit()]; > payload.get(bytes); > LOG.info(this.getClass().getName() + " Received message > from offset" + String.valueOf(latestOffset) + new String(bytes, "UTF-8")); > LOG.info(this.getClass().getName() + " Executing future > callback"); > //TODO ***************this should be atomic with writing > job in db*********************** > futureCallback.onSuccess(bytes); > try { > long nextOffset = messageAndOffset.nextOffset(); > incrementOffset(nextOffset, stat, path); > } catch (KeeperException | InterruptedException e) { > LOG.info("Encountered exception in writing to" + > e.fillInStackTrace().getMessage()); > } > > //**************************************************************************************** > } > LOG.info("Outside for loop"); > } catch (Exception e1) { > LOG.info("Error in processing message or running callback " + > e1.getMessage()); > futureCallback.onFailure(e1); > throw new RuntimeException(e1); > } > } > } > private void incrementOffset(long nextOffset, Stat stat, String path) > throws Exception { > if (stat == null) { > throw new RuntimeException("Given stat was null"); > } > byte b[] = new byte[8]; > ByteBuffer byteBuffer = ByteBuffer.wrap(b); > byteBuffer.putLong(nextOffset); > LOG.info("Offset consumed successfully: Setting offset in zookeeper > as next offset: "+ nextOffset); > final Stat statWrite = zooKeeper.setData().forPath(path, b); > if(statWrite.getDataLength() ==0){ > throw new RuntimeException("Unable to save offset in zookeeper"); > } > } > //TODO: agupta adapters should not have an initialize method, rename and > merge with startListening > public void initialize(JSONObject configData) { > try { > final String hostPort = configData.getString("hostPort"); > zooKeeper = CuratorFrameworkFactory.newClient(hostPort,new > ExponentialBackoffRetry(10, 3000)); > zooKeeper.start(); > this.counter = configData.getString("znodeName"); > this.topicToRead = configData.getString("topicName"); > LOG.info("Topic name is " + topicToRead); > //TODO: agupta: read seedbrokers from zookeeper > //*ZkClient zkClient = new ZkClient("localhost:2108", 4000, 6000, > new BytesPushThroughSerializer()); > //List<String> brokerList = zkClient.getChildren("/brokers/ips"); > List<String> seedBrokers = Lists.newArrayList("localhost"); > this.seedBrokers = seedBrokers; > this.port = configData.getInt("port"); > this.partition= configData.getInt("partition"); > this.clientName = configData.getString("clientName"); > LOG.info("Finding leader with for partition " + partition + " > clientName " + clientName); > } catch (JSONException | IOException e) { > e.printStackTrace(); > LOG.info("Error parsing configuration" + e.getMessage()); > } catch (Exception e) { > LOG.info("Error starting zookeeper" + e.getMessage()); > } > } > /** > * Find last offset to define where to start reading if this is the first > read > * > * @param consumer > * @param topic > * @param partition > * @param whichTime > * @param clientName > * @return > */ > public static long getLastOffsetFromBeginningOfStream(SimpleConsumer > consumer, String topic, int partition, > long whichTime, > String clientName) { > TopicAndPartition topicAndPartition = new TopicAndPartition(topic, > partition); > Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new > HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); > requestInfo.put(topicAndPartition, new > PartitionOffsetRequestInfo(whichTime, 1)); > kafka.javaapi.OffsetRequest request = new > kafka.javaapi.OffsetRequest(requestInfo, > kafka.api.OffsetRequest.CurrentVersion(), clientName); > OffsetResponse response = consumer.getOffsetsBefore(request); > if (response.hasError()) { > System.out.println("Error fetching data Offset Data the Broker. > Reason: " + response.errorCode(topic, partition)); > return 0; > } > long[] offsets = response.offsets(topic, partition); > return offsets[0]; > } > /** > * Return the lead broker for a given topic and partition > * > * @param seedBrokers > * @param port > * @param topic > * @param partition > * @return > */ > private PartitionMetadata findLeaderAndUpdateSelfPointers(List<String> > seedBrokers, int port, String topic, int partition) { > PartitionMetadata returnMetaData = null; > loop: > for (String seed : seedBrokers) { > SimpleConsumer consumer = null; > try { > this.consumer = new SimpleConsumer(seed, port, 100000, 64 * > 1024, "leaderLookup"); > List<String> topics = Collections.singletonList(topic); > TopicMetadataRequest req = new TopicMetadataRequest(topics); > kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); > List<TopicMetadata> metaData = resp.topicsMetadata(); > for (TopicMetadata item : metaData) { > for (PartitionMetadata part : item.partitionsMetadata()) { > if (part.partitionId() == partition) { > returnMetaData = part; > LOG.info("Found leader " + > returnMetaData.leader().host()); > break loop; > } > } > } > } catch (Exception e) { > LOG.info("Error communicating with Broker [" + seed + "] to > find Leader for [" + topic > + ", " + partition + "] Reason: " + e); > } finally { > if (consumer != null) consumer.close(); > } > } > LOG.info("KafkaConsumerDelegate initializing self pointers "); > if (returnMetaData != null) { > currentLeader = returnMetaData.leader(); > if (currentLeader != null) { > this.consumer = new SimpleConsumer(currentLeader.host(), > currentLeader.port(), 100000, 64 * 1024, clientName); > } > } > LOG.info("KafkaConsumerDelegate: returning metadata"); > return returnMetaData; > } > ******************************* -- This message was sent by Atlassian JIRA (v6.3.4#6332)