Re: Is kafka suitable for our architecture?

2014-10-09 Thread Albert Vila
Hi

We process data in real time, and we are taking a look at Storm and Spark
Streaming too; however, our actions are atomic and performed at the document
level, so I don't know whether they fit something like Storm/Spark.

Regarding what you said, Christian: isn't Kafka used for scenarios like the
one I described? I mean, we do have work queues right now with Gearman, but
with a bunch of workers on each step. I thought we could change that to a
producer and a bunch of consumers (where each message should reach one and
exactly one consumer).

As for what I said about keeping the data local, that was only an optimization
we did some time ago because we were moving more data back then. Maybe it's no
longer necessary and we could move messages around the system using Kafka,
which would let us simplify the architecture a little bit. I've seen people
saying they move TB of data every day using Kafka.

Just to be clear on the size of each document/message: we are talking about
tweets, blog posts, etc. (in 90% of cases the size is less than 50 KB).

Regards

On 9 October 2014 20:02, Christian Csar  wrote:

> Apart from your data locality problem it sounds like what you want is a
> workqueue. Kafka's consumer structure doesn't lend itself too well to
> that use case as a single partition of a topic should only have one
> consumer instance per logical subscriber of the topic, and that consumer
> would not be able to mark jobs as completed except in a strict order
> (while maintaining a processed successfully at least once guarantee).
> This is not to say it cannot be done, but I believe your workqueue would
> end up working a bit strangely if built with Kafka.
>
> Christian
>
> On 10/09/2014 06:13 AM, William Briggs wrote:
> > Manually managing data locality will become difficult to scale. Kafka is
> > one potential tool you can use to help scale, but by itself, it will not
> > solve your problem. If you need the data in near-real time, you could
> use a
> > technology like Spark or Storm to stream data from Kafka and perform your
> > processing. If you can batch the data, you might be better off pulling it
> > into a distributed filesystem like HDFS, and using MapReduce, Spark or
> > another scalable processing framework to handle your transformations.
> Once
> > you've paid the initial price for moving the document into HDFS, your
> > network traffic should be fairly manageable; most clusters built for this
> > purpose will schedule work to be run local to the data, and typically
> have
> > separate, high-speed network interfaces and a dedicated switch in order
> to
> > optimize intra-cluster communications when moving data is unavoidable.
> >
> > -Will
> >
> > On Thu, Oct 9, 2014 at 7:57 AM, Albert Vila 
> wrote:
> >
> >> Hi
> >>
> >> I just came across Kafka when I was trying to find solutions to scale
> our
> >> current architecture.
> >>
> >> We are currently downloading and processing 6M documents per day from
> >> online and social media. We have a different workflow for each type of
> >> document, but some of the steps are keyword extraction, language
> detection,
> >> clustering, classification, indexation,  We are using Gearman to
> >> dispatch the job to workers and we have some queues on a database.
> >>
> >> I'm wondering if we could integrate Kafka on the current workflow and if
> >> it's feasible. One of our main discussions are if we have to go to a
> fully
> >> distributed architecture or to a semi-distributed one. I mean,
> distribute
> >> everything or process some steps on the same machine (crawling, keyword
> >> extraction, language detection, indexation). We don't know which one
> scales
> >> more; each one has pros and cons.
> >>
> >> Now we have a semi-distributed one as we had network problems taking
> into
> >> account the amount of data we were moving around. So now, all documents
> >> crawled on server X, later on are dispatched through Gearman to the same
> >> server. What we dispatch on Gearman is only the document id, and the
> >> document data remains on the crawling server on a Memcached, so the
> network
> >> traffic is kept to a minimum.
> >>
> >> What do you think?
> >> Is it feasible to remove all database queues and Gearman and move to
> Kafka?
> >> As Kafka is mainly based on messages I think we should move the messages
> >> around, should we take into account the network? We may face the same
> >> problems?
> >> If so, there is a way to isolate some steps to be processed on the same
> >> machine, to avoid network traffic?
> >>
> >> Any help or comment will be appreciated. And if someone has had a similar
> >> problem and has knowledge about the architectural approach, their input
> >> will be more than welcome.
> >>
> >> Thanks
> >>
> >
>
>


-- 
*Albert Vila*
R&D Manager & Software Developer


Tél. : +34 972 982 968

*www.augure.com* | *Blog.* Reputation in action | *Twitter.* @AugureSpain
*Skype*: albert.vila

Re: Auto Purging Consumer Group Configuration [Especially Kafka Console Group]

2014-10-09 Thread Bhavesh Mistry
We just want to clean up old configuration from ZK. We can check the offset
API and delete based on offset, is that right? And there is no last-used date
associated with a consumer group in the ZK configuration, is that right?

Thanks,

Bhavesh

On Thu, Oct 9, 2014 at 9:23 PM, Gwen Shapira  wrote:

> The problem with Kafka is that we never know when a consumer is
> "truly" inactive.
>
> But if you decide to define an inactive consumer as one whose last offset
> is lower than anything available in the log (or perhaps one lagging by
> over X messages), it's fairly easy to write a script to detect and
> clean them directly in ZK.
>
> BTW. Why do you need to clean them? What issue do you see with just
> letting them hang around?
>
> Gwen
>
> On Thu, Oct 9, 2014 at 9:18 PM, Bhavesh Mistry
>  wrote:
> > Hi Kafka,
> >
> > We have lots of lingering console consumer groups that people have created
> > for one-time testing or debugging via bin/kafka-console-consumer.sh. Is
> > there an auto-purging/cleanup script that Kafka provides? Is there any API
> > to find inactive consumer groups and delete their configuration?
> >
> > Thanks,
> >
> > Bhavesh
>


Re: Clarification about Custom Encoder/Decoder for serialization

2014-10-09 Thread Abraham Jacob
Thanks Jun. Appreciate your quick response.

Once the encoder is instantiated, is it possible to get a reference to it?
I tried to see if I could get it through anything that the Producer exposes.
Apparently not...

-abe

On Thu, Oct 9, 2014 at 9:28 PM, Jun Rao  wrote:

> The encoder is instantiated once when the producer is constructed.
>
> Thanks,
>
> Jun
>
> On Thu, Oct 9, 2014 at 6:45 PM, Abraham Jacob 
> wrote:
>
> > Hi All,
> >
> > I wanted to get some clarification on Kafka's Encoder/Decoder usage.
> >
> > Let's say I want to implement a custom Encoder.
> >
> > public class CustomMessageSerializer implements Encoder<String> {
> >
> >     @Override
> >     public byte[] toBytes(String arg0) {
> >         // serialize the MyCustomObject
> >         return serializedCustomObject;
> >     }
> > }
> >
> > Now, in my producer properties, I can set serializer.class to
> > CustomMessageSerializer.
> >
> > I presume that when Kafka is ready to send the message, it will run the
> > message through the CustomMessageSerializer.
> >
> > My question is: for each message (or list of messages) to be sent, is a
> > new instance of CustomMessageSerializer instantiated, or is the
> > CustomMessageSerializer instantiated once, with toBytes(...) called for
> > every message?
> >
> > Also, as a side note, does Kafka support the Kryo serializer?
> >
> > Regards,
> > -Jacob
> >
>



-- 
~


Re: Clarification about Custom Encoder/Decoder for serialization

2014-10-09 Thread Jun Rao
The encoder is instantiated once when the producer is constructed.

Thanks,

Jun

On Thu, Oct 9, 2014 at 6:45 PM, Abraham Jacob  wrote:

> Hi All,
>
> I wanted to get some clarification on Kafka's Encoder/Decoder usage.
>
> Let's say I want to implement a custom Encoder.
>
> public class CustomMessageSerializer implements Encoder<String> {
>
>     @Override
>     public byte[] toBytes(String arg0) {
>         // serialize the MyCustomObject
>         return serializedCustomObject;
>     }
> }
>
> Now, in my producer properties, I can set serializer.class to
> CustomMessageSerializer.
>
> I presume that when Kafka is ready to send the message, it will run the
> message through the CustomMessageSerializer.
>
> My question is: for each message (or list of messages) to be sent, is a new
> instance of CustomMessageSerializer instantiated, or is the
> CustomMessageSerializer instantiated once, with toBytes(...) called for
> every message?
>
> Also, as a side note, does Kafka support the Kryo serializer?
>
> Regards,
> -Jacob
>
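For illustration, here is a minimal, self-contained sketch of such an encoder
wired up through serializer.class, assuming the 0.8-era Scala producer API
(which instantiates the class once, by reflection, through a constructor that
takes VerifiableProperties, and then calls toBytes() for every message). The
broker address and topic name are hypothetical and the sketch is untested.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// Custom encoder: created once per producer; toBytes() is then called per message.
class CustomMessageSerializer implements Encoder<String> {

    // The producer config is passed in when the producer constructs the encoder.
    public CustomMessageSerializer(VerifiableProperties props) {
    }

    @Override
    public byte[] toBytes(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }
}

public class EncoderExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");          // hypothetical broker
        props.put("serializer.class", "CustomMessageSerializer");   // our encoder class

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("mytopic", "hello"));
        producer.close();
    }
}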


Re: Auto Purging Consumer Group Configuration [Especially Kafka Console Group]

2014-10-09 Thread Gwen Shapira
The problem with Kafka is that we never know when a consumer is
"truly" inactive.

But if you decide to define an inactive consumer as one whose last offset
is lower than anything available in the log (or perhaps one lagging by
over X messages), it's fairly easy to write a script to detect and
clean them directly in ZK.

BTW. Why do you need to clean them? What issue do you see with just
letting them hang around?

Gwen
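For anyone who does want to script this, a rough, untested sketch is below. It
uses the ZkClient library Kafka already depends on and, for simplicity, defines
"inactive" as a group with no live members (nothing under /consumers/<group>/ids)
rather than by offset lag, which you would add via the offset API if you need
it. The ZooKeeper address is hypothetical; review the output before deleting
anything on a real cluster.

import java.util.List;
import org.I0Itec.zkclient.ZkClient;

public class StaleConsumerGroupCleaner {
    public static void main(String[] args) {
        ZkClient zk = new ZkClient("localhost:2181", 30000, 30000);
        try {
            if (!zk.exists("/consumers")) {
                return; // nothing registered at all
            }
            List<String> groups = zk.getChildren("/consumers");
            for (String group : groups) {
                String idsPath = "/consumers/" + group + "/ids";
                // A live high-level consumer keeps an ephemeral znode under .../ids
                boolean hasLiveMembers =
                        zk.exists(idsPath) && !zk.getChildren(idsPath).isEmpty();
                if (!hasLiveMembers) {
                    System.out.println("Deleting stale group: " + group);
                    zk.deleteRecursive("/consumers/" + group);
                }
            }
        } finally {
            zk.close();
        }
    }
}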

On Thu, Oct 9, 2014 at 9:18 PM, Bhavesh Mistry
 wrote:
> Hi Kafka,
>
> We have lots of lingering console consumer groups that people have created
> for one-time testing or debugging via bin/kafka-console-consumer.sh. Is
> there an auto-purging/cleanup script that Kafka provides? Is there any API
> to find inactive consumer groups and delete their configuration?
>
> Thanks,
>
> Bhavesh


Auto Purging Consumer Group Configuration [Especially Kafka Console Group]

2014-10-09 Thread Bhavesh Mistry
Hi Kafka,

We have lots of lingering console consumer groups that people have created
for one-time testing or debugging via bin/kafka-console-consumer.sh. Is there
an auto-purging/cleanup script that Kafka provides? Is there any API to find
inactive consumer groups and delete their configuration?

Thanks,

Bhavesh


Clarification about Custom Encoder/Decoder for serialization

2014-10-09 Thread Abraham Jacob
Hi All,

I wanted to get some clarification on Kafka's Encoder/Decoder usage.

Let's say I want to implement a custom Encoder.

public class CustomMessageSerializer implements Encoder<String> {

    @Override
    public byte[] toBytes(String arg0) {
        // serialize the MyCustomObject
        return serializedCustomObject;
    }
}


Now, in my producer properties, I can set serializer.class to
CustomMessageSerializer.

I presume that when Kafka is ready to send the message, it will run the
message through the CustomMessageSerializer.

My question is: for each message (or list of messages) to be sent, is a new
instance of CustomMessageSerializer instantiated, or is the
CustomMessageSerializer instantiated once, with toBytes(...) called for every
message?

Also, as a side note, does Kafka support the Kryo serializer?

Regards,
-Jacob


including KAFKA-1555 in 0.8.2?

2014-10-09 Thread Jun Rao
Hi, Everyone,

I just committed KAFKA-1555 (min.isr support) to trunk. I felt that it's
probably useful to include it in the 0.8.2 release. Any objections?

Thanks,

Jun


Re: Load Balancing Consumers or Multiple consumers reading off same topic

2014-10-09 Thread Neha Narkhede
With SimpleConsumer, you will have to handle leader discovery as well as
ZooKeeper-based rebalancing yourself. You can see an example here:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

On Wed, Oct 8, 2014 at 11:45 AM, Sharninder  wrote:

> Thanks Gwen. This really helped.
>
> Yes, Kafka is the best thing ever :)
>
> Now how would this be done with the Simple consumer? I'm guessing I'll have
> to maintain my own state in Zookeeper or something of that sort?
>
>
> On Thu, Oct 9, 2014 at 12:01 AM, Gwen Shapira 
> wrote:
>
> > Here's an example (from the ConsumerOffsetChecker tool) of 1 topic (t1)
> > and 1 consumer group (flume); each of the 3 topic partitions is being
> > read by a different machine running the flume consumer:
> >
> > Group  Topic  Pid  Offset    logSize    Lag       Owner
> > flume  t1     0    50172068  100210042  50037974  flume_kafkacdh-1.ent.cloudera.com-1412722833783-3d6d80db-0
> > flume  t1     1    49914701  49914701   0         flume_kafkacdh-2.ent.cloudera.com-1412722838536-a6a4915d-0
> > flume  t1     2    54218841  82733380   28514539  flume_kafkacdh-3.ent.cloudera.com-1412722832793-b23eaa63-0
> >
> > If flume_kafkacdh-1 crashed, another consumer will pick up its partition:
> >
> > Group  Topic  Pid  Offset    logSize    Lag       Owner
> > flume  t1     0    59669715  100210042  40540327  flume_kafkacdh-2.ent.cloudera.com-1412792880818-b4aa6feb-0
> > flume  t1     1    49914701  49914701   0         flume_kafkacdh-2.ent.cloudera.com-1412792880818-b4aa6feb-0
> > flume  t1     2    65796205  82733380   16937175  flume_kafkacdh-3.ent.cloudera.com-1412792871089-cabd4934-0
> >
> > Then I can start flume_kafkacdh-4 and see things rebalance again:
> >
> > Group  Topic  Pid  Offset    logSize    Lag       Owner
> > flume  t1     0    60669715  100210042  39540327  flume_kafkacdh-2.ent.cloudera.com-1412792880818-b4aa6feb-0
> > flume  t1     1    49914701  49914701   0         flume_kafkacdh-3.ent.cloudera.com-1412792871089-cabd4934-0
> > flume  t1     2    66829740  82733380   15903640  flume_kafkacdh-4.ent.cloudera.com-1412793053882-9bfddff9-0
> >
> > Isn't Kafka the best thing ever? :)
> >
> > Gwen
> >
> > On Wed, Oct 8, 2014 at 11:23 AM, Gwen Shapira 
> > wrote:
> > > yep. exactly.
> > >
> > > On Wed, Oct 8, 2014 at 11:07 AM, Sharninder 
> > wrote:
> > >> Thanks Gwen.
> > >>
> > >> When you're saying that I can add consumers to the same group, does
> that
> > >> also hold true if those consumers are running on different machines?
> Or
> > in
> > >> different JVMs?
> > >>
> > >> --
> > >> Sharninder
> > >>
> > >>
> > >> On Wed, Oct 8, 2014 at 11:35 PM, Gwen Shapira 
> > wrote:
> > >>
> > >>> If you use the high level consumer implementation, and register all
> > >>> consumers as part of the same group - they will load-balance
> > >>> automatically.
> > >>>
> > >>> When you add a consumer to the group, if there are enough partitions
> > >>> in the topic, some of the partitions will be assigned to the new
> > >>> consumer.
> > >>> When a consumer crashes, once its node in ZK times out, other
> > >>> consumers will get its partitions.
> > >>>
> > >>> Gwen
> > >>>
> > >>> On Wed, Oct 8, 2014 at 10:39 AM, Sharninder 
> > wrote:
> > >>> > Hi,
> > >>> >
> > >>> > I'm not even sure if this is a valid use-case, but I really wanted
> > to run
> > >>> > it by you guys. How do I load balance my consumers? For example, if
> > my
> > >>> > consumer machine is under load, I'd like to spin up another VM with
> > >>> another
> > >>> > consumer process to keep reading messages off any topic. On similar
> > >>> lines,
> > >>> > how do you guys handle consumer failures? Suppose one consumer
> > process
> > >>> gets
> > >>> > an exception and crashes, is it possible for me to somehow make
> sure
> > that
> > >>> > there is another process that is still reading the queue for me?
> > >>> >
> > >>> > --
> > >>> > Sharninder
> > >>>
> >
>
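To make the high-level consumer approach above concrete, here is a minimal,
untested sketch against the 0.8-era Java API. Every process started with the
same group.id joins the same group, and the topic's partitions are rebalanced
across them automatically when instances are added or crash. The ZooKeeper
address, topic and group name are hypothetical.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
        props.put("group.id", "my-consumer-group");   // same value on every instance
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream (thread) in this process; partitions are spread over all
        // streams of all processes that share the same group id.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("mytopic", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("mytopic").get(0).iterator();
        while (it.hasNext()) {
            byte[] message = it.next().message();
            // process the message...
        }
    }
}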


Re: refactoring ZK so it is plugable, would this make sense?

2014-10-09 Thread S Ahmed
I want Kafka's features (without the redundancy), but I don't want to have to
run 3 ZooKeeper instances, to save money.

On Thu, Oct 9, 2014 at 2:59 PM, Jun Rao  wrote:

> This may not be easy since you have to implement things like watcher
> callbacks. What's your main concern with the ZK dependency?
>
> Thanks,
>
> Jun
>
> On Thu, Oct 9, 2014 at 8:20 AM, S Ahmed  wrote:
>
> > Hi,
> >
> > I was wondering if the zookeeper library (zkutils.scala etc) was designed
> > in a more modular way, would it make it possible to run a more "lean"
> > version of kafka?
> >
> > The idea is I want to run kafka but with a less emphasis on it being
> > durable with failover and more on it being a replacement for a standard
> > queue like kestrel.
> >
> > This way you could take advantage of the other aspects of Kafka
> > (permanent log, etc.).
> >
> > I was just thinking if the zookeeper access was wrapped in something
> like:
> >
> > class DiscoverService
> >
> >def electLeader ..
> >def getFollower ...
> >
> > (I'm just making those methods up, but you get the point they are simply
> > the same calls zkutils etc. will be making to connect to zookeeper)
> >
> > Now the idea is, if you don't want to dedicate 3 servers to run
> zookeeper,
> > you could create your own implementation that e.g. returns data based on
> a
> > configuration file that is static and not a discover service like
> > zookeeper.
> >
> > Would wrapping the ZooKeeper calls in a pluggable/swappable service make
> > sense and allow you to still use Kafka at a smaller scale, or would this
> > not work for other reasons that I am overlooking?
> >
>


Re: create topic in multiple node kafka cluster

2014-10-09 Thread Sa Li
Hi,

I doubt whether I have actually made it an ensemble, since it shows:

root@DO-mq-dev:/etc/zookeeper/conf# zkServer.sh status
JMX enabled by default
Using config: /etc/zookeeper/conf/zoo.cfg
Mode: standalone

The mode is standalone instead of leader/follower. Here is my zoo.cfg; I did
follow the instructions to configure it:

# http://hadoop.apache.org/zookeeper/docs/current/zookeeperAdmin.html

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper

# the port at which the clients will connect
clientPort=2181

# specify all zookeeper servers
# The first port is used by followers to connect to the leader
# The second one is used for leader election
DO-mq-dev.1=10.100.70.128:2888:3888
pof-kstorm-dev1.2=10.100.70.28:2888:3888
pof-kstorm-dev2.3=10.100.70.29:2888:3888


# To avoid seeks ZooKeeper allocates space in the transaction log file in
# blocks of preAllocSize kilobytes. The default block size is 64M. One reason
# for changing the size of the blocks is to reduce the block size if snapshots
# are taken more often. (Also, see snapCount).
#preAllocSize=65536

# Clients can submit requests faster than ZooKeeper can process them,
# especially if there are a lot of clients. To prevent ZooKeeper from running
# out of memory due to queued requests, ZooKeeper will throttle clients so that
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000. ZooKeeper logs transactions to a
# transaction log. After snapCount transactions are written to a log file a
# snapshot is started and a new transaction log file is started. The default
# snapCount is 10,000.
#snapCount=1000

# If this option is defined, requests will be logged to a trace file named
# traceFile.year.month.day.
#traceFile=

# Leader accepts client connections. Default value is "yes". The leader machine
# coordinates updates. For higher update throughput at the slight expense of
# read throughput the leader can be configured to not accept clients and focus
# on coordination.
leaderServes=yes

# Enable regular purging of old data and transaction logs every 24 hours
autopurge.purgeInterval=24
autopurge.snapRetainCount=5


And the myid file in each dataDir is:

At 10.100.70.128, /var/lib/zookeeper/myid contains 1
At 10.100.70.28,  /var/lib/zookeeper/myid contains 2
At 10.100.70.29,  /var/lib/zookeeper/myid contains 3

I had already set myid to 1, 2 and 3 on the corresponding nodes.

Is there anything wrong or missing that prevents it from forming an ensemble?

thanks

Alec



On Thu, Oct 9, 2014 at 12:06 PM, Guozhang Wang  wrote:

> Sa,
>
> Usually you would not want to set up Kafka brokers on the same machines as
> the ZK nodes, as that adds dependent failure modes to the server cluster.
>
> Back to your original question, it seems your zk nodes do not form an
> ensemble, since otherwise their zk data should be the same.
>
> Guozhang
>
> On Thu, Oct 9, 2014 at 11:37 AM, Sa Li  wrote:
>
> > Hi, All
> >
> > I set up a 3-node Kafka cluster on top of a 3-node ZK ensemble. I launch 1
> > broker on each node, but the brokers end up randomly distributed across
> > the ZK ensemble, see:
> >
> > DO-mq-dev.1
> > [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> > [0, 1]
> > pof-kstorm-dev1.2
> > [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> > []
> > pof-kstorm-dev2.3
> > [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> > [2]
> >
> > which means zk1 sees 2 brokers and zk3 sees 1 broker. That raises a
> > problem: I am unable to create a topic with a replication factor of, say,
> > 3; it throws this exception:
> >
> > Error while executing topic command replication factor: 3 larger than
> > available brokers: 0
> >
> > Is there any way I can create a topic that is replicated across the
> > entire cluster? As I understand it, we would have to register more than 1
> > broker with a single ZK server to be able to create replicated topics.
> >
> > thanks
> >
> > --
> >
> > Alec Li
> >
>
>
>
> --
> -- Guozhang
>



-- 

Alec Li


Re: create topic in multiple node kafka cluster

2014-10-09 Thread Guozhang Wang
Sa,

Usually you would not want to set up Kafka brokers on the same machines as
the ZK nodes, as that adds dependent failure modes to the server cluster.

Back to your original question, it seems your zk nodes do not form an
ensemble, since otherwise their zk data should be the same.

Guozhang

On Thu, Oct 9, 2014 at 11:37 AM, Sa Li  wrote:

> Hi, All
>
> I set up a 3-node Kafka cluster on top of a 3-node ZK ensemble. I launch 1
> broker on each node, but the brokers end up randomly distributed across the
> ZK ensemble, see:
>
> DO-mq-dev.1
> [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> [0, 1]
> pof-kstorm-dev1.2
> [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> []
> pof-kstorm-dev2.3
> [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> [2]
>
> which means zk1 sees 2 brokers and zk3 sees 1 broker. That raises a problem:
> I am unable to create a topic with a replication factor of, say, 3; it
> throws this exception:
>
> Error while executing topic command replication factor: 3 larger than
> available brokers: 0
>
> Is there any way I can create a topic that is replicated across the entire
> cluster? As I understand it, we would have to register more than 1 broker
> with a single ZK server to be able to create replicated topics.
>
> thanks
>
> --
>
> Alec Li
>



-- 
-- Guozhang


Re: refactoring ZK so it is plugable, would this make sense?

2014-10-09 Thread Jun Rao
This may not be easy since you have to implement things like watcher
callbacks. What's your main concern with the ZK dependency?

Thanks,

Jun

On Thu, Oct 9, 2014 at 8:20 AM, S Ahmed  wrote:

> Hi,
>
> I was wondering if the zookeeper library (zkutils.scala etc) was designed
> in a more modular way, would it make it possible to run a more "lean"
> version of kafka?
>
> The idea is I want to run kafka but with a less emphasis on it being
> durable with failover and more on it being a replacement for a standard
> queue like kestrel.
>
> This way you could take advantage of the other aspects of Kafka
> (permanent log, etc.).
>
> I was just thinking if the zookeeper access was wrapped in something like:
>
> class DiscoverService
>
>def electLeader ..
>def getFollower ...
>
> (I'm just making those methods up, but you get the point they are simply
> the same calls zkutils etc. will be making to connect to zookeeper)
>
> Now the idea is, if you don't want to dedicate 3 servers to run zookeeper,
> you could create your own implementation that e.g. returns data based on a
> configuration file that is static and not a discover service like
> zookeeper.
>
> Would wrapping the ZooKeeper calls in a pluggable/swappable service make
> sense and allow you to still use Kafka at a smaller scale, or would this not
> work for other reasons that I am overlooking?
>


Re: create topic in multiple node kafka cluster

2014-10-09 Thread Joel Koshy
It looks like you set up three separate ZK clusters, not an ensemble.
You can take a look at
http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_zkMulitServerSetup
for how to set up an ensemble, and then register all three Kafka
brokers with that single ZK ensemble.

Joel
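For concreteness, an ensemble section of zoo.cfg uses server.N entries rather
than hostname-prefixed keys. A sketch using the three hosts from this thread
(assuming the default peer and leader-election ports already shown in the
pasted config) would be:

server.1=10.100.70.128:2888:3888
server.2=10.100.70.28:2888:3888
server.3=10.100.70.29:2888:3888

with each host's /var/lib/zookeeper/myid containing the matching number, and
the same zoo.cfg distributed to all three nodes.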

On Thu, Oct 09, 2014 at 11:37:58AM -0700, Sa Li wrote:
> Hi, All
> 
> I set up a 3-node Kafka cluster on top of a 3-node ZK ensemble. I launch 1
> broker on each node, but the brokers end up randomly distributed across the
> ZK ensemble, see:
> 
> DO-mq-dev.1
> [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> [0, 1]
> pof-kstorm-dev1.2
> [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> []
> pof-kstorm-dev2.3
> [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
> [2]
> 
> which means zk1 sees 2 brokers and zk3 sees 1 broker. That raises a problem:
> I am unable to create a topic with a replication factor of, say, 3; it
> throws this exception:
> 
> Error while executing topic command replication factor: 3 larger than
> available brokers: 0
> 
> Is there any way I can create a topic that is replicated across the entire
> cluster? As I understand it, we would have to register more than 1 broker
> with a single ZK server to be able to create replicated topics.
> 
> thanks
> 
> -- 
> 
> Alec Li



create topic in multiple node kafka cluster

2014-10-09 Thread Sa Li
Hi, All

I set up a 3-node Kafka cluster on top of a 3-node ZK ensemble. I launch 1
broker on each node, but the brokers end up randomly distributed across the
ZK ensemble, see:

DO-mq-dev.1
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[0, 1]
pof-kstorm-dev1.2
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[]
pof-kstorm-dev2.3
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[2]

which means zk1 sees 2 brokers and zk3 sees 1 broker. That raises a problem:
I am unable to create a topic with a replication factor of, say, 3; it throws
this exception:

Error while executing topic command replication factor: 3 larger than
available brokers: 0

Is there any way I can create a topic that is replicated across the entire
cluster? As I understand it, we would have to register more than 1 broker
with a single ZK server to be able to create replicated topics.

thanks

-- 

Alec Li


Re: MBeans, dashes, underscores, and KAFKA-1481

2014-10-09 Thread Neha Narkhede
I am going to vote for 1482 to be included in 0.8.2 if we have a patch
submitted within a week. I think we've had this JIRA open for too long and we
have held people back, so it's only fair to release this.

On Wed, Oct 8, 2014 at 9:40 PM, Jun Rao  wrote:

> Otis,
>
> Just have the patch ready asap. We can make a call then.
>
> Thanks,
>
> Jun
>
> On Wed, Oct 8, 2014 at 6:13 AM, Otis Gospodnetic <
> otis.gospodne...@gmail.com
> > wrote:
>
> > Hi Jun,
> >
> > Would by the end of next week be acceptable for 0.8.2?
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Tue, Oct 7, 2014 at 4:04 PM, Jun Rao  wrote:
> >
> > > Otis,
> > >
> > > Yes, if you guys can help provide a patch in a few days, we can
> probably
> > > get it to the 0.8.2 release.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Oct 7, 2014 at 12:10 PM, Otis Gospodnetic <
> > > otis.gospodne...@gmail.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > I think your MBean renaming approach will work.  I see
> > > > https://issues.apache.org/jira/browse/KAFKA-1481 has Fix Version
> > 0.8.2,
> > > > but
> > > > is not marked as a Blocker.  We'd love to get the MBeans fixed so
> this
> > > > makes it in 0.8.2 release.  Do you know if this is on anyone's plate
> > (the
> > > > issue is currently Unassigned)?  If not, should we provide a new
> patch
> > > that
> > > > uses your approach?
> > > >
> > > > Thanks,
> > > > Otis
> > > > --
> > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> Management
> > > > Solr & Elasticsearch Support * http://sematext.com/
> > > >
> > > >
> > > > On Thu, Sep 18, 2014 at 4:49 PM, Jun Rao  wrote:
> > > >
> > > > > Otis,
> > > > >
> > > > > In kafka-1481, we will have to change the mbean names (at least the
> > > ones
> > > > > with clientid and topic) anyway. Using the name/value pair in the
> > mbean
> > > > > name allows us to do this in a cleaner way. Yes, "," is not allowed
> > in
> > > > > clientid or topic.
> > > > >
> > > > > Bhavesh,
> > > > >
> > > > > Yes, I was thinking of making changes in the new metrics package.
> > > > Something
> > > > > like allowing the sensor names to have name/value pairs. The jmx
> > names
> > > > will
> > > > > just follow accordingly. This is probably cleaner than doing the
> > > > escaping.
> > > > > Also, the metric names are more intuitive (otherwise, you have to
> > know
> > > > > which part is the clientid and which part is the topic).
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Sep 17, 2014 at 2:32 PM, Otis Gospodnetic <
> > > > > otis.gospodne...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > On Wed, Sep 17, 2014 at 12:35 PM, Jun Rao 
> > wrote:
> > > > > >
> > > > > > > Bhavesh,
> > > > > > >
> > > > > > > Yes, allowing dot in clientId and topic makes it a bit harder
> to
> > > > define
> > > > > > the
> > > > > > > JMX bean names. I see a couple of solutions here.
> > > > > > >
> > > > > > > 1. Disable dot in clientId and topic names. The issue is that
> dot
> > > may
> > > > > > > already be used in existing deployment.
> > > > > > >
> > > > > > > 2. We can represent the JMX bean name differently in the new
> > > > producer.
> > > > > > > Instead of
> > > > > > >   kafka.producer.myclientid:type=mytopic
> > > > > > > we could change it to
> > > > > > >   kafka.producer:clientId=myclientid,topic=mytopic
> > > > > > >
> > > > > > > I felt that option 2 is probably better since it doesn't affect
> > > > > existing
> > > > > > > users.
> > > > > > >
> > > > > >
> > > > > > If it doesn't affect existing users, great.
> > > > > >
> > > > > > If you are saying that each "piece" of MBean name could be
> > expressed
> > > as
> > > > > > name=value pair, with something like "," (forbidden in host
> names,
> > > > topic
> > > > > > names, client IDs, etc. I assume?) then yes, I think this would
> be
> > > > easier
> > > > > > to parse and it would be easier for people to understand what is
> > > what.
> > > > > >
> > > > > > Otis
> > > > > > --
> > > > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > Management
> > > > > > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > Otis,
> > > > > > >
> > > > > > > We probably can also use option 2 to address KAFKA-1481. For
> > > > > > topic/clientid
> > > > > > > specific metrics, we could explicitly specify the metric name
> so
> > > that
> > > > > it
> > > > > > > contains "topic=mytopic,clientid=myclientid". That seems to be
> a
> > > much
> > > > > > > cleaner way than having all parts included in a single string
> > > > separated
> > > > > > by
> > > > > > > '|'.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Sep 16, 2014 at 5:15 PM, Bhavesh Mistry <
> >
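To illustrate the "option 2" naming discussed in the quoted thread: a domain
plus key=value properties is exactly what javax.management.ObjectName parses
natively, so monitoring tools can pick out the client id and topic without
splitting a single string on '|' or dashes. A small sketch (the clientid and
topic values are the hypothetical examples from the thread):

import javax.management.ObjectName;

public class MBeanNameExample {
    public static void main(String[] args) throws Exception {
        ObjectName name =
                new ObjectName("kafka.producer:clientId=myclientid,topic=mytopic");
        System.out.println(name.getDomain());                 // kafka.producer
        System.out.println(name.getKeyProperty("clientId"));  // myclientid
        System.out.println(name.getKeyProperty("topic"));     // mytopic
    }
}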

Re: Reassigning Partition Failing

2014-10-09 Thread Lung, Paul
Hi Joe,

I simply restarted the leader broker, and things seem to work again. Thank
you.

Best,
Paul Lung

On 10/2/14, 1:26 AM, "Joe Stein"  wrote:

>What version of zookeeper are you running?
>
>First check to see if there is a znode for the
>"/admin/reassign_partitions" in
>zookeeper.
>
>If so, you could try a graceful shutdown of the controller broker.
>
>Once the new controller leader elects on another broker look at zk the
>znode "/admin/reassign_partitions" for the reassignment operation should
>be
>gone.
>
>If not, you are going to have to remove it through the shell or executor
>or
>however you reliably and safely do that.
>
>Re-run the command again.
>
>I created a JIRA for this issue: https://issues.apache.org/jira/browse/KAFKA-1665
>Once we can reproduce it, we should fix it.
>
>On Wed, Oct 1, 2014 at 6:43 PM, Lung, Paul  wrote:
>
>> Hi All,
>>
>> I had a 0.8.1.1 Kafka Broker go down, and I was trying to use the
>>reassign
>> partition script to move topics off that broker. When I describe the
>> topics, I see the following:
>>
>> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118
>> Replicas: 2131118,2166601,2163421 Isr: 2131118,2166601
>>
>> This shows that the broker "2163421" is down. So I create the following
>> file /tmp/move_topic.json:
>> {
>> "version": 1,
>> "partitions": [
>> {
>> "topic": "mini__022active_120__33__mini",
>> "partition": 0,
>> "replicas": [
>> 2131118, 2166601,  2156998
>> ]
>> }
>> ]
>> }
>>
>> And then do this:
>>
>> ./kafka-reassign-partitions.sh --execute --reassignment-json-file
>> /tmp/move_topic.json
>> Successfully started reassignment of partitions
>> 
>>{"version":1,"partitions":[{"topic":"mini__022active_120__33__mini","
>>partition":0,"replicas":[2131118,2166601,2156998]}]}
>>
>> However, when I try to verify this, I get the following error:
>> ./kafka-reassign-partitions.sh --verify --reassignment-json-file
>> /tmp/move_topic.json
>> Status of partition reassignment:
>> ERROR: Assigned replicas (2131118,2166601,2156998,2163421) don't match
>>the
>> list of replicas for reassignment (2131118,2166601,2156998) for
>>partition
>> [mini__022active_120__33__mini,0]
>> Reassignment of partition [mini__022active_120__33__mini,0] failed
>>
>> If I describe the topics, I now see there are 4 replicas. This has been
>> like this for many hours now, so it seems to have permanently moved to 4
>> replicas for some reason.
>> Topic:mini__022active_120__33__mini PartitionCount:1
>> ReplicationFactor:4 Configs:
>> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118
>> Replicas: 2131118,2166601,2156998,2163421 Isr: 2131118,2166601
>>
>> If I re-execute and re-verify, I get the same error. So it seems to be
>> wedged.
>>
>> Can someone help?
>>
>> Paul Lung
>>
>>
>>
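For reference, the recovery Joe describes above boils down to inspecting, and
if necessary removing, the stuck /admin/reassign_partitions znode. A rough,
untested sketch using the ZkClient library Kafka ships with (the ZooKeeper
address is hypothetical; verify the znode's contents before deleting anything
on a real cluster):

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class ClearStuckReassignment {
    public static void main(String[] args) {
        // Read raw bytes so the pending reassignment JSON can be printed as-is.
        ZkClient zk = new ZkClient("localhost:2181", 30000, 30000,
                new BytesPushThroughSerializer());
        try {
            String path = "/admin/reassign_partitions";
            if (zk.exists(path)) {
                byte[] pending = zk.readData(path);
                System.out.println("Pending reassignment: " + new String(pending));
                // Only delete if the controller cannot clear it on its own.
                zk.delete(path);
            }
        } finally {
            zk.close();
        }
    }
}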



Re: Reassigning Partition Failing

2014-10-09 Thread Lung, Paul
Actually, reassigning the replica does work, even if the broker the
partition resides on is dead. My problem was that there was some unknown
issue with the leader. When I restarted the leader broker, it worked.

Paul

On 10/6/14, 11:41 AM, "Joe Stein"  wrote:

>Agreed, I think it is also a replacement scenario since we may want to
>bring in a new broker for the dead broker.  We should support explicit
>remove also. We could add to the ReassignPartitionsCommand for an explicit
>"make new broker just like the old broker".  This is important very much
>in
>cloud environments where the broker id is the ip address and instances
>come
>and go.  Many scripts get written to do a bunch of steps when it could
>just
>be one.  This makes automatically "in-servicing" a new broker and having
>it
>take up the balance of the work for the broker that it is replacing much
>simpler and straightforward. It also makes it more consistent across the
>community. I created a JIRA
>https://issues.apache.org/jira/browse/KAFKA-1678
>for those just now.
>
>On Mon, Oct 6, 2014 at 2:10 PM, Gwen Shapira 
>wrote:
>
>> Do we have a jira to support removal of dead brokers without having to
>> start a new broker with the same id?
>>
>> I think its something we'll want to allow.
>>
>> On Thu, Oct 2, 2014 at 7:45 AM, Jun Rao  wrote:
>> > The reassign partition process only completes after the new replicas
>>are
>> > fully caught up and the old replicas are deleted. So, if the old
>>replica
>> is
>> > down, the process can never complete, which is what you observed. In
>>your
>> > case, if you just want to replace a broker host with a new one,
>>instead
>> of
>> > using the reassign partition tool, simply start a new broker with the
>> same
>> > broker id as the old one, the new broker will replicate all the data
>> > automatically.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Wed, Oct 1, 2014 at 3:43 PM, Lung, Paul  wrote:
>> >
>> >> Hi All,
>> >>
>> >> I had a 0.8.1.1 Kafka Broker go down, and I was trying to use the
>> reassign
>> >> partition script to move topics off that broker. When I describe the
>> >> topics, I see the following:
>> >>
>> >> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118
>> >> Replicas: 2131118,2166601,2163421 Isr: 2131118,2166601
>> >>
>> >> This shows that the broker "2163421" is down. So I create the
>>following
>> >> file /tmp/move_topic.json:
>> >> {
>> >> "version": 1,
>> >> "partitions": [
>> >> {
>> >> "topic": "mini__022active_120__33__mini",
>> >> "partition": 0,
>> >> "replicas": [
>> >> 2131118, 2166601,  2156998
>> >> ]
>> >> }
>> >> ]
>> >> }
>> >>
>> >> And then do this:
>> >>
>> >> ./kafka-reassign-partitions.sh --execute --reassignment-json-file
>> >> /tmp/move_topic.json
>> >> Successfully started reassignment of partitions
>> >>
>> 
>>{"version":1,"partitions":[{"topic":"mini__022active_120__33__mini","
>>partition":0,"replicas":[2131118,2166601,2156998]}]}
>> >>
>> >> However, when I try to verify this, I get the following error:
>> >> ./kafka-reassign-partitions.sh --verify --reassignment-json-file
>> >> /tmp/move_topic.json
>> >> Status of partition reassignment:
>> >> ERROR: Assigned replicas (2131118,2166601,2156998,2163421) don't
>>match
>> the
>> >> list of replicas for reassignment (2131118,2166601,2156998) for
>> partition
>> >> [mini__022active_120__33__mini,0]
>> >> Reassignment of partition [mini__022active_120__33__mini,0]
>>failed
>> >>
>> >> If I describe the topics, I now see there are 4 replicas. This has
>>been
>> >> like this for many hours now, so it seems to have permanently moved
>>to 4
>> >> replicas for some reason.
>> >> Topic:mini__022active_120__33__mini PartitionCount:1
>> >> ReplicationFactor:4 Configs:
>> >> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118
>> >> Replicas: 2131118,2166601,2156998,2163421 Isr: 2131118,2166601
>> >>
>> >> If I re-execute and re-verify, I get the same error. So it seems to
>>be
>> >> wedged.
>> >>
>> >> Can someone help?
>> >>
>> >> Paul Lung
>> >>
>> >>
>> >>
>>



Re: Is kafka suitable for our architecture?

2014-10-09 Thread Christian Csar
Apart from your data locality problem it sounds like what you want is a
workqueue. Kafka's consumer structure doesn't lend itself too well to
that use case as a single partition of a topic should only have one
consumer instance per logical subscriber of the topic, and that consumer
would not be able to mark jobs as completed except in a strict order
(while maintaining a processed successfully at least once guarantee).
This is not to say it cannot be done, but I believe your workqueue would
end up working a bit strangely if built with Kafka.

Christian

On 10/09/2014 06:13 AM, William Briggs wrote:
> Manually managing data locality will become difficult to scale. Kafka is
> one potential tool you can use to help scale, but by itself, it will not
> solve your problem. If you need the data in near-real time, you could use a
> technology like Spark or Storm to stream data from Kafka and perform your
> processing. If you can batch the data, you might be better off pulling it
> into a distributed filesystem like HDFS, and using MapReduce, Spark or
> another scalable processing framework to handle your transformations. Once
> you've paid the initial price for moving the document into HDFS, your
> network traffic should be fairly manageable; most clusters built for this
> purpose will schedule work to be run local to the data, and typically have
> separate, high-speed network interfaces and a dedicated switch in order to
> optimize intra-cluster communications when moving data is unavoidable.
> 
> -Will
> 
> On Thu, Oct 9, 2014 at 7:57 AM, Albert Vila  wrote:
> 
>> Hi
>>
>> I just came across Kafka when I was trying to find solutions to scale our
>> current architecture.
>>
>> We are currently downloading and processing 6M documents per day from
>> online and social media. We have a different workflow for each type of
>> document, but some of the steps are keyword extraction, language detection,
>> clustering, classification, indexation,  We are using Gearman to
>> dispatch the job to workers and we have some queues on a database.
>>
>> I'm wondering if we could integrate Kafka on the current workflow and if
>> it's feasible. One of our main discussions are if we have to go to a fully
>> distributed architecture or to a semi-distributed one. I mean, distribute
>> everything or process some steps on the same machine (crawling, keyword
>> extraction, language detection, indexation). We don't know which one scales
>> more; each one has pros and cons.
>>
>> Now we have a semi-distributed one as we had network problems taking into
>> account the amount of data we were moving around. So now, all documents
>> crawled on server X, later on are dispatched through Gearman to the same
>> server. What we dispatch on Gearman is only the document id, and the
>> document data remains on the crawling server on a Memcached, so the network
>> traffic is kept to a minimum.
>>
>> What do you think?
>> Is it feasible to remove all database queues and Gearman and move to Kafka?
>> As Kafka is mainly based on messages I think we should move the messages
>> around, should we take into account the network? We may face the same
>> problems?
>> If so, there is a way to isolate some steps to be processed on the same
>> machine, to avoid network traffic?
>>
>> Any help or comment will be appreciated. And if someone has had a similar
>> problem and has knowledge about the architectural approach, their input
>> will be more than welcome.
>>
>> Thanks
>>
> 



RE: refactoring ZK so it is plugable, would this make sense?

2014-10-09 Thread S Ahmed
Hi,

I was wondering if the zookeeper library (zkutils.scala etc) was designed
in a more modular way, would it make it possible to run a more "lean"
version of kafka?

The idea is I want to run kafka but with a less emphasis on it being
durable with failover and more on it being a replacement for a standard
queue like kestrel.

This way you could take advantage of the other aspects of Kafka
(permanent log, etc.).

I was just thinking if the zookeeper access was wrapped in something like:

class DiscoverService

   def electLeader ..
   def getFollower ...

(I'm just making those methods up, but you get the point they are simply
the same calls zkutils etc. will be making to connect to zookeeper)

Now the idea is, if you don't want to dedicate 3 servers to run zookeeper,
you could create your own implementation that e.g. returns data based on a
configuration file that is static and not a discover service like zookeeper.

Would wrapping the ZooKeeper calls in a pluggable/swappable service make
sense and allow you to still use Kafka at a smaller scale, or would this not
work for other reasons that I am overlooking?
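To make the proposal a bit more concrete, a sketch of what such a pluggable
abstraction could look like is below. The interface and method names are made
up (mirroring the ones above), and as Jun notes elsewhere in this thread, a
real refactor of the ZK utilities would also have to cover watcher callbacks,
which is where most of the complexity lives.

import java.util.List;

// Hypothetical abstraction over the discovery/coordination layer. One
// implementation would wrap ZooKeeper; a "lean" one could read a static
// configuration file for small deployments that do not need failover.
public interface DiscoveryService {

    // Elect (or look up) the leader for a resource such as a topic partition.
    int electLeader(String resource);

    // Return the follower broker ids for the resource.
    List<Integer> getFollowers(String resource);

    // Register a callback for membership/leadership changes (the part that is
    // hard to emulate without a real coordination service).
    void onChange(String resource, Runnable callback);
}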


Re: how to identify rogue consumer

2014-10-09 Thread Jun Rao
Yes.

Thanks,

Jun

On Wed, Oct 8, 2014 at 10:53 PM, Steven Wu  wrote:

> Jun, you mean trace level logging for requestAppender?
> log4j.logger.kafka.network.Processor=TRACE, requestAppender
>
> if it happens again, I can try to enable it.
>
> On Wed, Oct 8, 2014 at 9:54 PM, Jun Rao  wrote:
>
> > If enabled request logging, you can find this out.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 8, 2014 at 8:57 PM, Steven Wu  wrote:
> >
> > > I have seen very high "Fetch-Consumer-RequestsPerSec" (around 15K) per
> > > broker in a relatively idle cluster. My hypothesis is that some
> > > misbehaving consumer has a tight polling loop with no back-off logic on
> > > empty fetches.
> > >
> > > Unfortunately, this metric doesn't have per-topic breakdown like
> > > "BytesInPerSec" or "MessagesInPerSec". So I can't really tell which
> > > topic/consumer is pounding the cluster.
> > >
> > > Also, the request storm has already ended, so I can't use tcpdump to
> > > capture live traffic.
> > >
> > > So any suggestion?
> > >
> >
>


Re: Is kafka suitable for our architecture?

2014-10-09 Thread William Briggs
Manually managing data locality will become difficult to scale. Kafka is
one potential tool you can use to help scale, but by itself, it will not
solve your problem. If you need the data in near-real time, you could use a
technology like Spark or Storm to stream data from Kafka and perform your
processing. If you can batch the data, you might be better off pulling it
into a distributed filesystem like HDFS, and using MapReduce, Spark or
another scalable processing framework to handle your transformations. Once
you've paid the initial price for moving the document into HDFS, your
network traffic should be fairly manageable; most clusters built for this
purpose will schedule work to be run local to the data, and typically have
separate, high-speed network interfaces and a dedicated switch in order to
optimize intra-cluster communications when moving data is unavoidable.

-Will

On Thu, Oct 9, 2014 at 7:57 AM, Albert Vila  wrote:

> Hi
>
> I just came across Kafka when I was trying to find solutions to scale our
> current architecture.
>
> We are currently downloading and processing 6M documents per day from
> online and social media. We have a different workflow for each type of
> document, but some of the steps are keyword extraction, language detection,
> clustering, classification, indexation,  We are using Gearman to
> dispatch the job to workers and we have some queues on a database.
>
> I'm wondering if we could integrate Kafka on the current workflow and if
> it's feasible. One of our main discussions are if we have to go to a fully
> distributed architecture or to a semi-distributed one. I mean, distribute
> everything or process some steps on the same machine (crawling, keyword
> extraction, language detection, indexation). We don't know which one scales
> more; each one has pros and cons.
>
> Now we have a semi-distributed one as we had network problems taking into
> account the amount of data we were moving around. So now, all documents
> crawled on server X, later on are dispatched through Gearman to the same
> server. What we dispatch on Gearman is only the document id, and the
> document data remains on the crawling server on a Memcached, so the network
> traffic is kept to a minimum.
>
> What do you think?
> Is it feasible to remove all database queues and Gearman and move to Kafka?
> As Kafka is mainly based on messages I think we should move the messages
> around, should we take into account the network? We may face the same
> problems?
> If so, there is a way to isolate some steps to be processed on the same
> machine, to avoid network traffic?
>
> Any help or comment will be appreciated. And if someone has had a similar
> problem and has knowledge about the architectural approach, their input will
> be more than welcome.
>
> Thanks
>


Is kafka suitable for our architecture?

2014-10-09 Thread Albert Vila
Hi

I just came across Kafka when I was trying to find solutions to scale our
current architecture.

We are currently downloading and processing 6M documents per day from
online and social media. We have a different workflow for each type of
document, but some of the steps are keyword extraction, language detection,
clustering, classification, indexation,  We are using Gearman to
dispatch the job to workers and we have some queues on a database.

I'm wondering if we could integrate Kafka on the current workflow and if
it's feasible. One of our main discussions are if we have to go to a fully
distributed architecture or to a semi-distributed one. I mean, distribute
everything or process some steps on the same machine (crawling, keyword
extraction, language detection, indexation). We don't know which one scales
more; each one has pros and cons.

Now we have a semi-distributed one as we had network problems taking into
account the amount of data we were moving around. So now, all documents
crawled on server X, later on are dispatched through Gearman to the same
server. What we dispatch on Gearman is only the document id, and the
document data remains on the crawling server on a Memcached, so the network
traffic is kept to a minimum.

What do you think?
Is it feasible to remove all database queues and Gearman and move to Kafka?
As Kafka is mainly based on messages I think we should move the messages
around, should we take into account the network? We may face the same
problems?
If so, there is a way to isolate some steps to be processed on the same
machine, to avoid network traffic?

Any help or comment will be appreciated. And if someone has had a similar
problem and has knowledge about the architectural approach, their input will
be more than welcome.

Thanks