Hi, Yi, Navina and Benjamin: Thanks a lot to spending your time to help me this issue.
The configuration is below. Do you think it could be the configuration problem? I tried props.put("request.required.acks", "0"); and props.put("request.required.acks", "1"); both did not work. -------- Properties props = new Properties(); private final Producer<String, String> producer; public KafkaProducer() { //BOOTSTRAP.SERVERS props.put("metadata.broker.list", "localhost:9092"); props.put("bootstrap.servers", "localhost:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "com.kafka.SimplePartitioner"); props.put("request.required.acks", "0"); ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } -------------- Exceptions at log are list below. Your help is highly appreciated. Sincerely, Selina Wu Exceptions at log deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000001/samza-application-master.log 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home directory *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*. at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265) at org.apache.hadoop.util.Shell.<clinit>(Shell.java:290) at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) at org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:517) at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77) at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala) 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this machine. So not using it. 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM 127.0.0.1:8032 2015-07-25 22:03:52 AbstractService [DEBUG] Service: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state INITED 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at /127.0.0.1:8032 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=, always=false, type=DEFAULT, value=[Rate of successful kerberos logins and latency (milliseconds)], valueName=Time) 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=, always=false, type=DEFAULT, value=[Rate of failed kerberos logins and latency (milliseconds)], valueName=Time) 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=, always=false, type=DEFAULT, value=[GetGroups], valueName=Time) 2015-07-25 22:03:52 MetricsSystemImpl [DEBUG] UgiMetrics, User and group related metrics 2015-07-25 22:03:52 KerberosName [DEBUG] Kerberos krb5 configuration not found, setting default realm to empty 2015-07-25 22:03:52 Groups [DEBUG] Creating new Groups object 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Trying to load the custom-built native-hadoop library... 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] java.library.path=/home//Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. 2015-07-25 22:03:52 NativeCodeLoader [WARN] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable.... 2015-07-25 22:03:53 KafkaCheckpointManager [WARN] While trying to validate topic __samza_checkpoint_ver_1_for_demo-parser7_1: *kafka.common.LeaderNotAvailableException. Retrying.* 2015-07-25 22:03:53 KafkaCheckpointManager [DEBUG] Exception detail: kafka.common.LeaderNotAvailableException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at java.lang.Class.newInstance(Class.java:442) at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:84) at org.apache.samza.util.KafkaUtil$.maybeThrowException(KafkaUtil.scala:63) at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:389) at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:386) at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82) at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.validateTopic(KafkaCheckpointManager.scala:385) at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.start(KafkaCheckpointManager.scala:336) at org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:127) at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:55) at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:72) at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:93) at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala) 2015-07-25 22:03:53 VerifiableProperties [INFO] Verifying properties 2015-07-25 22:03:53 VerifiableProperties [INFO] Property client.id is overridden to samza_checkpoint_manager-demo_parser7-1-1437887032962-0 2015-07-25 22:03:53 VerifiableProperties [INFO] Property metadata.broker.list is overridden to localhost:9092 2015-07-25 22:03:53 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000 2015-07-25 22:03:53 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s) Set(__samza_checkpoint_ver_1_for_demo-parser7_1) 2015-07-25 22:03:53 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 408300 (requested -1), SO_SNDBUF = 114324 (requested 102400), connectTimeoutMs = 30000. 2015-07-25 22:03:53 SyncProducer [INFO] Connected to localhost:9092 for producing 2015-07-25 22:03:53 SyncProducer [INFO] Disconnecting from localhost:9092 2015-07-25 22:03:53 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 topic(s) Set(__samza_checkpoint_ver_1_for_demo-parser7_1) 2015-07-25 22:03:53 KafkaCheckpointManager [INFO] Successfully validated checkpoint topic __samza_checkpoint_ver_1_for_demo-parser7_1. Exception at log deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000002/stderr 2015-07-25 22:06:03 HttpDemoParserStreamTask [INFO] key=123: message=timestamp=06-20-2015 id=123 ip=22.231.113.69 browser=Chrome postalCode=95131 url=http://sample1.com language=ENG mobileBrand=Apple *count=3860* 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 2015-07-25 22:06:04 Selector [WARN] *Error in I/O with localhost/127.0.0.1 <http://127.0.0.1>java.io.EOFException* at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:745) 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from selinas-mbp.attlocal.net:9092 2015-07-25 22:06:04 Selector [WARN] Error in I/O with Selinas-MBP.attlocal.net/192.168.1.227 java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:745) 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from selinas-mbp.attlocal.net:9092 2015-07-25 22:06:04 Selector [WARN] Error in I/O with Selinas-MBP.attlocal.net/192.168.1.227 java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:745) 2015-07-25 22:06:04 Selector [WARN] Error in I/O with localhost/127.0.0.1 java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:745) 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected. 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected. 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected. 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected. 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata request to node 0 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata request to node 0 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0 for sending metadata request in the next iteration 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0 for sending metadata request in the next iteration 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to node 0 at selinas-mbp.attlocal.net:9092. 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to node 0 at selinas-mbp.attlocal.net:9092. 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata request to node 0 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata request to node 0 2015-07-25 22:06:04 Selector [WARN] Error in I/O with selinas-mbp.attlocal.net/192.168.1.227 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.kafka.common.network.Selector.poll(Selector.java:238) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:745) 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from selinas-mbp.attlocal.net:9092 2015-07-25 22:06:04 Selector [WARN] Error in I/O On Fri, Jul 24, 2015 at 5:03 PM, Benjamin Black <b...@b3k.us> wrote: > what are the log messages from the kafka brokers? these look like client > messages indicating a broker problem. > > On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucaree...@gmail.com> > wrote: > > > Hi, Yi: > > > > I am wondering if the problem can be fixed by the parameter " > > max.message.size" at kafka.producer.ProducerConfig for the topic size? > > > > My Http Server send message to Kafka. The last message shown on > > console is > > "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari > > postalCode=95066 url=http://sample2.com language=ENG mobileBrand=Apple > > count=4269" > > > > However the Kafka got Exception from message 4244th > > The error is below and Kafka do not accept any new message after this. > > > > "[2015-07-24 12:46:11,078] WARN > > > [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread], > > Failed to find leader for Set([http-demo,0]) > > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) > > kafka.common.KafkaException: fetching topic metadata for topics > > [Set(http-demo)] from broker > [ArrayBuffer(id:0,host:10.1.10.173,port:9092)] > > failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > > at > > > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > > Caused by: java.nio.channels.ClosedChannelException > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > > at > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > ... 3 more > > [2015-07-24 12:46:11,287] WARN Fetching topic metadata with correlation > id > > 21 for topics [Set(http-demo)] from broker > > [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$) > > java.nio.channels.ClosedChannelException > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > > at > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > > at > > > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)" > > > > > > After the Error: > > I show the topic, it is right, but can not show the content by command > line > > > > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-topics.sh > > --list --zookeeper localhost:2181 > > http-demo > > Selinas-MacBook-Pro:samza-Demo selina$ > deploy/kafka/bin/kafka-console-consumer.sh > > --zookeeper localhost:2181 --from-beginning --topic http-demo > > [2015-07-24 12:47:38,730] WARN > > > [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87], > > no brokers found when trying to rebalance. > > (kafka.consumer.ZookeeperConsumerConnector) > > > > Attached is my Kafka properties for server and producer. > > > > Your help is highly appreciated > > > > > > Sincerely, > > Selina > > > > > > > > On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpa...@gmail.com> wrote: > > > >> Hi, Selina, > >> > >> Your question is not clear. > >> {quote} > >> When the messages was send to Kafka by KafkaProducer, It always failed > >> when the message more than 3000 - 4000 messages. > >> {quote} > >> > >> What's failing? The error stack shows errors on the consumer side and > you > >> were referring to failures to produce to Kafka. Could you be more > specific > >> regarding to what's your failure scenario? > >> > >> -Yi > >> > >> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <swucaree...@gmail.com> > >> wrote: > >> > >> > Hi, > >> > > >> > When the messages was send to Kafka by KafkaProducer, It always > >> failed > >> > when the message more than 3000 - 4000 messages. The error is shown > >> below. > >> > I am wondering if any topic size I need to set at Samza configuration? > >> > > >> > > >> > [2015-07-23 17:30:03,792] WARN > >> > > >> > > >> > [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread], > >> > Failed to find leader for Set([http-demo,0]) > >> > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) > >> > kafka.common.KafkaException: fetching topic metadata for topics > >> > [Set(http-demo)] from broker [ArrayBuffer()] failed > >> > at > >> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > >> > at > >> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > >> > at > >> > > >> > > >> > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > >> > at > >> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > >> > ^CConsumed 4327 messages > >> > > >> > Your reply and comment will be highly appreciated. > >> > > >> > > >> > Sincerely, > >> > Selina > >> > > >> > > > > >