Re: Facing Issue to create asyn producer in kafka 0.8.2
Thanks its work ... Regards Prateek On Wed, Oct 14, 2015 at 11:46 AM, Grant Henke wrote: > Looks like you may be mixing the new producer with old producer configs. > See the new config documentation here: > http://kafka.apache.org/documentation.html#newproducerconfigs. You will > likely want to set the "batch.size" and "linger.ms" to achieve your goal. > > Thanks, > Grant > > On Wed, Oct 14, 2015 at 1:29 PM, prateek arora > > wrote: > > > Hi > > > > Thanks for help . > > > > but same behavior even after changing batch.size > > > > I have changes batch.size value to 33554432. > > props.put("batch.size","33554432"); > > > > > > > > On Wed, Oct 14, 2015 at 11:09 AM, Zakee wrote: > > > > > Hi Prateek, > > > > > > Looks like you are using default batch.size which is ~16K and it forces > > > the send of messages immediately as your single message is larger than > > > that. Try using larger batch.size. > > > > > > Thanks > > > Zakee > > > > > > > > > > > > > On Oct 14, 2015, at 10:29 AM, prateek arora < > > prateek.arora...@gmail.com> > > > wrote: > > > > > > > > Hi > > > > > > > > I want to create async producer so i can buffer messages in queue and > > > send > > > > after every 5 sec . > > > > > > > > my kafka version is 0.8.2.0. > > > > > > > > and i am using kafka-clients 0.8.2.0 to create kafka producer in > java. > > > > > > > > > > > > below is my sample code : > > > > > > > > package com.intel.labs.ive.cloud.testKafkaProducerJ; > > > > > > > > import java.nio.charset.Charset; > > > > import java.util.HashMap; > > > > > > > > import java.util.Map; > > > > > > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > > import org.apache.kafka.clients.producer.Producer; > > > > import org.apache.kafka.clients.producer.ProducerConfig; > > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > import org.apache.kafka.common.Metric; > > > > import org.apache.kafka.common.MetricName; > > > > import org.apache.kafka.common.serialization.Serializer; > > > > import org.apache.kafka.common.serialization.StringSerializer; > > > > import org.apache.kafka.common.serialization.ByteArraySerializer; > > > > > > > > import java.nio.file.DirectoryStream; > > > > import java.nio.file.Files; > > > > import java.nio.file.Path; > > > > import java.nio.file.Paths; > > > > > > > > public class TestKafkaProducer { > > > > > > > > Map props = new HashMap(); > > > >props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > > > metadataBroker); > > > >props.put("producer.type", "async"); > > > >props.put("queue.buffering.max.ms", "5000"); > > > > > > > > Serializer keySerializer = new StringSerializer(); > > > >Serializer valueSerializer = new > ByteArraySerializer(); > > > > > > > >producer = new KafkaProducer(props, > > keySerializer, > > > > valueSerializer); > > > > > > > > ProducerRecord imageRecord; > > > > > > > > while ( true ) { > > > > imageRecord = new ProducerRecord(topicName, > > > > recordKey,imageBytes); > > > > > > > >producer.send(imageRecord); > > > > } > > > > } > > > > > > > > size of my message is around 77K > > > > > > > > but its work like a synchronous producer , send every message to > broker > > > . > > > > not buffering a message in to queue and send after 5 sec > > > > > > > > > > > > please help to find out a solution. > > > > > > > > > > > > Regards > > > > Prateek > > > > > > > > > A Balance Transfer Card With An Outrageously Long Intro Rate And No > > > Balance Transfer Fees That Can Save You Thousands > > > > http://thirdpartyoffers.netzero.net/TGL3231/561e9a75a77071a74763fst04vuc > > > > > > -- > Grant Henke > Software Engineer | Cloudera > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke >
Re: Facing Issue to create asyn producer in kafka 0.8.2
Looks like you may be mixing the new producer with old producer configs. See the new config documentation here: http://kafka.apache.org/documentation.html#newproducerconfigs. You will likely want to set the "batch.size" and "linger.ms" to achieve your goal. Thanks, Grant On Wed, Oct 14, 2015 at 1:29 PM, prateek arora wrote: > Hi > > Thanks for help . > > but same behavior even after changing batch.size > > I have changes batch.size value to 33554432. > props.put("batch.size","33554432"); > > > > On Wed, Oct 14, 2015 at 11:09 AM, Zakee wrote: > > > Hi Prateek, > > > > Looks like you are using default batch.size which is ~16K and it forces > > the send of messages immediately as your single message is larger than > > that. Try using larger batch.size. > > > > Thanks > > Zakee > > > > > > > > > On Oct 14, 2015, at 10:29 AM, prateek arora < > prateek.arora...@gmail.com> > > wrote: > > > > > > Hi > > > > > > I want to create async producer so i can buffer messages in queue and > > send > > > after every 5 sec . > > > > > > my kafka version is 0.8.2.0. > > > > > > and i am using kafka-clients 0.8.2.0 to create kafka producer in java. > > > > > > > > > below is my sample code : > > > > > > package com.intel.labs.ive.cloud.testKafkaProducerJ; > > > > > > import java.nio.charset.Charset; > > > import java.util.HashMap; > > > > > > import java.util.Map; > > > > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > import org.apache.kafka.clients.producer.Producer; > > > import org.apache.kafka.clients.producer.ProducerConfig; > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > import org.apache.kafka.common.Metric; > > > import org.apache.kafka.common.MetricName; > > > import org.apache.kafka.common.serialization.Serializer; > > > import org.apache.kafka.common.serialization.StringSerializer; > > > import org.apache.kafka.common.serialization.ByteArraySerializer; > > > > > > import java.nio.file.DirectoryStream; > > > import java.nio.file.Files; > > > import java.nio.file.Path; > > > import java.nio.file.Paths; > > > > > > public class TestKafkaProducer { > > > > > > Map props = new HashMap(); > > >props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > > metadataBroker); > > >props.put("producer.type", "async"); > > >props.put("queue.buffering.max.ms", "5000"); > > > > > > Serializer keySerializer = new StringSerializer(); > > >Serializer valueSerializer = new ByteArraySerializer(); > > > > > >producer = new KafkaProducer(props, > keySerializer, > > > valueSerializer); > > > > > > ProducerRecord imageRecord; > > > > > > while ( true ) { > > > imageRecord = new ProducerRecord(topicName, > > > recordKey,imageBytes); > > > > > >producer.send(imageRecord); > > > } > > > } > > > > > > size of my message is around 77K > > > > > > but its work like a synchronous producer , send every message to broker > > . > > > not buffering a message in to queue and send after 5 sec > > > > > > > > > please help to find out a solution. > > > > > > > > > Regards > > > Prateek > > > > > > A Balance Transfer Card With An Outrageously Long Intro Rate And No > > Balance Transfer Fees That Can Save You Thousands > > http://thirdpartyoffers.netzero.net/TGL3231/561e9a75a77071a74763fst04vuc > -- Grant Henke Software Engineer | Cloudera gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
Re: Facing Issue to create asyn producer in kafka 0.8.2
Hi Thanks for help . but same behavior even after changing batch.size I have changes batch.size value to 33554432. props.put("batch.size","33554432"); On Wed, Oct 14, 2015 at 11:09 AM, Zakee wrote: > Hi Prateek, > > Looks like you are using default batch.size which is ~16K and it forces > the send of messages immediately as your single message is larger than > that. Try using larger batch.size. > > Thanks > Zakee > > > > > On Oct 14, 2015, at 10:29 AM, prateek arora > wrote: > > > > Hi > > > > I want to create async producer so i can buffer messages in queue and > send > > after every 5 sec . > > > > my kafka version is 0.8.2.0. > > > > and i am using kafka-clients 0.8.2.0 to create kafka producer in java. > > > > > > below is my sample code : > > > > package com.intel.labs.ive.cloud.testKafkaProducerJ; > > > > import java.nio.charset.Charset; > > import java.util.HashMap; > > > > import java.util.Map; > > > > import org.apache.kafka.clients.producer.KafkaProducer; > > import org.apache.kafka.clients.producer.Producer; > > import org.apache.kafka.clients.producer.ProducerConfig; > > import org.apache.kafka.clients.producer.ProducerRecord; > > import org.apache.kafka.common.Metric; > > import org.apache.kafka.common.MetricName; > > import org.apache.kafka.common.serialization.Serializer; > > import org.apache.kafka.common.serialization.StringSerializer; > > import org.apache.kafka.common.serialization.ByteArraySerializer; > > > > import java.nio.file.DirectoryStream; > > import java.nio.file.Files; > > import java.nio.file.Path; > > import java.nio.file.Paths; > > > > public class TestKafkaProducer { > > > > Map props = new HashMap(); > >props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > metadataBroker); > >props.put("producer.type", "async"); > >props.put("queue.buffering.max.ms", "5000"); > > > > Serializer keySerializer = new StringSerializer(); > >Serializer valueSerializer = new ByteArraySerializer(); > > > >producer = new KafkaProducer(props, keySerializer, > > valueSerializer); > > > > ProducerRecord imageRecord; > > > > while ( true ) { > > imageRecord = new ProducerRecord(topicName, > > recordKey,imageBytes); > > > >producer.send(imageRecord); > > } > > } > > > > size of my message is around 77K > > > > but its work like a synchronous producer , send every message to broker > . > > not buffering a message in to queue and send after 5 sec > > > > > > please help to find out a solution. > > > > > > Regards > > Prateek > > > A Balance Transfer Card With An Outrageously Long Intro Rate And No > Balance Transfer Fees That Can Save You Thousands > http://thirdpartyoffers.netzero.net/TGL3231/561e9a75a77071a74763fst04vuc
Re: Facing Issue to create asyn producer in kafka 0.8.2
Hi Prateek, Looks like you are using default batch.size which is ~16K and it forces the send of messages immediately as your single message is larger than that. Try using larger batch.size. Thanks Zakee > On Oct 14, 2015, at 10:29 AM, prateek arora > wrote: > > Hi > > I want to create async producer so i can buffer messages in queue and send > after every 5 sec . > > my kafka version is 0.8.2.0. > > and i am using kafka-clients 0.8.2.0 to create kafka producer in java. > > > below is my sample code : > > package com.intel.labs.ive.cloud.testKafkaProducerJ; > > import java.nio.charset.Charset; > import java.util.HashMap; > > import java.util.Map; > > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.Producer; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.clients.producer.ProducerRecord; > import org.apache.kafka.common.Metric; > import org.apache.kafka.common.MetricName; > import org.apache.kafka.common.serialization.Serializer; > import org.apache.kafka.common.serialization.StringSerializer; > import org.apache.kafka.common.serialization.ByteArraySerializer; > > import java.nio.file.DirectoryStream; > import java.nio.file.Files; > import java.nio.file.Path; > import java.nio.file.Paths; > > public class TestKafkaProducer { > > Map props = new HashMap(); >props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, metadataBroker); >props.put("producer.type", "async"); >props.put("queue.buffering.max.ms", "5000"); > > Serializer keySerializer = new StringSerializer(); >Serializer valueSerializer = new ByteArraySerializer(); > >producer = new KafkaProducer(props, keySerializer, > valueSerializer); > > ProducerRecord imageRecord; > > while ( true ) { > imageRecord = new ProducerRecord(topicName, > recordKey,imageBytes); > >producer.send(imageRecord); > } > } > > size of my message is around 77K > > but its work like a synchronous producer , send every message to broker . > not buffering a message in to queue and send after 5 sec > > > please help to find out a solution. > > > Regards > Prateek A Balance Transfer Card With An Outrageously Long Intro Rate And No Balance Transfer Fees That Can Save You Thousands http://thirdpartyoffers.netzero.net/TGL3231/561e9a75a77071a74763fst04vuc
Facing Issue to create asyn producer in kafka 0.8.2
Hi I want to create async producer so i can buffer messages in queue and send after every 5 sec . my kafka version is 0.8.2.0. and i am using kafka-clients 0.8.2.0 to create kafka producer in java. below is my sample code : package com.intel.labs.ive.cloud.testKafkaProducerJ; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; public class TestKafkaProducer { Map props = new HashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, metadataBroker); props.put("producer.type", "async"); props.put("queue.buffering.max.ms", "5000"); Serializer keySerializer = new StringSerializer(); Serializer valueSerializer = new ByteArraySerializer(); producer = new KafkaProducer(props, keySerializer, valueSerializer); ProducerRecord imageRecord; while ( true ) { imageRecord = new ProducerRecord(topicName, recordKey,imageBytes); producer.send(imageRecord); } } size of my message is around 77K but its work like a synchronous producer , send every message to broker . not buffering a message in to queue and send after 5 sec please help to find out a solution. Regards Prateek