Hi Jun,
I modified UpdateOffsetsInZK so that it can take a custom offset and set it,
though I'm not sure of the consequences. The ZK consumer can now get messages
from the desired offset, so it's working as of now; I still need to test more.
Modified method
----------------------------------
def getAndSetOffsets(zkClient: ZkClient, offsetOption: String,
                     config: ConsumerConfig, topic: String): Unit = {
  val cluster = ZkUtils.getCluster(zkClient)
  val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
  // Sort the "brokerId-partitionId" strings so partitions are processed in order.
  val partitions: List[String] = partitionsPerTopicMap.get(topic) match {
    case Some(l) => l.sortWith((s, t) => s < t)
    case _ => throw new RuntimeException("Can't find topic " + topic)
  }
  var numParts = 0
  for (partString <- partitions) {
    val part = Partition.parse(partString)
    // Fail fast if the partition's broker is down, as UpdateOffsetsInZK does.
    cluster.getBroker(part.brokerId) match {
      case Some(b) => b
      case None => throw new IllegalStateException("Broker " + part.brokerId +
        " is unavailable. Cannot issue getOffsetsBefore request")
    }
    val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
    println("updating partition " + part.name + " with new offset: " + offsetOption)
    // Write the offset as a plain string under
    // /consumers/<group>/offsets/<topic>/<brokerId-partitionId>.
    ZkUtils.updatePersistentPath(zkClient,
      topicDirs.consumerOffsetDir + "/" + part.name, offsetOption)
    numParts += 1
  }
  println("updated the offset for " + numParts + " partitions")
}
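
For reference, this is roughly how I call it before starting the connector (a
sketch from my test setup; the zk.connect value, the topic "golf4", and the
offset "22274" are just placeholders):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

val props = new Properties()
props.put("zk.connect", "localhost:2181")
props.put("groupid", "test_group")
val config = new ConsumerConfig(props)

// Kafka's ZKStringSerializer stores the offset as plain UTF-8 text, which is
// what the rebalance code expects to parse back with toLong.
val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
getAndSetOffsets(zkClient, "22274", config, "golf4")

// Start the connector afterwards; its fetchers begin at the offset written above.
val connector = Consumer.create(config)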
Thanks
Ajit Koti
On Thu, Sep 13, 2012 at 8:46 AM, Ajit Koti <[email protected]> wrote:
> Hi Jun,
>
> I was trying this out as a last option; otherwise I would use the SimpleConsumer.
>
> I am trying to set the offset of my choice using
>
> getZkClinet().createPersistent("/consumers/test_group/offsets/golf4/0-0", "22274");
>
> or
>
> ZkUtils.updatePersistentPath(getZkClinet(), "/consumers/test_group/offsets/golf4/0-0", "22274");
>
> But I was getting this exception:
>
> ZookeeperConsumerConnector:66 - test_group_ajits-machine-1347505711889-9df96cdb exception during rebalance
> java.lang.NumberFormatException: For input string: "��
> at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Long.parseLong(Long.java:441)
> at java.lang.Long.parseLong(Long.java:483)
> at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:232)
> at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$addPartitionTopicInfo(ZookeeperConsumerConnector.scala:644)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:523)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:520)
> at scala.collection.immutable.Range.foreach(Range.scala:78)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:520)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:507)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:72)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:507)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:494)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> at scala.collection.Iterator$class.foreach(Iterator.scala:652)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
>
>
>
> Is there anything I could do here, or another way to resolve this issue?
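>
> Looking at the trace again, my guess is that my getZkClinet() helper uses
> zkclient's default Java serializer, so the offset string gets written as
> serialized bytes rather than UTF-8 text, which would explain the garbage in
> the parse error. A sketch of what I think the fix looks like, assuming
> Kafka's ZKStringSerializer is the right serializer to pass in:
>
> import kafka.utils.{ZkUtils, ZKStringSerializer}
> import org.I0Itec.zkclient.ZkClient
>
> // Construct the client with a string serializer so the offset is stored as
> // plain text that the consumer can parse back with toLong.
> val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
> ZkUtils.updatePersistentPath(zkClient,
>   "/consumers/test_group/offsets/golf4/0-0", "22274")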
>
> Thanks
> Ajit Koti
>
>
> On Wed, Sep 12, 2012 at 9:50 AM, Jun Rao <[email protected]> wrote:
>
>> If you also need this for Hadoop, then it's more difficult. Unfortunately,
>> we don't have an api for changing offsets programmatically right now. We
>> will revisit this post 0.8.0.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 11, 2012 at 8:32 AM, Ajit Koti <[email protected]> wrote:
>>
>> > Hi Jun,
>> >
>> > Keeping the log segment size and the log retention size at the same value
>> > means I will lose messages, right? I don't want that to happen, since the
>> > Hadoop consumer would not yet have consumed the messages and persisted
>> > them into the database.
>> >
>> > What I was trying to do was set autooffset.reset to largest and delete
>> > the corresponding offsets, following this thread:
>> >
>> > http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201202.mbox/%3ccalxo6kityw_xtbvxkkv2x31wynxfwoz47o-e6dbfo21msmq...@mail.gmail.com%3E
>> >
>> > I'm not particularly sure how to delete the offsets. Using ZkUtils, I was
>> > trying to delete this path before consuming any messages:
>> > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
>> >
>> > I'm not sure this is the right approach; roughly what I had in mind is
>> > sketched below.
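>> >
>> > This is only a sketch: I'm assuming ZkUtils has a deletePath helper and
>> > that deleting is safe while no consumer in the group is connected.
>> >
>> > import kafka.utils.ZkUtils
>> >
>> > // Remove the stored offset for each partition of the topic before the
>> > // consumer starts, so autooffset.reset decides the starting position.
>> > ZkUtils.deletePath(zkClient, "/consumers/test_group/offsets/golf4/0-0")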
>> >
>> >
>> > Thanks
>> > Ajit
>> >
>> > On Tue, Sep 11, 2012 at 8:05 PM, Jun Rao <[email protected]> wrote:
>> >
>> > > That's an interesting use case, and it is not what the ZK consumer is
>> > > really designed for. First, having a topic per user may not be ideal,
>> > > especially if you have millions of users. Second, the ZK consumer is
>> > > designed for continuously consuming data, not skipping data all the
>> > > time.
>> > >
>> > > With that said, you can probably try the following trick and see if it
>> > > works. Set both the log segment size and the log retention size to a
>> > > value that matches the size of about 100 messages. This should make
>> > > sure that at any given point in time, only the most recent 100 messages
>> > > are kept in the broker. Set autooffset.reset to smallest. This will
>> > > make sure that if the consumer's offset is out of range, it will
>> > > automatically switch to the current smallest offset.
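>> > >
>> > > Concretely, something like this (a sketch; 102400 bytes is just an
>> > > illustrative size, and the property names are the 0.7 ones):
>> > >
>> > > import java.util.Properties
>> > > import kafka.consumer.ConsumerConfig
>> > >
>> > > // Broker side (server.properties), sized to keep roughly 100 messages:
>> > > //   log.file.size=102400
>> > > //   log.retention.size=102400
>> > >
>> > > // Consumer side:
>> > > val props = new Properties()
>> > > props.put("zk.connect", "localhost:2181")
>> > > props.put("groupid", "test_group")
>> > > // Jump to the smallest available offset when the saved one is out of range.
>> > > props.put("autooffset.reset", "smallest")
>> > > val config = new ConsumerConfig(props)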
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Mon, Sep 10, 2012 at 9:07 PM, Ajit Koti <[email protected]> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > No, consumers are not falling behind.
>> > > > What happens is that the users behind the consumers don't fetch
>> > > > messages for a certain period of time, and when they come back they
>> > > > want to fetch the latest messages from the topic.
>> > > >
>> > > > For example, in the case of displaying a news feed, each user has got
>> > > > their own topic. If a user doesn't log in for a couple of days, they
>> > > > won't consume the messages from the topic. But when they log in, they
>> > > > want the latest updates, for which we have to fetch the latest
>> > > > messages from the topic.
>> > > >
>> > > > So is it possible to use the ZK consumer, set the offset to the last
>> > > > 100 messages, and then fetch them?
>> > > >
>> > > > Thanks
>> > > > Ajit Koti
>> > > >
>> > > >
>> > > > On Tue, Sep 11, 2012 at 9:04 AM, Jun Rao <[email protected]> wrote:
>> > > >
>> > > > > Is consumption falling behind? Can you have more consumers in the
>> > > > > same group to increase the consumption throughput?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Mon, Sep 10, 2012 at 12:22 PM, Ajit Koti <[email protected]> wrote:
>> > > > >
>> > > > > > Hi All,
>> > > > > >
>> > > > > > I was happily consuming messages with the consumer.
>> > > > > > Recently I got a new requirement where I always have to process
>> > > > > > the latest messages.
>> > > > > > Is there any way I can fetch the latest 100 messages? I know that
>> > > > > > apart from the simple consumer I cannot specify the offset, but I
>> > > > > > am still wondering whether there is any way to start consuming
>> > > > > > from a particular offset.
>> > > > > >
>> > > > > >
>> > > > > > Note: I have only one consumer per topic.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > Thanks
>> > > > > > Ajit
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>