Named Pipe and Kafka Producer

2017-07-20 Thread Milind Vaidya
Hi

I am using a named pipe, reading from it using Java, and sending the events
to a Kafka cluster.

The stdout of a process is `tee`-ed to the named pipe.

But I am observing data loss. I have yet to debug this issue. I was wondering
whether anybody has already interfaced a named pipe for sending data to
Kafka, and what needs to be taken care of.
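
For reference, here is a minimal sketch of the pattern I am using (the pipe
path, topic name, and broker address are placeholders; acks and retries are
set defensively, not tuned):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PipeToKafka {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("acks", "all");  // wait for full acknowledgement
            props.put("retries", "3"); // retry transient broker errors

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                 BufferedReader in = new BufferedReader(
                         new FileReader("/var/run/events.pipe"))) { // the named pipe
                String line;
                while ((line = in.readLine()) != null) {
                    producer.send(new ProducerRecord<>("events", line), (meta, e) -> {
                        if (e != null) {
                            // losses on the Kafka side surface here instead of
                            // disappearing silently
                            System.err.println("send failed: " + e);
                        }
                    });
                }
                producer.flush(); // drain buffered records on EOF
            }
        }
    }

One caveat I am aware of: the callback only covers losses on the Kafka side;
anything sitting in the pipe buffer when the reader process dies is lost
before the producer ever sees it.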

Thanks


Re: Kafka 0.10.1 cluster using 100% disk usage (reads)

2017-06-12 Thread Milind Vaidya
This sounds exactly like what I experienced in a similar scenario.

Can you please take a look at the file system timestamps of the actual log
files on one of the broker hosts?

For me, when I restarted the new brokers running version 0.10.0, the file
timestamps changed to the current time. Meaning, if I have a 48-hour
retention window set, the file that is about to get deleted is stamped with
the latest timestamp, so it will be scheduled for deletion 48 hours later.
The more times you restart the broker, the further deletion is pushed into
the future. So this leads to additional data being retained on disk.

I posted the same question and was told it should be fixed in 0.10.1,
but I observed the same behaviour.
Apparently the change from 0.8 to 0.10 is that the file ts of the log
segment is no longer taken into consideration; instead, the timestamp within
the message, as set by the new protocol, is used, and this is backward
compatible, meaning 0.8 log files will be treated according to their actual
file TS. But that does not seem to hold in practice.

Here are the solutions I used.

1. Reduce the retention period if you can, so the disk can still hold the
logs until they expire in the near future, after which things become smooth.
2. Use the log.retention.bytes option for brokers to limit the size of the
logs retained (see the example settings below).
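
For example, in server.properties (the values are only illustrative; pick
what your disks can hold):

    # time based retention: keep less history until things stabilise
    log.retention.hours=24
    # size based cap, applied per partition, regardless of timestamps
    log.retention.bytes=53687091200

Note that log.retention.bytes applies per partition, so the effective cap on
a broker is roughly that value times the number of partition replicas it
hosts.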

My request is that this should be clearly added to the upgrade
documentation.

Hope this helps.


On Sun, Jun 11, 2017 at 7:46 PM, ext-gfenol...@eramet-sln.nc <
ext-gfenol...@eramet-sln.nc> wrote:

> Hello,
>
> Since we upgraded from Kafka 0.8 to Kafka 0.10.1, our brokers are using
> 100% of our disk capacity (100% util in iostat), with constant read rates
> of 100MB/s to 1000MB/s.
>
> It’s been half a week now, and usage is not decreasing. Note that we
> didn’t experience that before the upgrade.
>
>
>
> There's nothing particularly helpful in the logs (except that we have
> corrupted index files at startup, which Kafka recreates corrupted again;
> but this was already present in Kafka 0.8).
>
> It leads to a fairly high amount of CPU iowait, in all our environments.
>
> An even weirder thing is that the usage is not the same on all brokers:
> we see 100MB/s on the first one, 200MB/s on the 2nd, and 1000MB/s on the
> 3rd, all of them having the same datastore and the same disk speed.
>
>
>
> With htop, I can see that only one Java thread is doing this, but I'm not
> sure how to gather more info about it.
>
>
>
> Can you help ?
>
> Thanks,
>
>
>
> Guillaume
>
>
>
>
>
> *Guillaume FENOLLAR*
>
> Prestataire pour DSI/ESI
>
> Société Le Nickel - SLN
>
> Courriel : ext-gfenol...@eramet-sln.nc
>
> Site internet : www.sln.nc
>
> Page Facebook : LeNickel.SLN


Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-30 Thread Milind Vaidya
Upgraded prod cluster to 0.10.0.1
<https://issues.apache.org/jira/browse/KAFKA/fixforversion/12334962>.

But the issue did not go away. The moment the brokers were upgraded and
restarted in rolling fashion, the file system TS changed to the current time
for all log files. Fortunately we knew what to expect, so we had retention
set to 24 hrs instead of 48 hrs, and less data had accumulated (24 hrs'
worth). With the new restart we reset the retention to 48 hrs. So it took
another 48 hrs for the 'current' data (which was already 24 hrs old at the
time of restart) to expire and go away. This increased disk space
utilisation from ~30% (24 hrs of data) to ~80% (24 + 48 = 72 hrs of data).
Once the 48-hr window was over, this data was rapidly deleted and disk
utilisation dropped to ~60%.

After that it has been stable around ~60%, which was the case before the
version upgrade and, I guess, is the expected behaviour. This was observed
consistently across the 5 brokers we upgraded.

Now here I am not sure whether the internal Kafka ts or the file ts is being
used. At least the observed behaviour indicates it is the file ts. The 24
hrs of data mentioned above was written with protocol 0.10.0.0, which was
the previous upgrade, done 2 days earlier.

I think an elaborate explanation of this should be added to the upgrade
documentation, so that there are no nasty surprises with disks getting full
and brokers starting to throw file descriptor related errors and eventually
shutting down.

Please also point me to any setting I may have missed that led to the file
system TS being used instead of the internal Kafka ts.
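
For reference, these are the broker settings that, as far as I understand,
control which timestamp is considered (values are illustrative; I have not
yet confirmed which of these explains the behaviour above):

    # segments written with an older message format carry no timestamps,
    # so retention falls back to the file system timestamp
    log.message.format.version=0.10.0
    # CreateTime trusts the producer-supplied timestamp;
    # LogAppendTime stamps each message when the broker receives it
    log.message.timestamp.type=CreateTime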

On Thu, May 25, 2017 at 2:26 PM, Hans Jespersen <h...@confluent.io> wrote:

>
> If the last message (or all the messages) in the earliest segment has no
> timestamp it will use the filesystem timestamp for expiring.
> Since the timestamps on your 3 brokers got reset, it will be
> log.retention.hours=24
> (1 day) before these segments can be deleted (unless you reset the file
> timestamps back to something over a day old).
> Even though later segments have timestamps in the messages they cannot be
> expired until all the earlier segments are deleted so they are stuck
> waiting for 24 hours as well.
>
> The latest distribution of Kafka is 0.10.2.1 so if you can, you should
> also probably upgrade to a newer version but that is a separate discussion.
>
> -hans
>
>
>
>
> On May 25, 2017, at 11:50 AM, Milind Vaidya <kava...@gmail.com> wrote:
>
> In  short it should work regardless as per "During the migration phase, if
> the first message in a segment does not have a timestamp, the log rolling
> will still be based on the (current time - create time of the segment)."
>
> But that is not happening. This is also the case for only 3 out of 6
> brokers: the 3 good ones deleted the data properly, but these 3 do not
> show the same behaviour.
>
> I came across this JIRA : https://issues.apache.org/jira/browse/KAFKA-3802
>
> It says it is fixed in the next version, 0.10.0.1
> <https://issues.apache.org/jira/browse/KAFKA/fixforversion/12334962>.
>
> I even tried that. On QA hosts it retains the TS of .log files across
> restarts. But when I tried the new version on one of the prod hosts, same
> old story.
>
> So whether the internal or the file system ts is used, the data should get
> deleted when it expires. What else could be the reason, and what is the
> way out of this?
>
> On Thu, May 25, 2017 at 10:43 AM, Hans Jespersen <h...@confluent.io>
> wrote:
>
> I quoted the wrong paragraph in my earlier response. The same KIP has a
> section on log retention as well.
>
> "Enforce time based log retention
>
> To enforce time based log retention, the broker will check from the oldest
> segment forward to the latest segment. For each segment, the broker checks
> the last time index entry of a log segment. The timestamp will be the
> latest timestamp of the messages in the log segment. So if that timestamp
> expires, the broker will delete the log segment. The broker will stop at
> the first segment which is not expired. i.e. the broker will not expire a
> segment even if it is expired, unless all the older segment has been
> expired."
>
> If none of the messages in a segment has a timestamp, last modified time
> will be used.
>
> -hans
>
> /**
> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> * h...@confluent.io (650)924-2670 <(650)%20924-2670>
> */
>
> On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen <h...@confluent.io> wrote:
>
> 0.10.x format messages have timestamps within them so retention and
> expiring of messages isn't entirely based on the filesystem timestamp of
> the log segments anymore.
>
> From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
>

Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Milind Vaidya
In  short it should work regardless as per "During the migration phase, if
the first message in a segment does not have a timestamp, the log rolling
will still be based on the (current time - create time of the segment)."

But that is not happening. This is also the case for only 3 out of 6
brokers: the 3 good ones deleted the data properly, but these 3 do not show
the same behaviour.

I came across this JIRA : https://issues.apache.org/jira/browse/KAFKA-3802

It says it is fixed in the next version, 0.10.0.1
<https://issues.apache.org/jira/browse/KAFKA/fixforversion/12334962>. I
even tried that. On QA hosts it retains the TS of .log files across
restarts. But when I tried the new version on one of the prod hosts, same
old story.

So whether the internal or the file system ts is used, the data should get
deleted when it expires. What else could be the reason, and what is the way
out of this?

On Thu, May 25, 2017 at 10:43 AM, Hans Jespersen <h...@confluent.io> wrote:

> I quoted the wrong paragraph in my earlier response. The same KIP has a
> section on log retention as well.
>
> "Enforce time based log retention
>
> To enforce time based log retention, the broker will check from the oldest
> segment forward to the latest segment. For each segment, the broker checks
> the last time index entry of a log segment. The timestamp will be the
> latest timestamp of the messages in the log segment. So if that timestamp
> expires, the broker will delete the log segment. The broker will stop at
> the first segment which is not expired. i.e. the broker will not expire a
> segment even if it is expired, unless all the older segment has been
> expired."
>
> If none of the messages in a segment has a timestamp, last modified time
> will be used.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen <h...@confluent.io> wrote:
>
> > 0.10.x format messages have timestamps within them so retention and
> > expiring of messages isn't entirely based on the filesystem timestamp of
> > the log segments anymore.
> >
> > From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
> > Enforcetimebasedlogrolling
> >
> > "Enforce time based log rolling
> >
> > Currently time based log rolling is based on the creating time of the log
> > segment. With this KIP, the time based rolling would be changed to only
> > based on the message timestamp. More specifically, if the first message
> > in the log segment has a timestamp, A new log segment will be rolled out
> > if timestamp in the message about to be appended is greater than the
> > timestamp of the first message in the segment + log.roll.ms. When
> > message.timestamp.type=CreateTime, user should set
> > max.message.time.difference.ms appropriately together with log.roll.ms
> > to avoid frequent log segment roll out.
> >
> > During the migration phase, if the first message in a segment does not
> > have a timestamp, the log rolling will still be based on the (current
> > time - create time of the segment)."
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * h...@confluent.io (650)924-2670 <(650)%20924-2670>
> >  */
> >
> > On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya <kava...@gmail.com>
> wrote:
> >
> >> I have 6 broker cluster.
> >>
> >> I upgraded it from 0.8.1.1 to 0.10.0.0.
> >>
> >> The Kafka producer-to-cluster-to-consumer (Apache Storm) upgrade went
> >> smoothly, without any errors.
> >> Initially the protocol was kept at 0.8, and after the clients were
> >> upgraded it was promoted to 0.10.
> >>
> >> Out of 6 brokers, 3 are honouring log.retention.hours. For the other 3,
> >> when the broker is restarted, the timestamp of each segment changes to
> >> the current time. That leads to segments not getting deleted, hence the
> >> disk gets full.
> >>
> >> du -khc /disk1/kafka-broker/topic-1
> >>
> >> 71G /disk1/kafka-broker/topic-1
> >>
> >> 71G total
> >>
> >> Latest segment timestamp : May 25 07:34
> >>
> >> Oldest segment timestamp : May 25 07:16
> >>
> >>
> >> It is impossible that 71 GB of data was collected in a mere 15 mins.
> >> log.retention.hours=24, and this is not a new broker, so the oldest
> >> data should be around 24 hrs old.
> >>
> >> As mentioned above, only 3 out of 6 are showing this behaviour. Why is
> >> this happening?
> >>
> >
> >
>


Disks full after upgrading kafka version : 0.8.1.1 to 0.10.0.0

2017-05-24 Thread Milind Vaidya
Within 24 hours the brokers started getting killed due to full disks.

The retention period is 48 hrs, and with 0.8 the disks used to fill to ~65%.

What is going wrong here ?

This is a production system. I am reducing the retention to 24 hrs for the
time being.


Re: Question regarding buffer.memory, max.request.size and send.buffer.bytes

2017-05-23 Thread Milind Vaidya
I am looking at producer tuning; as mentioned in the mail, all the
properties are related to the producer config.

This is where the property is mentioned:
https://kafka.apache.org/0100/documentation.html#producerconfigs

The consumer in this case is KafkaSpout from Apache Storm.





On Tue, May 23, 2017 at 3:28 PM, Mohammed Manna <manme...@gmail.com> wrote:

>  This could be for various reasons:
>
> 1) Your consumer.property settings - if you have not been acknowledging
> automatically, you need to provide a sufficient polling time and commit in
> sync/async.
> 2) You are not consuming the messages how you think.
>
> I don't know how you got this buffer.memory property. Doesn't sound right,
> could you kindly check this again? Also, could you please provide a snippet
> of your Consumer and how you are reading from the stream?
>
> By default, the buffer is about 10% of the message.max.bytes. Perhaps you
> are looking for a Producer tuning by using the following:
>
> batch.size
> message.max.bytes
> send.buffer.bytes
> Cloudera and Confluent.io have some nice articles on Kafka. Have a read
> through this:
> https://www.cloudera.com/documentation/kafka/latest/
> topics/kafka_performance.html
>
>
>
> On 23 May 2017 at 20:09, Milind Vaidya <kava...@gmail.com> wrote:
>
> > I have set the producer properties as follows (0.10.0.0)
> >
> > *"linger.ms <http://linger.ms>"** : **"500"** ,*
> >
> >  *"batch.size"** : **"1000"**,*
> >
> > *"buffer.memory"** :**"**1**"**,*
> >
> >  *"send.buffer.bytes"** : **"512000"*
> >
> > *and default *
> >
> > * max.request.size = *1048576
> >
> >
> > If records are sent faster than they can be delivered, they will be
> > buffered. Now, with buffer.memory having a value of 1 byte, what will
> > happen if a record is larger than this, say 11629 bytes in size? What is
> > the minimum value of buffer.memory in terms of the other params? Should
> > it be at least equal to send.buffer.bytes or max.request.size, or is it
> > better left at the default, which is 33554432?
> >
> > I am trying to debug some events not reaching the consumer, so I am
> > wondering if this could be the reason.
> >
>


Question regarding buffer.memory, max.request.size and send.buffer.bytes

2017-05-23 Thread Milind Vaidya
I have set the producer properties as follows (0.10.0.0)

*"linger.ms "** : **"500"** ,*

 *"batch.size"** : **"1000"**,*

*"buffer.memory"** :**"**1**"**,*

 *"send.buffer.bytes"** : **"512000"*

*and default *

* max.request.size = *1048576


If records are sent faster than they can be delivered, they will be
buffered. Now, with buffer.memory having a value of 1 byte, what will happen
if a record is larger than this, say 11629 bytes in size? What is the
minimum value of buffer.memory in terms of the other params? Should it be at
least equal to send.buffer.bytes or max.request.size, or is it better left
at the default, which is 33554432?

I am trying to debug some events not reaching the consumer, so I am
wondering if this could be the reason.
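
For context, here is the same configuration spelled out in code with less
extreme values (the numbers are illustrative, not recommendations; the
broker address is a placeholder). One thing I am double-checking: batch.size
is measured in bytes, not records, and buffer.memory is the total pool
shared by all pending batches, so it presumably needs to be much larger than
any single record:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class ProducerTuningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("linger.ms", "500");
            props.put("batch.size", "16384");         // bytes per batch, not a record count
            props.put("buffer.memory", "33554432");   // total pool (the 32 MB default)
            props.put("send.buffer.bytes", "512000"); // TCP socket send buffer only
            // max.request.size is left at its default of 1048576

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.close();
        }
    }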


Advantages of 0.10.0 protocol over 0.8.0

2017-05-14 Thread Milind Vaidya
Hi

We are using 0.8.1.1 for the producer and the broker cluster, as well as for
the Storm integration.

We are planning to upgrade it to 0.10.0, the main reason being that the
producer API supports flush().

That said, we have tested it in QA, and it looks like as long as the
protocol is not bumped along with the newer dependencies, rollback is
possible and we can go back to 0.8.1.1.

Just to confirm, can some expert second that?

On the other hand the documentation mentions

*"Note:* Bumping the protocol version and restarting can be done any time
after the brokers were upgraded. It does not have to be immediately after."

So is it a good idea to change the dependencies to 0.10.0 and keep the
protocol at 0.8.0?

What are the advantages of using the 0.10.0 protocol?


Setting API version while upgrading

2017-05-04 Thread Milind Vaidya
>
>
> The documentation says "Upgrading from 0.8.x or 0.9.x to 0.10.0.0"
>
> I am upgrading from kafka_2.9.2-0.8.1.1, so which one is correct?
>
> A. 0.8.1.1
>
> inter.broker.protocol.version=0.8.1.1
>
> log.message.format.version=0.8.1.1
>
> B. 0.8.1
>
> inter.broker.protocol.version=0.8.1
>
> log.message.format.version=0.8.1
>


Is rollback supported while upgrading?

2017-05-04 Thread Milind Vaidya
Upgrading from kafka_2.9.2-0.8.1.1 to kafka_2.11-0.10.0.0


I am assuming that the new Kafka version will look in the same location for
log files as the older one.

As per the documentation, the following properties will be set on the new
broker:

inter.broker.protocol.version=0.8.1.1
log.message.format.version=0.8.1.1

In case something goes wrong, is it possible to stop the new brokers and
start the old ones again?

Are there any potential risks involved in this, or is it not supported at all?

Any other suggestions for achieving this from somebody who has already done it?


Upgrading from kafka_2.9.2-0.8.1.1 to kafka_2.11-0.10.0.0

2017-05-02 Thread Milind Vaidya
Hi

We are required to upgrade this in our production system. We have observed
some data loss on the producer side and want to try out the new producer
with the flush() API.

The procedure looks like the following, as per the documentation:

1. Upgrade the cluster (brokers) with rolling upgrade.

2. Upgrade clients.


I have following questions regarding this procedure

1. While upgrading clients, is there any preferred order, as in, should
producers or consumers be updated first?

2. What are the things that can go wrong? Is there any recommended rollback
plan? Or, with the settings

inter.broker.protocol.version=0.8.1.1
log.message.format.version=0.8.1.1

will things be taken care of?

3. The consumer in this setup is KafkaSpout from Apache Storm; any
recommendations as far as the upgrade procedure is concerned on that front?

4. Will the Scala versions conflict? We are using Java for everything.


Java stdin producer losing logs

2017-04-14 Thread Milind Vaidya
Hi

Background :

I have following set up

Apache server >> Apache Kafka Producer >> Apache Kafka Cluster >> Apache
Storm

As a normal scenario, the front end boxes run the Apache server and populate
the log files. The requirement is to read every log entry and send it to the
Kafka cluster.

The Java producer reads the logs from stdin and transfers them to the cluster.

Zero loss criteria definition: the contents of the error log files should
match the data received by the Kafka cluster per hour, and eventually per day.

The error_log files get rotated per hour.

A couple of ways have already been tried to connect the log files to the
producer:

1. Custom startup script to start, stop and check status of the server :
  tail -n0 -F /var/log/httpd/error_log /var/log/httpd/ssl_error_log |
java consumer
2. Hooking up directly to  apache using httpd.conf setting  :
  ErrorLog "| /usr/bin/tee -a /var/log/httpd/error_log |  java consumer"


In case 1, loss of logs was observed, but it reduced significantly in case
2, where Apache restarts the piped process if it crashes and also restarts
it along with a server restart. Now the loss is seen across restarts of the
Apache server.

Questions :

1. What is the appropriate way to interface Apache httpd and Kafka?
2. Is there a way to gracefully shut down the Kafka producer so that the
pending buffers are flushed before the process dies? (See the sketch below.)
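
On question 2, here is a sketch of what I am considering (assuming the
producer object is reachable from a shutdown hook): the JVM runs shutdown
hooks on normal exit and on SIGTERM, and close() waits for pending sends,
though nothing helps against SIGKILL or a hard crash:

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class GracefulShutdown {
        public static void install(final KafkaProducer<String, String> producer) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // flush() pushes out buffered records; close() then waits
                    // for in-flight requests, bounded here to 30 seconds
                    producer.flush();
                    producer.close(30, TimeUnit.SECONDS);
                }
            });
        }
    }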


Version compatibility and flush() in Kafka Producer

2017-04-14 Thread Milind Vaidya
Is the Kafka 0.9.0 producer compatible with 0.8.* brokers?

I could not find a conclusive answer, so I tried it out myself.

I tried using that setup, which works in the sense that messages come
through on the consumer side.

But with the new producer I was trying to use the flush() call to force
sending of messages from the producer. This call blocks forever. Any
suggestions to get rid of that, or is this due to the incompatibility?
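
As a workaround while debugging, I am considering skipping flush() and
instead waiting on the Future that send() returns, with a timeout, so that
an incompatibility shows up as a fast failure rather than an indefinite
block (the topic and value here are placeholders):

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BoundedSend {
        static void sendOrFail(Producer<String, String> producer) throws Exception {
            Future<RecordMetadata> f =
                producer.send(new ProducerRecord<>("test", "hello"));
            try {
                RecordMetadata meta = f.get(10, TimeUnit.SECONDS); // bounded wait
                System.out.println("acked at offset " + meta.offset());
            } catch (TimeoutException e) {
                // with an incompatible broker the ack may never arrive
                System.err.println("no ack within 10s, giving up: " + e);
            }
        }
    }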

Thanks


Failure scenarios for a java kafka producer reading from stdin

2017-04-13 Thread Milind Vaidya
Hi

Background :

I have following set up

Apache server >> Apache Kafka Producer >> Apache Kafka Cluster >> Apache
Storm

As a normal scenario, the front end boxes run the Apache server and populate
the log files. The requirement is to read every log entry and send it to the
Kafka cluster.

The Java producer reads the logs from stdin and transfers them to the cluster.

Zero loss criteria definition: the contents of the error log files should
match the data received by the Kafka cluster per hour, and eventually per day.

The error_log files get rotated per hour.

A couple of ways have already been tried to connect the log files to the
producer:

1. Custom startup script to start, stop and check status of the server :
  tail -n0 -F /var/log/httpd/error_log /var/log/httpd/ssl_error_log |
java consumer
2. Hooking up directly to  apache using httpd.conf setting  :
  ErrorLog "| /usr/bin/tee -a /var/log/httpd/error_log |  java consumer"


In case 1, loss of logs was observed, but it reduced significantly in case
2, where Apache restarts the piped process if it crashes and also restarts
it along with a server restart. Now the loss is seen across restarts of the
Apache server.

Questions :

1. What is the appropriate way to interface Apache httpd and Kafka?
2. Is there a way to gracefully shut down the Kafka producer so that the
pending buffers are flushed before the process dies?
3. Are there any known, documented failure scenarios for the Kafka producer?


Re: Fast way search data in kafka

2017-03-23 Thread Milind Vaidya
Yup. I hacked a small script in bash to do it for all files, and per file as
well.

Thanks.

On Thu, Mar 23, 2017 at 2:31 PM, Marko Bonaći <marko.bon...@sematext.com>
wrote:

> You can use something like this to get a comma-separated list of all files
> in a folder:
>
> ls -l | awk '{print $9}' ORS=','
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext <http://sematext.com/> | Contact
> <http://sematext.com/about/contact.html>
>
> On Thu, Mar 23, 2017 at 9:28 PM, Milind Vaidya <kava...@gmail.com> wrote:
>
> > That looks like a faster option.
> >
> > Now the thing is, --files requires a comma-separated list of files. Is
> > there any way to look at all the files in a directory?
> >
> > I tried *.log but it did not work; will I have to script something to do
> > that?
> >
> > On Sat, Mar 4, 2017 at 9:04 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Milind,
> > >
> > > You can try the DumpLogSegments tool to read the logs at the broker
> > > machines directly as well:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > System+Tools#SystemTools-DumpLogSegment
> > >
> > > Guozhang
> > >
> > > On Sat, Mar 4, 2017 at 9:48 AM, Anish Mashankar <
> > an...@systeminsights.com>
> > > wrote:
> > >
> > > > Try Presto https://prestodb.io. It may solve your problem.
> > > >
> > > > On Sat, 4 Mar 2017, 03:18 Milind Vaidya, <kava...@gmail.com> wrote:
> > > >
> > > > > I have a 6 broker Kafka setup.
> > > > >
> > > > > I have a retention period of 48 hrs.
> > > > >
> > > > > To debug whether certain data has reached Kafka or not, I am using
> > > > > the command line consumer and piping to grep. But it takes a huge
> > > > > amount of time and may not succeed either.
> > > > >
> > > > > Is there another way to search something in Kafka without using a
> > > > > consumer?
> > > > >
> > > > --
> > > >
> > > > Regards,
> > > > Anish Samir Mashankar
> > > > R Engineer
> > > > System Insights
> > > > +91-9789870733
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Re: Fast way search data in kafka

2017-03-23 Thread Milind Vaidya
That looks like a faster option.

Now the thing is, --files requires a comma-separated list of files. Is there
any way to look at all the files in a directory?

I tried *.log but it did not work; will I have to script something to do
that?

On Sat, Mar 4, 2017 at 9:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Milind,
>
> You can try the DumpLogSegments tool to read the logs at the broker
> machines directly as well:
>
> https://cwiki.apache.org/confluence/display/KAFKA/
> System+Tools#SystemTools-DumpLogSegment
>
> Guozhang
>
> On Sat, Mar 4, 2017 at 9:48 AM, Anish Mashankar <an...@systeminsights.com>
> wrote:
>
> > Try Presto https://prestodb.io. It may solve your problem.
> >
> > On Sat, 4 Mar 2017, 03:18 Milind Vaidya, <kava...@gmail.com> wrote:
> >
> > > I have a 6 broker Kafka setup.
> > >
> > > I have a retention period of 48 hrs.
> > >
> > > To debug whether certain data has reached Kafka or not, I am using the
> > > command line consumer and piping to grep. But it takes a huge amount of
> > > time and may not succeed either.
> > >
> > > Is there another way to search something in Kafka without using a
> > > consumer?
> > >
> > --
> >
> > Regards,
> > Anish Samir Mashankar
> > R Engineer
> > System Insights
> > +91-9789870733
> >
>
>
>
> --
> -- Guozhang
>


Re: Consumer Group, rebalancing and partition uniqueness

2016-06-29 Thread Milind Vaidya
Florin,

Thanks, I got your point.

The documentation, as well as the diagram showing the mechanism of consumer
groups, indicates that the partitions are shared disjointly by the consumers
in a group.
You also stated above "Each of your consumer will receive message for its
allocated partition for that they subscribed."

E.g., P1..P5 are partitions and we have consumers C1..C5 belonging to the
same group. So is it correct to assume that C2 will consume from P4 (say)
and not from any other partition? Similarly, Ck will consume from Pm, where
1 <= k, m <= 5. If no rebalancing happens, as in none of the consumers dies,
how long will this combination sustain? Or may a random rebalance happen
after a while, leading to C2 consuming from P3 as against P4, from which it
was originally consuming?

I have logs for the consumer which indicate that the partitions associated
with a consumer change periodically.
Is there any mechanism by which I can make sure a consumer consumes from a
particular partition for a sufficient, configurable amount of time, provided
none of the consumers goes down triggering a rebalance?
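
For what it is worth, if pinning matters more than automatic failover, the
new (0.9+) consumer can bypass group rebalancing entirely by using assign()
instead of subscribe(). A sketch (the broker address is a placeholder; the
topic "rawlogs" and its partitions are from the setup described below):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PinnedConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            props.put("group.id", "rawlogs-readers");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // this process reads P0 and P1 of "rawlogs" and nothing else,
            // regardless of what the other consumers in the group do
            consumer.assign(Arrays.asList(
                new TopicPartition("rawlogs", 0),
                new TopicPartition("rawlogs", 1)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println(r.partition() + ":" + r.offset()
                        + " " + r.value());
                }
            }
        }
    }

The trade-off is that with assign() there is no rebalancing at all, so if
this process dies nobody picks up its partitions.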




On Wed, Jun 29, 2016 at 3:02 PM, Spico Florin <spicoflo...@gmail.com> wrote:

> Hi!
>   By default kafka uses internally a round robin partitioner that will send
> the messages to the right partition based on the message key. Each of your
> consumer will receive message for its allocated partition for that they
> subscribed.
>   In case of rebalance, if you add more consumers than the partitions then
> some of the consumers will not get any data. If one of the consumers dies,
> then the remaining consumers will get messages from the partitions
> depending on their client id. Kafka internally uses the client id
> (lexicographic order) to allocate the partitions.
>
> I hope that this gives you an overview of what happens and somehow answers
> your questions.
>
> Regards,
> florin
>
> On Thu, Jun 30, 2016 at 12:36 AM, Milind Vaidya <kava...@gmail.com> wrote:
>
> > Hi
> >
> > Background :
> >
> > I am using a java based multithreaded kafka consumer.
> >
> > Two instances of this consumer are running on 2 different machines, i.e.
> > one consumer process per box, and they belong to the same consumer group.
> >
> > Internally each process has 2 threads each.
> >
> > Both the consumer processes consume from the same topic, "rawlogs",
> > which has 4 partitions.
> >
> > Problem :
> >
> > As per the documentation of consumer group "each message published to a
> > topic is delivered to one consumer instance within each subscribing
> > consumer
> > group" . But is there any mechanism by which a each consumer consumes
> from
> > disjoint set of partitions too ? or each message from whichever partition
> > it is, will be given randomly to one of the consumers ?
> >
> > In case of a rebalance, the partitions may get shuffled among consumers,
> > but then again they should get divided into 2 disjoint sets, one for
> > each consumer.
> >
>


Consumer Group, rebalancing and partition uniqueness

2016-06-29 Thread Milind Vaidya
Hi

Background :

I am using a java based multithreaded kafka consumer.

Two instances of this consumer are running on 2 different machines, i.e.
one consumer process per box, and they belong to the same consumer group.

Internally each process has 2 threads each.

Both the consumer processes consume from the same topic, "rawlogs", which
has 4 partitions.

Problem :

As per the documentation of consumer group "each message published to a
topic is delivered to one consumer instance within each subscribing consumer
group" . But is there any mechanism by which a each consumer consumes from
disjoint set of partitions too ? or each message from whichever partition
it is, will be given randomly to one of the consumers ?

In case of a rebalance, the partitions may get shuffled among consumers, but
then again they should get divided into 2 disjoint sets, one for each
consumer.