Hi Guozhang,

I want to get the latest offset, code as follows:
consumer.assign(topicPartitionList);
consumer.seekToEnd(topicPartitionList);
long offset = consumer.position(topicPartition);

I note that the topic is marked for deletion but "delete.topic.enable" is not 
set to true.
Maybe it cause that?

After set delete.topic.enable to true and restart, I encounter another 
question, log as follows:
[2016-07-21 18:25:27,740] INFO [delete-topics-thread-61], Starting  
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-07-21 18:25:27,740] INFO [Controller 61]: Controller startup complete 
(kafka.controller.KafkaController)
[2016-07-21 18:25:27,801] INFO [delete-topics-thread-61], Handling deletion for 
topics test-topic (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-07-21 18:25:27,806] ERROR [delete-topics-thread-61], Error due to  
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
java.util.NoSuchElementException: key not found: test-topic
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
        at 
kafka.controller.PartitionStateMachine.deregisterPartitionChangeListener(PartitionStateMachine.scala:387)
        at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$completeDeleteTopic(TopicDeletionManager.scala:282)
        at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:407)
        at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
        at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
        at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
        at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
        at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-07-21 18:25:27,809] INFO [delete-topics-thread-61], Stopped  
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)



YuanJia Li
 
From: Guozhang Wang
Date: 2016-07-22 02:07
To: users@kafka.apache.org
Subject: Re: KafkaConsumer position block
Hello Yuanjia,
 
Could you share your code example on calling consumer.position()? Is the
partition that you are getting the offset from assigned to the consumer?
 
 
Guozhang
 
On Wed, Jul 20, 2016 at 11:50 PM, yuanjia8...@163.com <yuanjia8...@163.com>
wrote:
 
> Hi,
>      With kafka-clients-0.10.0.0, I use KafkaConsumer.position() to get
> the offset, the process block in ConsumerNetworkClient.awaitMetadataUpdate.
> Block until the meadata has been refreshed.
>      My questions are:
>      1. Why the metadata not refresh?
>      2. Could it use timeout or throw exception instead of block?
>
>      Thanks.
>
>
>
> YuanJia Li
>
 
 
 
-- 
-- Guozhang

Reply via email to