[ 
https://issues.apache.org/jira/browse/KAFKA-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4832.
--------------------------------
    Resolution: Auto Closed

The Scala producers have been deprecated for a while and no further work is 
planned. Please upgrade to the Java producer whenever possible.

> kafka producer send Async message to the wrong IP cannot to stop 
> producer.close()
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-4832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4832
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.2.2
>         Environment: JDK8 Eclipse Mars Win7
>            Reporter: Wang Hong
>
> 1. I tried sending messages asynchronously to a wrong IP address in a loop: 
> 10 batches of 1400 messages each.
> 2. I use kafka.javaapi.producer.Producer, wrapped in a factory class.
> 3. In each of the 10 batches I open the producer and then close it.
> 4. I know the messages end up being sent to a wrong IP, but I noticed that the 
> terminal blocks and the program cannot shut down normally.
> The function looks like this:
>       public static void go(int s) throws Exception { // runs one batch; s is the batch number supplied by the caller
>                 KafkaService kf = new KafkaServiceImpl();// one service per batch; its constructor opens the producer
>               for (int i = 0; i < 1400; i++) { // 1400 messages per batch
>                       String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + 
> i;
>                       System.out.println(msg);
>                       kf.push(msg); // ends up in producer.send()
>               }
>                 kf.closeProducerFactory();// ends up in producer.close(); per the report, the program does not shut down normally here when the broker IP is wrong
>               System.out.println(s);
>               Thread.sleep(1000);
>       }
> kf.closeProducerFactory() calls producer.close(),
> but the async send keeps waiting for the Kafka server, because I gave it a 
> wrong IP.
> I think such a long wait will cause problems for the whole system, because it 
> occupies resources.
> Another problem: when I send Kafka messages to the correct IP using Runnable 
> tasks and thread pools, with everything set up correctly and the same loop 
> as in the example above (↑),
> I get an error saying something about waiting for 3 tries.
> I have also configured 
> advertised.host.name=xxx.xxx.xxx.xxx
> advertised.port=9092
> Now I suspect it may not be able to handle that much concurrent volume at once.
> Our system runs at over 1000 TPS.
> Thank you.
> Source code:
> package kafka.baseconfig;
> import java.util.Properties;
> import com.travelsky.util.ConFile;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> /**
>  * Kafka factory pattern: a thin wrapper around the old
>  * kafka.javaapi.producer.Producer that is configured from KafkaProperties.conf.
>  * 
>  * 1. Replaces the Producer approach. // Not suitable where multi-threaded efficiency matters.
>  * 2. Usage takes three steps:
>  * ProducerFactory fac = new ProducerFactory();
>  * fac.openProducer(); -> initialise the object
>  * fac.push(msg); -> send the message body
>  * fac.closeProducer(); -> close the object
>  * @author Wang Hong (王宏)
>  *
>  */
> public class ProducerFactory {
>       protected Producer<String, String> producer = null; // stays null until openProducer() assigns it
>       protected ConFile conf = null; // assigned in the instance initializer below
>       private Properties props = new Properties(); // filled in by openProducer()
>       private String topic = null; // read from the "topic" key of the config file
>       { // instance initializer: loads the config file on every construction
>               try {
>                       conf = new ConFile("KafkaProperties.conf");
>                       topic = conf.getString("topic");
>                       if (conf == null) { // NOTE(review): conf was already dereferenced on the previous line, so this check comes too late to guard it
>                               throw new Exception("kafka配置文件有问题");
>                       }
>               } catch (Exception e) {
>                       e.printStackTrace(); // the failure is only printed; construction continues, so conf/topic may still be null afterwards
>               }
>       }
>       
>       /**
>        * Sends one message: wraps it in a KeyedMessage for the configured
>        * topic and hands it to producer.send().
>        * @param msg the message text to send
>        * @throws RuntimeException if the producer field is still null
>        *         (openProducer() has not assigned it)
>        */
>       public void push(String msg) {
>               if (producer == null) {
>                       throw new RuntimeException("producer实例为空");
>               }
>               KeyedMessage<String, String> messageForSend = new 
> KeyedMessage<String, String>(topic, msg);
>               producer.send(messageForSend);
>       }
>       
>       /**
>        * Opens the producer: fills props (broker list, producer type and
>        * batch size come from the config file, everything else is hard-coded
>        * to "1") and builds a new Producer from them.
>        *
>        * NOTE(review): several keys set here (e.g. linger.ms, max.block.ms,
>        * metadata.fetch.timeout.ms) look like settings of the newer Java
>        * producer - confirm that the old ProducerConfig reads them at all.
>        */
>       public void openProducer() {
>               props.put("serializer.class", "kafka.serializer.StringEncoder");
>               props.put("metadata.broker.list", 
> conf.getString("kafkaserverurl"));
>               // asynchronous sending (the actual value comes from the "synctype" config key)
>               props.put("producer.type", conf.getString("synctype"));
>               // number of messages sent per batch
>               props.put("batch.num.messages", conf.getString("batchmsgnums"));
>               
>               //
>               props.put("request.required.acks", "1");
>               //
>               props.put("queue.enqueue.timeout.ms", "1");
>               //
>               props.put("request.timeout.ms", "1");
>               //
>               props.put("timeout.ms", "1");
>               //
>               props.put("reconnect.backoff.ms", "1");
>               //
>               props.put("retry.backoff.ms", "1");
>               //
>               props.put("message.send.max.retries", "1");
>               // NOTE(review): retry.backoff.ms was already set a few lines up; this put repeats it with the same value
>               props.put("retry.backoff.ms", "1");
>               //
>               props.put("linger.ms", "1");
>               //
>               props.put("max.block.ms", "1");
>               //
>               props.put("metadata.fetch.timeout.ms", "1");
>               //
>               props.put("metadata.max.age.ms", "1");
>               // NOTE(review): the key literal below ends with a space, so it is not the same key as "metrics.sample.window.ms"
>               props.put("metrics.sample.window.ms ", "1");
>               producer = new Producer<String, String>(new 
> ProducerConfig(props));
>               if (producer == null) { // NOTE(review): `new` never yields null, so this branch looks unreachable
>                       throw new RuntimeException("kafka producer 打开失败");
>               }
>       }
>       /**
>        * Closes the producer object; does nothing when the producer field
>        * is null.
>        * NOTE(review): the field is not reset to null afterwards, so
>        * isOpenProduer() keeps returning true after a close.
>        */
>       public void closeProducer() {
>               if (producer != null) {
>                       producer.close();
>               }
>       }
>       /**
>        * Tells whether the producer has been opened.
>        * @return true when the producer field is non-null
>        */
>       public boolean isOpenProduer() {
>               return producer != null;
>       }
> }
> package kafka.service.impl;
> import kafka.baseconfig.ProducerFactory;
> import kafka.service.KafkaService;
> public class KafkaServiceImpl implements KafkaService {
>       private ProducerFactory factory = null; // assigned in the constructor
>       
>       public KafkaServiceImpl() { // creates the factory and opens its producer right away
>               factory = new ProducerFactory();
>               factory.openProducer();
>       }
>       
>       /**
>        * Pushes data into Kafka, and the topic can be changed.
>        * NOTE(review): this overload has no topic parameter; the message is
>        * passed to factory.push(msg) as-is.
>        * @param msg the data
>        * @param topic the topic to send to
>        * 
>        * @deprecated This method is outdated. Its use is not recommended.
>        */
>       @Override
>       @Deprecated
>       public void push(String msg) throws Exception {
>               //new Producer(msg).start();
>               if (factory.isOpenProduer()) {
>                       factory.push(msg);
>               }else {
>                       throw new RuntimeException("factory沒有初始化");
>               }
>       }
>       
>       /**
>        * Outdated method.
>        * NOTE(review): the topic argument is never used; the call forwards
>        * to factory.push(msg) exactly like the one-argument overload.
>        * 
>        * @param msg
>        * @param topic
>        * @throws Exception
>        */
>       @Override
>       public void push(String msg, String topic) throws Exception {
>               //new Producer(msg, topic).start();
>               if (factory.isOpenProduer()) {
>                       factory.push(msg);
>               }else {
>                       throw new RuntimeException("factory沒有初始化");
>               }
>       }
>       
>       /**
>        * Releases resources: closes the factory's producer if the factory
>        * reports it as open.
>        */
>       @Override
>       public void closeProducerFactory()throws Exception{
>               if (factory.isOpenProduer()) {
>                       factory.closeProducer();
>               }
>       }
> }
> public static void main(String[] args) throws Exception { // driver: runs 10 batches and prints the total elapsed milliseconds
>               long l = System.currentTimeMillis(); // start timestamp
>               for (int i = 0; i < 10; i++) {
>                       go(i); // i is used as the batch number
>               }
>               System.out.println(System.currentTimeMillis() - l);
> }
>       public static void go(int s) throws Exception { // runs one batch of 1400 sends; s is the batch number
>               for (int i = 0; i < 1400; i++) {
>                       KafkaService kf = new KafkaServiceImpl(); // NOTE(review): a new service, hence a new producer, is built for every single message
>                       String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + 
> i;
>                       System.out.println(msg);
>                       kf.push(msg);
>                       kf.closeProducerFactory(); // closed again right after the one send
>               }
>               System.out.println(s);
>               Thread.sleep(1000); // one-second pause after the batch
>       }



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to