client/producer : 0.9.0.1
server/broker : 0.9.0.0

> On Apr 26, 2016, at 10:05 PM, Sharma Vinay <vinaysharm...@gmail.com> wrote:
> 
> What versions of kafka and client API are you using now?
> On Apr 26, 2016 3:35 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com>
> wrote:
> 
>> I spoke too soon. It's back doing the same thing.. which is really odd.
>> 
>>> On Apr 26, 2016, at 12:24 PM, Fumo, Vincent <
>> vincent_f...@cable.comcast.com> wrote:
>>> 
>>> Hello. I found the issue. The Ops team deployed kafka 0.8.1 and all my
>> code was 0.9.0. Simple mistake and one that I should have thought of
>> sooner. Once I had them bump up to the latest kafka all was well. Thank you
>> for your help!
>>> 
>>> v
>>> 
>>> 
>>>> On Apr 25, 2016, at 2:54 PM, vinay sharma <vinsharma.t...@gmail.com>
>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Your code looks good. i don't see any reason there for frequent meta
>> data
>>>> fetch but i will run it to verify.
>>>> 
>>>> I was able to replicate this issue with my consumer today where I
>> killed 2
>>>> brokers out or 3 and ran consumer. I saw a lot of meta data requests in
>>>> wait for new leader for partitions for which those 2 brokers were
>> leader. i
>>>> see below 2 lines frequently as you mentioned in logs
>>>> 
>>>> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated
>> cluster
>>>> metadata version 275 to Cluster(nodes = [Node(1, node1.example.com,
>> 9093)],
>>>> partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader =
>> 1,
>>>> replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition
>> =
>>>> 2, leader = none, replicas = [], isr = [], Partition(topic =
>> kafkaPOCTopic,
>>>> partition = 0, leader = none, replicas = [], isr = []])
>>>> 
>>>> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
>>>> partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
>>>> metadata refresh
>>>> 
>>>> I also saw this behavior due to below error where i might have killed
>> the
>>>> only ISR available for the topic at that time.
>>>> 
>>>> "Error while fetching metadata with correlation id 242 :
>>>> {kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
>>>> 
>>>> Do you see any such error in your logs?
>>>> 
>>>> Regards,
>>>> Vinay Sharma
>>>> 
>>>> 
>>>> On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <
>>>> vincent_f...@cable.comcast.com> wrote:
>>>> 
>>>>> 
>>>>> 
>>>>> My code is very straightforward. I create a producer, and then call it
>> to
>>>>> send messages. Here is the factory method::
>>>>> 
>>>>> public Producer<String, String> createProducer() {
>>>>> 
>>>>>  Properties props = new Properties();
>>>>> 
>>>>>  props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
>>>>>  props.put("key.serializer",
>>>>> "org.apache.kafka.common.serialization.StringSerializer");
>>>>>  props.put("value.serializer",
>>>>> "org.apache.kafka.common.serialization.StringSerializer");
>>>>>  props.put("acks", "all");
>>>>>  props.put("retries", "0");
>>>>>  props.put("batch.size", "1638");
>>>>>  props.put("linger.ms", "1");
>>>>>  props.put("buffer.memory", "33554432");
>>>>>  props.put("compression.type", "gzip");
>>>>>  props.put("client.id", "sds-merdevl");
>>>>> 
>>>>>  return new KafkaProducer<>(props);
>>>>> }
>>>>> 
>>>>> That is injected into a single instance of KafkaNotifier::
>>>>> 
>>>>> public class KafkaNotifier implements MessageNotifier {
>>>>>  private static Logger logger =
>>>>> LoggerFactory.getLogger(KafkaNotifier.class);
>>>>> 
>>>>>  private Producer<String, String> producer;
>>>>>  private String topicName;
>>>>> 
>>>>>  @Override
>>>>>  public void sendNotificationMessage(DataObjectModificationMessage
>>>>> message) {
>>>>> 
>>>>>      String json = message.asJSON();
>>>>>      String key = message.getObject().getId().toString();
>>>>> 
>>>>>      producer.send(new ProducerRecord<>(topicName, key, json));
>>>>> 
>>>>>      if (logger.isDebugEnabled()) {
>>>>>          logger.debug("Sent Kafka message for object with id={}",
>> key);
>>>>>      }
>>>>>  }
>>>>> 
>>>>>  @Required
>>>>>  public void setProducer(Producer<String, String> producer) {
>>>>>      this.producer = producer;
>>>>>  }
>>>>> 
>>>>>  @Required
>>>>>  public void setTopicName(String topicName) {
>>>>>      this.topicName = topicName;
>>>>>  }
>>>>> }
>>>>> 
>>>>> Here is the topic config info :
>>>>> 
>>>>> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe
>>>>> --topic s.notifications.dev
>>>>> Topic:s.notifications.dev       PartitionCount:1
>>>>> ReplicationFactor:1     Configs:retention.ms
>>>>> =172800000,cleanup.policy=compact
>>>>>      Topic: s.notifications.dev      Partition: 0    Leader: 0
>>>>> Replicas: 0     Isr: 0
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>> 2 producer's to same topic should not be a problem. There can be
>> multiple
>>>>>> producers and consumers of same kafka topic.
>>>>>> 
>>>>>> I am not sure what can be wrong here. I can this at my end If you can
>>>>> share
>>>>>> producer code and any config of topic ot broker that you changed and
>> is
>>>>> not
>>>>>> default.
>>>>>> 
>>>>>> Please also check that you are not creating producer every time you
>> send
>>>>> a
>>>>>> message but reusing producer once created to send multiple messages.
>>>>> Before
>>>>>> a send and after creation a producers sends this request to fetch
>>>>> metadata.
>>>>>> I ran a test to publish messages from 2 producers to a topic with 3
>>>>>> partitions on a 3 broker 1 zookeeper kafka setup. I ran test for more
>>>>> than
>>>>>> a minute and saw just once for both producers before their 1st send.
>>>>>> 
>>>>>> Regards,
>>>>>> Vinay Sharma
>>>>>> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <
>> vincent_f...@cable.comcast.com
>>>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi. I've not set that value. My producer properties are as follows :
>>>>>>> 
>>>>>>> acks=all
>>>>>>> retries=0
>>>>>>> bath.size=1638
>>>>>>> linger.ms=1
>>>>>>> buffer.memory=33554432
>>>>>>> compression.type=gzip
>>>>>>> client.id=sds-merdevl
>>>>>>> 
>>>>>>> I have this running on two hosts with the same config. I thought that
>>>>>>> having the same client.id on each would just consolidate the
>> tracking
>>>>>>> (same logical name). You don't think there is an issue with 2
>> producers
>>>>> to
>>>>>>> the same topic?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com
>>> 
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Generally a proactive metadata refresh request is sent by producer
>> and
>>>>>>>> consumer every 5 minutes but this interval can be overriden with
>>>>>>> property "
>>>>>>>> metadata.max.age.ms" which has default value 300000 i.e 5 minutes.
>>>>>>> Check if
>>>>>>>> you have set this property very low in your producer?
>>>>>>>> 
>>>>>>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
>>>>>>>> vincent_f...@cable.comcast.com> wrote:
>>>>>>>> 
>>>>>>>>> I'm testing a kafka install and using the java client. I have a
>> topic
>>>>>>> set
>>>>>>>>> up and it appears to work great, but after a while I noticed my log
>>>>>>>>> starting to fill up with what appears to be some kind of loop for
>>>>>>> metadata
>>>>>>>>> updates.
>>>>>>>>> 
>>>>>>>>> example::
>>>>>>>>> 
>>>>>>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out  env="md"
>>>>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>>>>>>> 6196 to
>>>>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
>>>>> partitions
>>>>>>> =
>>>>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
>>>>>>> replicas
>>>>>>>>> = [0,], isr = [0,]])
>>>>>>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out  env="md"
>>>>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>>>>>>> 6197 to
>>>>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
>>>>> partitions
>>>>>>> =
>>>>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
>>>>>>> replicas
>>>>>>>>> = [0,], isr = [0,]])
>>>>>>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out  env="md"
>>>>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>>>>>>> 6198 to
>>>>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
>>>>> partitions
>>>>>>> =
>>>>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
>>>>>>> replicas
>>>>>>>>> = [0,], isr = [0,]])
>>>>>>>>> 
>>>>>>>>> etc.
>>>>>>>>> 
>>>>>>>>> It hasn't stopped..
>>>>>>>>> 
>>>>>>>>> I'm curious about what's going on here. Can anyone help?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
>> 
>> 

Reply via email to