Re: Potential socket leak in kafka sync producer

2015-01-30 Thread Jaikiran Pai

Hi Ankit,

Would you be able to share the trimmed down application code which 
reproduces this issue (maybe as a repo on github)? That way, some of us 
will get more context about the issue and probably be able to try it out 
ourselves and see what's wrong.


On a related note, have you tried this against a later version (like the
0.8.2.0 RC3) to see if it's still reproducible there?


-Jaikiran
On Friday 30 January 2015 02:59 PM, ankit tyagi wrote:

  Jaikiran,

I have already investigated this, and it is kafka related. I made a small
application which is used only for publishing messages to kafka. If I use a
dynamic thread pool, meaning the maxPoolSize is very large compared to the
corePoolSize, and I publish each batch of messages only after all threads get
destroyed after keepAliveSeconds, then the FD leak problem occurs. I suspect
that when threads get destroyed, somehow the file handles are not getting
cleared. So when I trigger an explicit GC, the descriptor count gets reduced
by a significant amount because of the cleanup of those destroyed threads.


We got this problem in our production box, where the soft and hard limit of
file descriptors was 5, but for reproducing this issue on my local machine I
have reduced the hard limit to 6000 and used 1000 threads to send messages to
kafka (the topic had 100 partitions with a replication factor of 1).
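
To make the described setup concrete, here is a minimal sketch (illustrative
only, not the actual test application) of the kind of executor configuration
being talked about: a small core pool, a large max pool and a short
keep-alive, so worker threads are repeatedly created and destroyed between
batches. The broker address, topic name, pool sizes and batch sizes below are
hypothetical placeholders.

// Sketch of the dynamic-thread-pool reproduction scenario described above.
import java.util.Properties;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class DynamicPoolProducerRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder broker
        props.put("producer.type", "sync");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        final Producer<String, String> producer = new Producer<>(new ProducerConfig(props));

        // corePoolSize << maxPoolSize with a short keep-alive: the extra worker
        // threads die between batches, which is the situation described above.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 1000, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

        for (int batch = 0; batch < 100; batch++) {
            for (int i = 0; i < 1000; i++) {
                final int n = i;
                pool.submit(new Runnable() {
                    public void run() {
                        producer.send(new KeyedMessage<String, String>(
                                "fd-leak-test", "key-" + n, "message-" + n)); // hypothetical topic
                    }
                });
            }
            // Wait past the keep-alive so the extra threads are destroyed before
            // the next batch; the process FD count is checked externally (lsof).
            Thread.sleep(10000);
        }
        pool.shutdown();
        producer.close();
    }
}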





On Fri, Jan 30, 2015 at 2:14 PM, Jaikiran Pai 
wrote:


Looking at that heap dump, this looks more like a database connection/resource
leak (298 connections?) than anything to do with Kafka. Have you
investigated whether there's any DB resource leak in the application and ruled
out that part?

-Jaikiran


On Friday 30 January 2015 01:08 PM, ankit tyagi wrote:


I have shared object histogram after and before gc on gist
https://gist.github.com/ankit1987/f4a04a1350fdd609096d

On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai 
wrote:

  What kind of a (managed) component is that which has the @PreDestroy?

Looking at the previous snippet you added, it looks like you are creating
the Producer in some method? If you are going to close the producer in a
@PreDestroy of the component, then you should be creating the producer in
the @PostConstruct of the same component, so that you have proper lifecycle
management of those resources.
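
A minimal sketch of that lifecycle arrangement, assuming a container-managed
bean (Spring, CDI or similar); the class name, property values, broker address
and topic are placeholders, not taken from the thread:

// Create the producer in @PostConstruct and close it in @PreDestroy,
// so both ends of the resource lifecycle belong to the same component.
import java.util.Properties;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public class KafkaPublisher {
    private Producer<String, String> myProducer;

    @PostConstruct
    public void start() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder
        props.put("producer.type", "sync");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        myProducer = new Producer<>(new ProducerConfig(props));
    }

    @PreDestroy
    public void stop() {
        if (myProducer != null) {
            myProducer.close();
        }
    }
}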


-Jaikiran

On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:

  Hi,

I am closing my producer at the time of shutting down my application.

@PreDestroy
   public void stop()
   {
   LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
   if (myProducer != null) {
   myProducer.close();
   }
   }



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy wrote:

Hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
ankittyagi.mn...@gmail.com>
wrote:

   Hi Jaikiran,


I am using Ubuntu and was able to reproduce on Red Hat too. Please find
more information below.


DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=12.04
DISTRIB_CODENAME=precise
DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"

java version "1.7.0_72"

This is happening on the client side. The output of lsof was showing that
most of the fds were FIFO and anon. But after GC the FD count was reduced
significantly.

Below is my client code which I am using for publishing messages.


private Producer myProducer;

myProducer = new Producer<>(new ProducerConfig(myProducerProperties));

public void send(List> msgs)
{
    myProducer.send(msgs);
}


We are using the sync producer. I am attaching the object histogram before
GC (histo_1) and after GC (histo_2) in my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai <
jai.forums2...@gmail.com
wrote:

Which operating system are you on and what Java version? Depending on the
OS, you could get tools (like lsof) to show which file descriptors are
being held on to. Is it the client JVM which ends up with these leaks?

Also, would it be possible to post a snippet of your application code
which shows how you are using the Kafka APIs?

-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:

   Hi,


Currently we are using the sync producer client of version 0.8.1 in our
production box. We are getting the following exception while publishing a
kafka message:

[2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
Fetching topic metadata with correlation id 10808 for topics
[Set(kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
failed

java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
    at kafka.producer.SyncProducer.getOrMake

Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-30 Thread Joe Stein
+1 (binding)

verified signatures, ran quick start, tests all passed.

- Joe Stein

On Fri, Jan 30, 2015 at 12:04 PM, Jun Rao  wrote:

> This is a reminder that the vote will close tomorrow night. Please test
> RC3 out and vote before the deadline.
>
> Thanks,
>
> Jun
>
> On Wed, Jan 28, 2015 at 11:22 PM, Jun Rao  wrote:
>
>> This is the third candidate for release of Apache Kafka 0.8.2.0.
>>
>> Release Notes for the 0.8.2.0 release
>>
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
>> (SHA256) checksum.
>>
>> * Release artifacts to be voted upon (source and binary):
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>>
>> * Maven artifacts to be voted upon prior to release:
>> https://repository.apache.org/content/groups/staging/
>>
>> * scala-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>>
>> * java-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>>
>> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>>
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
>> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>>
>> /***
>>
>> Thanks,
>>
>> Jun
>>
>>
>  --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at http://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G9pGUpo7jp4BXY3Y_doPTzZiJKRCPPY-oYj1sqAapUx-Q%40mail.gmail.com
> 
> .
>
> For more options, visit https://groups.google.com/d/optout.
>


Re: One or multiple instances of MM to aggregate kafka data to one hadoop

2015-01-30 Thread Mingjie Lai
Really appreciate you guys' recommendations.

On Thu, Jan 29, 2015 at 9:22 AM, Jon Bringhurst <
jbringhu...@linkedin.com.invalid> wrote:

> Hey Mingjie,
>
> Here's how we have our mirror makers configured. For some context, let me
> try to describe this using the example datacenter layout as described in:
>
> https://engineering.linkedin.com/samza/operating-apache-samza-scale
>
> In that example, there are four data centers (A, B, C, and D). However, we
> only need Datacenter A and B to describe this.
>
> Datacenter A mirrors data from local(A) to aggregate(A) as well as
> local(B) to aggregate(A).
>
> Datacenter B mirrors data from local(B) to aggregate(B) as well as
> local(A) to aggregate(B).
>
> The diagram in the article should make this easy to visualize. Note that the
> mirror makers are running in the destination datacenter and pull the
> traffic in.
>
> Let's say we have two physical machines (let's call them servers 1 and 2 in
> datacenter A; servers 3 and 4 in datacenter B) in each datacenter dedicated
> to running mirror makers. This is what the layout of mirror maker processes
> would look like:
>
> * Datacenter A MirrorMaker Cluster
> * Server 1
> * local(A) to aggregate(A) MirrorMaker Instance
> * local(B) to aggregate(A) MirrorMaker Instance
> * Server 2
> * local(A) to aggregate(A) MirrorMaker Instance
> * local(B) to aggregate(A) MirrorMaker Instance
>
> * Datacenter B MirrorMaker Cluster
> * Server 3
> * local(B) to aggregate(B) MirrorMaker Instance
> * local(A) to aggregate(B) MirrorMaker Instance
> * Server 4
> * local(B) to aggregate(B) MirrorMaker Instance
> * local(A) to aggregate(B) MirrorMaker Instance
>
> The benefit of this layout is that if the load becomes too high, we would
> then add on another server to each cluster that looks exactly like the
> others in the cluster (easy to provision). If you get really huge, you can
> start creating multiple mirror maker clusters that each handle a specific
> flow (but still have homogeneous processes within each cluster).
>
> Of course, YMMV, but this is what works well for us. :)
>
> -Jon
>
> On Jan 28, 2015, at 3:54 PM, Daniel Compton <
> daniel.compton.li...@gmail.com> wrote:
>
> > Hi Mingjie
> >
> > I would recommend the first option of running one mirrormaker instance
> > pulling from multiple DC's.
> >
> > A single MM instance will be able to make more efficient use of the machine
> > resources in two ways:
> > 1. You will only have to run one process, which will be able to be allocated
> > the full amount of resources.
> > 2. Within the process, if you run enough consumer threads, I think that
> > they should be able to rebalance and pick up the load if they don't have
> > anything to do. I'm not 100% sure on this, but 1 still holds.
> >
> > A single MM instance should handle connectivity issues with one DC without
> > affecting the rest of the consumer threads for other DC's.
> >
> > You would gain process isolation running a MM per DC, but this would raise
> > the operational burden and resource requirements. I'm not sure what benefit
> > you'd actually get from process isolation, so I'd recommend against it.
> > However, I'd be interested to hear if others do things differently.
> >
> > Daniel.
> >
> > On Thu Jan 29 2015 at 11:14:29 AM Mingjie Lai  wrote:
> >
> >> Hi.
> >>
> >> We have a pretty typical data ingestion use case: we use mirrormaker at
> >> one hadoop data center to mirror kafka data from multiple remote
> >> application data centers. I know mirrormaker can consume kafka data from
> >> multiple kafka sources with one instance on one physical node. With this,
> >> we can give one instance of mm multiple consumer config files, so it can
> >> consume data from multiple places.
> >>
> >> Another option is to have multiple mirrormaker instances on one node, each
> >> mm instance dedicated to pulling data from one single source data center.
> >> Certainly there will be multiple mm nodes to balance the load.
> >>
> >> The second option looks better since it provides a kind of isolation for
> >> different data centers.
> >>
> >> Any recommendation for this kind of data aggregation cases?
> >>
> >> Still new to kafka and mirrormaker. Welcome any information.
> >>
> >> Thanks,
> >> Mingjie
> >>
>
>


Re: Using Only one partition to store object change log

2015-01-30 Thread Steven Wu
do you need total ordering among all events? or do you just need ordering by
some partitionKey (e.g. events regarding one particular database key or
user id)? if it's the latter, you can create multiple partitions and just
route your events to the different kafka partitions using the key.
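
A minimal sketch of that keyed approach with the 0.8 producer API: messages
sharing a key hash to the same partition (with the default partitioner), so
per-key ordering is preserved while the topic can still have many partitions.
The broker address, topic name, key and payload below are hypothetical:

// Keyed publishing: all changes for the same object id stay in order
// relative to each other, because they land on the same partition.
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedChangeLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));

        String objectId = "order-42";                              // hypothetical partition key
        String changeRecord = "{\"field\":\"status\",\"new\":\"SHIPPED\"}";

        // The default partitioner hashes the key, so every change for "order-42"
        // goes to the same partition of the (hypothetical) "object-changes" topic.
        producer.send(new KeyedMessage<String, String>("object-changes", objectId, changeRecord));
        producer.close();
    }
}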

On Fri, Jan 30, 2015 at 12:57 AM, noodles  wrote:

> HI Group:
>
> I'm currently working on pushing object changes into external services. Now we
> are trying to append the change records into Kafka.
>
> **My problem**
>
> Only one partition can be created for one kind of data, so that the
> sequence of changes can be guaranteed. If I do it like that, I guess I will
> lose performance and the load-balancing feature.
>
> Do I need to worry about this problem?
>
> --
> *noodles!*
>


KafkaWordCount

2015-01-30 Thread Eduardo Costa Alfaia
Hi Guys,

I would like to pass the kafka parameter val kafkaParams =
Map("fetch.message.max.bytes" -> "400") into the KafkaWordCount scala code.
I've used the variable like this:

val KafkaDStreams = (1 to numStreams) map { _ =>
  KafkaUtils.createStream(ssc, kafkaParams, zkQuorum, group, topicpMap).map(_._2)
}

However, I've gotten these errors:

 (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, zkQuorum: String, groupId: String, topics: java.util.Map[String,Integer], storageLevel: org.apache.spark.storage.StorageLevel) org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]

[error]   (ssc: org.apache.spark.streaming.StreamingContext, zkQuorum: String, groupId: String, topics: scala.collection.immutable.Map[String,Int], storageLevel: org.apache.spark.storage.StorageLevel) org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]

Thanks
-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: Resilient Producer

2015-01-30 Thread Otis Gospodnetic
Fernando, have a look -
http://blog.sematext.com/2014/10/06/top-5-most-popular-log-shippers/

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Wed, Jan 28, 2015 at 1:39 PM, Fernando O.  wrote:

> Hi all,
> I'm evaluating using Kafka.
>
> I liked this thing of Facebook scribe that you log to your own machine and
> then there's a separate process that forwards messages to the central
> logger.
>
> With Kafka it seems that I have to embed the publisher in my app, and deal
> with any communication problem managing that on the producer side.
>
> I googled quite a bit trying to find a project that would basically use a
> daemon that parses a log file and sends the lines to the Kafka cluster
> (something like a tail file.log, but instead of redirecting the output to
> the console, sending it to kafka).
>
> Does anyone know about something like that?
>
>
> Thanks!
> Fernando.
>
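
For illustration, a minimal sketch of the kind of tail-and-ship daemon being
described, using the 0.8 sync producer. The file path, broker address and
topic name are hypothetical, and a real log shipper would also handle
rotation, offsets and retries:

// Poll a log file for new lines and send each line to a Kafka topic,
// roughly like `tail -f file.log` piped into a producer.
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogFileShipper {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));

        BufferedReader reader = new BufferedReader(new FileReader("/var/log/app/file.log")); // placeholder path
        try {
            while (true) {
                String line = reader.readLine();
                if (line == null) {
                    Thread.sleep(500); // nothing new yet; poll again, like tail -f
                } else {
                    producer.send(new KeyedMessage<String, String>("app-logs", line)); // hypothetical topic
                }
            }
        } finally {
            reader.close();
            producer.close();
        }
    }
}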


java.nio.channels.ClosedChannelException

2015-01-30 Thread Sa Li
Hi, All

I send messages from one VM to production, but am getting this error:

[2015-01-30 18:43:44,810] WARN Failed to send producer request with
correlation id 126 to broker 101 with data for partitions
[test-rep-three,5],[test-rep-three,2]
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:256)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:99)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)

I actually had this kind of error before on another VM; it now works fine
on that VM, but when I started a new VM and built kafka on it, the error came
back. I really can't recall what I did to fix this problem before. Any ideas?
BTW, when I telnet to the broker it says connected.

thanks

AL

-- 

Alec Li


Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-30 Thread Jun Rao
This is a reminder that the vote will close tomorrow night. Please test RC3
out and vote before the deadline.

Thanks,

Jun

On Wed, Jan 28, 2015 at 11:22 PM, Jun Rao  wrote:

> This is the third candidate for release of Apache Kafka 0.8.2.0.
>
> Release Notes for the 0.8.2.0 release
>
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>
> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>
> /***
>
> Thanks,
>
> Jun
>
>


RE: per-topic configuration names... unit suffix ?

2015-01-30 Thread Thunder Stumpges
Well, for lack of a response, I rolled up my sleeves and took to the code. It
appears that these ARE indeed in milliseconds!

So, while you set log.retention.minutes in increments of minutes, you must
set the topic-level configuration in milliseconds. Pretty odd. Though it does
appear that you can set the global config in other units:


if(props.containsKey("log.retention.ms")) {
   props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
}
else if(props.containsKey("log.retention.minutes")) {
   millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
}
else {
   millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
}
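
As an illustration of the unit mismatch, converting a minutes-based retention
(7 days in this hypothetical example) into the milliseconds expected by the
per-topic retention.ms setting:

// Illustration only: the same retention expressed in minutes and in milliseconds.
import java.util.concurrent.TimeUnit;

public class RetentionMs {
    public static void main(String[] args) {
        long retentionMinutes = 7 * 24 * 60;                             // 7 days in minutes
        long retentionMs = TimeUnit.MINUTES.toMillis(retentionMinutes);  // value for retention.ms
        System.out.println("retention.ms=" + retentionMs);               // prints 604800000
    }
}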

Cheers,
Thunder


-Original Message-
From: Thunder Stumpges [mailto:tstump...@ntent.com] 
Sent: Thursday, January 29, 2015 2:39 PM
To: users@kafka.apache.org
Subject: per-topic configuration names... unit suffix ?

Hi all,

I am reading about the per-topic configs, and the unit suffixes on a couple
of them don't seem to match the global/server ones.

Specifically:

retention.ms => log.retention.minutes
segment.ms => log.roll.hours

Can someone clear this up?

Is the per-topic key really 'ms'? If so, are the units actually in milliseconds?

I would expect the units to match the global ones, and therefore I would
expect the key suffix to match. Maybe this is just a documentation error?

Thanks!
Thunder



Re: Potential socket leak in kafka sync producer

2015-01-30 Thread ankit tyagi
 Jaikiran,

I have already investigated this, and it is kafka related. I made a small
application which is used only for publishing messages to kafka. If I use a
dynamic thread pool, meaning the maxPoolSize is very large compared to the
corePoolSize, and I publish each batch of messages only after all threads get
destroyed after keepAliveSeconds, then the FD leak problem occurs. I suspect
that when threads get destroyed, somehow the file handles are not getting
cleared. So when I trigger an explicit GC, the descriptor count gets reduced
by a significant amount because of the cleanup of those destroyed threads.


We got this problem in our production box, where the soft and hard limit of
file descriptors was 5, but for reproducing this issue on my local machine I
have reduced the hard limit to 6000 and used 1000 threads to send messages to
kafka (the topic had 100 partitions with a replication factor of 1).





On Fri, Jan 30, 2015 at 2:14 PM, Jaikiran Pai 
wrote:

> Looking at that heap dump, this probably is a database connection/resource
> leak (298 connections?) than anything to do with Kafka. Have you
> investigated if there's any DB resource leak in the application and ruled
> out that part?
>
> -Jaikiran
>
>
> On Friday 30 January 2015 01:08 PM, ankit tyagi wrote:
>
>> I have shared object histogram after and before gc on gist
>> https://gist.github.com/ankit1987/f4a04a1350fdd609096d
>>
>> On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai 
>> wrote:
>>
>>  What kind of a (managed) component is that which has the @PreDestroy?
>>> Looking at the previous snippet you added, it looks like you are creating
>>> the Producer in some method? If  you are going to close the producer in a
>>> @PreDestroy of the component, then you should be creating the producer in
>>> the @PostConstruct of the same component, so that you have proper
>>> lifecycle
>>> management of those resources.
>>>
>>>
>>> -Jaikiran
>>>
>>> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>>>
>>>  Hi,

 I am closing my producer at the time of shutting down my application.

 @PreDestroy
   public void stop()
   {
   LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
   if (myProducer != null) {
   myProducer.close();
   }
   }



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy wrote:

> Hope you are closing the producers. Can you share the attachment through
> gist/pastebin?
>
> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
> ankittyagi.mn...@gmail.com>
> wrote:
>
>> Hi Jaikiran,
>>
>> I am using Ubuntu and was able to reproduce on Red Hat too. Please find
>> more information below.
>>
>>
>> DISTRIB_ID=Ubuntu
>> DISTRIB_RELEASE=12.04
>> DISTRIB_CODENAME=precise
>> DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"
>>
>> java version "1.7.0_72"
>>
>> This is happening on the client side. The output of lsof was showing that
>> most of the fds were FIFO and anon. But after GC the FD count was reduced
>> significantly.
>>
>> Below is my client code which I am using for publishing messages.
>>
>>
>> private Producer myProducer;
>>
>> myProducer = new Producer<>(new ProducerConfig(myProducerProperties));
>>
>> public void send(List> msgs)
>> {
>>     myProducer.send(msgs);
>> }
>>
>>
>> We are using the sync producer. I am attaching the object histogram before
>> GC (histo_1) and after GC (histo_2) in my application.
>>
>> On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai <
>> jai.forums2...@gmail.com
>> wrote:
>>
>>> Which operating system are you on and what Java version? Depending on the
>>> OS, you could get tools (like lsof) to show which file descriptors are
>>> being held on to. Is it the client JVM which ends up with these leaks?
>>>
>>> Also, would it be possible to post a snippet of your application code
>>> which shows how you are using the Kafka APIs?
>>>
>>> -Jaikiran
>>> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>>>
>>>> Hi,
>>>>
>>>> Currently we are using the sync producer client of version 0.8.1 in our
>>>> production box. We are getting the following exception while publishing a
>>>> kafka message:
>>>>
>>>> [2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
>>>> Fetching topic metadata with correlation id 10808 for topics
>>>> [Set(kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
>>>> failed
>>>>
>>>> java.net.ConnectException: Connection refused
>>>>     at sun.nio.ch.Net.connect0(Native Method)

Using Only one partition to store object change log

2015-01-30 Thread noodles
HI Group:

I'm currently working on pushing object changes into external services. Now we
are trying to append the change records into Kafka.

**My problem**

Only one partition can be created for one kind of data, so that the
sequence of changes can be guaranteed. If I do it like that, I guess I will
lose performance and the load-balancing feature.

Do I need to worry about this problem?

-- 
*noodles!*


Re: Potential socket leak in kafka sync producer

2015-01-30 Thread Jaikiran Pai
Looking at that heap dump, this looks more like a database
connection/resource leak (298 connections?) than anything to do with
Kafka. Have you investigated whether there's any DB resource leak in the
application and ruled out that part?


-Jaikiran

On Friday 30 January 2015 01:08 PM, ankit tyagi wrote:

I have shared object histogram after and before gc on gist
https://gist.github.com/ankit1987/f4a04a1350fdd609096d

On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai 
wrote:


What kind of a (managed) component is that which has the @PreDestroy?
Looking at the previous snippet you added, it looks like you are creating
the Producer in some method? If you are going to close the producer in a
@PreDestroy of the component, then you should be creating the producer in
the @PostConstruct of the same component, so that you have proper lifecycle
management of those resources.


-Jaikiran

On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:


Hi,

I am closing my producer at the time of shutting down my application.

@PreDestroy
  public void stop()
  {
  LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
  if (myProducer != null) {
  myProducer.close();
  }
  }



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy 
wrote:

Hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
ankittyagi.mn...@gmail.com>
wrote:

  Hi Jaikiran,

I am using Ubuntu and was able to reproduce on Red Hat too. Please find
more information below.


DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=12.04
DISTRIB_CODENAME=precise
DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"

java version "1.7.0_72"

This is happening on the client side. The output of lsof was showing that
most of the fds were FIFO and anon. But after GC the FD count was reduced
significantly.

Below is my client code which I am using for publishing messages.


private Producer myProducer;

myProducer = new Producer<>(new ProducerConfig(myProducerProperties));

public void send(List> msgs)
{
    myProducer.send(msgs);
}


We are using the sync producer. I am attaching the object histogram before
GC (histo_1) and after GC (histo_2) in my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai wrote:

Which operating system are you on and what Java version? Depending on the
OS, you could get tools (like lsof) to show which file descriptors are
being held on to. Is it the client JVM which ends up with these leaks?

Also, would it be possible to post a snippet of your application code
which shows how you are using the Kafka APIs?

-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:

  Hi,

Currently we are using the sync producer client of version 0.8.1 in our
production box. We are getting the following exception while publishing a
kafka message:

[2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
Fetching topic metadata with correlation id 10808 for topics
[Set(kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
failed

java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)


We are using a dynamic thread pool to publish messages to kafka. My
observation is that after the keep-alive time, when the threads in my executor
get destroyed, somehow the file descriptors are not getting cleared, but when
I explicitly ran a full GC, the fd count got reduced by a significant amount.