Igor Khomenko created KAFKA-1913:
------------------------------------

             Summary: App hangs when it calls producer.send with the wrong IP of a Kafka broker
                 Key: KAFKA-1913
                 URL: https://issues.apache.org/jira/browse/KAFKA-1913
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 0.8.1.1
         Environment: OS X 10.10.1, Java 7
            Reporter: Igor Khomenko
            Assignee: Jun Rao
             Fix For: 0.8.1.2


I have the following test code to check the Kafka functionality:

{code}
package com.company;

import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;

public class Main {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.9.3:9092");
        props.put("serializer.class", "com.company.KafkaMessageSerializer");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        // The first type parameter is the type of the partition key, the second is the type of the message.
        Producer<String, String> messagesProducer = new Producer<String, String>(config);

        // Send
        String topicName = "my_messages";
        String message = "hello world";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);

        try {
            System.out.println(new Date() + ": sending...");

            messagesProducer.send(data);

            System.out.println(new Date() +  ": sent");

        } catch (FailedToSendMessageException e) {
            System.out.println("e: " + e);
            e.printStackTrace();

        } catch (Exception exc) {
            System.out.println("e: " + exc);
            exc.printStackTrace();
        }
    }
}
{code}

{code}
package com.company;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

/**
 * Created by igorkhomenko on 2/2/15.
 */
public class KafkaMessageSerializer implements Encoder<String> {

    public KafkaMessageSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present: Kafka instantiates the encoder reflectively with a VerifiableProperties argument. */
    }

    @Override
    public byte[] toBytes(String entity) {
        byte [] serializedMessage = doCustomSerialization(entity);
        return serializedMessage;
    }

    private byte[] doCustomSerialization(String entity) {
        return entity.getBytes();
    }
}
{code}

The code is also available on GitHub: https://github.com/soulfly/Kafka-java-producer

The application simply hangs on the following line:
{code}
messagesProducer.send(data);
{code}
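
To keep the test from blocking forever while reproducing this, the blocking call can be run on a separate thread with a time limit. Below is a minimal sketch that uses only java.util.concurrent. The names BoundedCall and runWithTimeout are hypothetical helpers and not part of Kafka, and the sleep is only a stand-in for messagesProducer.send(data). This bounds how long the caller waits; it does not fix or explain the hang itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka API.
public class BoundedCall {

    // Runs the task on a daemon thread and waits at most timeoutMs for its result.
    // Throws TimeoutException if the task has not finished in time.
    public static <T> T runWithTimeout(Callable<T> task, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "bounded-call");
                t.setDaemon(true); // a stuck task must not keep the JVM alive
                return t;
            }
        });
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            // Interrupts the worker if it is still running. A call blocked in
            // socket I/O may ignore the interrupt, hence the daemon thread.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            runWithTimeout(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    Thread.sleep(60000); // stand-in for messagesProducer.send(data)
                    return null;
                }
            }, 500);
            System.out.println("finished");
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

In the test code above, the body of call() would be the messagesProducer.send(data) line, and the TimeoutException branch is where the hang would be reported.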

When I changed the broker list to
{code}
props.put("metadata.broker.list", "localhost:9092");
{code}

then I got an exception:
{code}
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
{code}

which is the behavior I would expect.

Why does it hang with the wrong broker list? Any ideas?





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
