Re: Pros and cons of dockerizing kafka brokers?

2016-07-08 Thread Christian
We're using AWS ECS for our Kafka cluster of six nodes. We did some
performance testing on a three-node cluster and the results were as good as
the results LinkedIn published for bare-metal machines.

We are using EBS st1 drives. The bottleneck is the network to the EBS
volumes. So for about 25% more cost, we doubled our VMs, using twice as
many half-sized EBS volumes.

-Christian

On Fri, Jul 8, 2016 at 12:07 PM Krish  wrote:

> Thanks, Christian.
> I am currently reading about kafka-on-mesos.
> I will hack something this weekend to see if I can bring up a kafka
> scheduler on mesos using dockerized brokers.
>
>
>
> --
> κρισhναν
>
> On Thu, Jul 7, 2016 at 7:29 PM, Christian Posta wrote:
>
> > One thing I can think of is Kafka likes lots of OS page cache.
> > Dockerizing from the standpoint of packaging configs is a good idea, just
> > make sure if you're running many brokers together on the same host,
> > they've got enough resources (CPU/Mem) so they don't starve each other.
> >
> > On Thu, Jul 7, 2016 at 2:30 AM, Krish  wrote:
> >
> >> Hi,
> >> I am currently testing a custom docker volume driver plugin for AWS
> >> EFS/EBS access and mounting. So, running the kafka broker inside a
> >> container will ease a lot of configuration issues wrt storage for me.
> >>
> >> Are there any pros and cons of dockerizing the kafka broker?
> >> Off the top of my head, since kafka forms the base of our setup, I can
> >> think of making it use the host networking stack, and increasing ulimits
> >> for the container.
> >> I would like to know if and when kafka becomes greedy and cannibalizes
> >> resources; I can also ensure that it runs on a dedicated machine.
> >>
> >> Thanks.
> >>
> >> Best,
> >> Krish
> >>
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
> >
>


Problem connecting to ZooKeeper with Kafka SASL enabled

2017-01-07 Thread Christian
Hi,

I'm trying to set up SASL_PLAINTEXT authentication between the
producer/consumer clients and the Kafka brokers only. I am not too worried
about broker-to-broker SASL, nor do I want Kafka broker to ZooKeeper
SASL authentication. I would prefer to just require authentication for the
clients to Kafka and that's it.

The problem I am having is that when I turn on SASL, my broker to zk
connection times out.

Initiating client connection, connectString=ZK1:2181,ZK2:2181,ZK3:2181
sessionTimeout=4
watcher=io.confluent.admin.utils.ZookeeperConnectionWatcher@5a2e4553
Opening socket connection to server ZK1:2181
Socket connection established to ZK1:2181, initiating session
Session establishment complete on server ZK1:2181  sessionid =
0x15976c66b690106, negotiated timeout = 4
Timed out waiting for connection to Zookeeper server
[ZK1:2181,ZK2:2181,ZK3:2181].
Session: 0x15976c66b690106 closed
EventThread shut down

I am using Kafka 0.10.1.0 and ZooKeeper 3.4.9

The following are the settings I have for SASL:

KAFKA_OPTS="-Djava.security.auth.login.config=/var/lib/kafka/config/kafka_server_jaas.conf
-Dzookeeper.sasl.client=false"

inter.broker.protocol=SASL_PLAINTEXT

sasl.enabled.mechanisms=PLAIN

Kafka.listeners=SASL_PLAINTEXT://0.0.0.0:92


My kafka_server_jaas.conf file looks like the following:

KafkaServer {

   org.apache.kafka.common.security.plain.PlainLoginModule required

   username="admin"

   password="admin-secret"

   user_admin="admin-secret"

   user_alice="alice-secret";

};

Do you have any suggestions? I have tried many combinations. Without
setting zookeeper.sasl.client=false, I get a SASL error that says I
need to define the Client section of the JAAS config file to talk to
ZooKeeper. Setting it to false gives me this timeout, but only when I
also set the -Djava.security.auth... property.
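
When debugging which JAAS sections the JVM actually sees, it can help to parse the file directly. The following is a minimal sketch, assuming the file uses the standard JAAS login configuration syntax; the class name JaasSections and the method definedSections are made up for illustration. It only parses the file and does not contact Kafka or ZooKeeper.

```java
import java.net.URI;
import java.security.URIParameter;
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

/** Hypothetical helper: reports which named sections a JAAS file defines. */
class JaasSections {
    static List<String> definedSections(URI jaasFile, String... names) throws Exception {
        // "JavaLoginConfig" is the JDK's file-based JAAS configuration type.
        Configuration config =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile));
        List<String> present = new ArrayList<>();
        for (String name : names) {
            // Returns null when neither the section nor an "other" fallback exists.
            AppConfigurationEntry[] entries = config.getAppConfigurationEntry(name);
            if (entries != null && entries.length > 0) {
                present.add(name);
            }
        }
        return present;
    }
}
```

For the file shown above, asking for "KafkaServer" and "Client" should report only "KafkaServer", which matches the error about the missing Client section.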

I know I'm missing a small thing.

Thanks,

Christian


Re: Problem connecting to ZooKeeper with Kafka SASL enabled

2017-01-07 Thread Christian
I figured it out. I am using Confluent 3.1.1's docker images. There is a
file in
https://github.com/confluentinc/cp-docker-images/blob/master/debian/base/include/cub
which looks for an environment variable of "ZOOKEEPER_SASL_ENABLED" which
defaults to true if using jaas. I simply set that var to false and I got
past the problem.





Kafka SASL_PLAINTEXT and authentication/authorization backend failure

2017-01-19 Thread Christian
I have successfully configured SASL_PLAINTEXT on our Kafka cluster. We
implemented our own LoginModule and SASL server, with one caveat that makes
me think I am doing something wrong.

The LoginModule's login method acquires a session id from an internal
security system and populates the subject with the relevant information. In
the server's evaluateResponse we then validate that session. On success,
everything is great. However, when evaluateResponse fails (throws an
exception), the producer client just hangs when sending a message until the
configured timeout occurs. Interestingly enough, we see the evaluateResponse
method getting called about every second until the producer client finally
times out.

We get this same behavior when using the PlainLoginModule provided with
Kafka after changing the password on the client side to something different
from the server side.
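
To make the setup concrete, here is a minimal sketch of a server side like the one described, written against the JDK's javax.security.sasl.SaslServer interface. The class name SessionSaslServer, the mechanism name, and the validator predicate (standing in for the internal security system) are assumptions for illustration, not the actual implementation. How the broker reports the thrown exception to the client is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

/** Hypothetical SASL server that accepts a session id and validates it. */
class SessionSaslServer implements SaslServer {
    private final Predicate<String> sessionValidator; // assumption: true for a live session id
    private boolean complete;
    private String authorizationId;

    SessionSaslServer(Predicate<String> sessionValidator) {
        this.sessionValidator = sessionValidator;
    }

    @Override
    public String getMechanismName() {
        return "X-SESSION"; // made-up mechanism name for this sketch
    }

    @Override
    public byte[] evaluateResponse(byte[] response) throws SaslException {
        String sessionId = new String(response, StandardCharsets.UTF_8);
        if (!sessionValidator.test(sessionId)) {
            // The failure path discussed in the message above.
            throw new SaslException("Session rejected");
        }
        authorizationId = sessionId;
        complete = true;
        return new byte[0]; // no further challenge
    }

    @Override
    public boolean isComplete() {
        return complete;
    }

    @Override
    public String getAuthorizationID() {
        if (!complete) {
            throw new IllegalStateException("Authentication not complete");
        }
        return authorizationId;
    }

    @Override
    public byte[] unwrap(byte[] incoming, int offset, int len) {
        throw new IllegalStateException("No security layer negotiated");
    }

    @Override
    public byte[] wrap(byte[] outgoing, int offset, int len) {
        throw new IllegalStateException("No security layer negotiated");
    }

    @Override
    public Object getNegotiatedProperty(String propName) {
        return null;
    }

    @Override
    public void dispose() {
        // Nothing to release in this sketch.
    }
}
```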

Is this expected behavior?

Thanks,
Christian


Re: Kafka SASL_PLAINTEXT and authentication/authorization backend failure

2017-01-19 Thread Christian
Thanks for the response Gerrit! It seems like authorization has the same
behavior. Have you experienced that as well?

On Thu, Jan 19, 2017 at 11:48 AM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> Hi,
>
> I've added kerberos support for https://github.com/gerritjvv/kafka-fast
> and have seen that the kafka brokers do not send any response if the SASL
> authentication is not correct or accepted, thus causing the client to hang
> while waiting for a response from kafka.
>
> Some things that might help to debug:
>
>    - kafka 0.9's SASL auth is incompatible with 0.10, and not using the
>      correct version will cause the kafka client to hang.
>    - use -Dsun.security.krb5.debug=true and
>      -Djava.security.debug=gssloginconfig,configfile,configparser,logincontext
>      to see debug info about what's going on.
>
>
> Some reading material can be found at:
> https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/Kerberos.md
>
> and if you want to see, or need for testing, a vagrant env with kerberos +
> kafka configured, see
> https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/doc/vagrant.md
>
>
>
>


Re: Kafka SASL_PLAINTEXT and authentication/authorization backend failure

2017-01-20 Thread Christian
I get this same behavior with Kafka 0.10.1.0 using PlainLoginModule and
simply making the password different from expected on the client. We also
get this behavior when creating our own Authorizer and always returning
false. I can tell a retry is happening because the brokers get called at
least every second. The problem is we can't find a way to get the client to
recognize this and re-initiate the original session from the client side.
So instead, we are forced to wait for the timeout to expire.
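
One caller-side workaround is to put our own deadline around the blocking call. This is a minimal sketch using only java.util.concurrent; BoundedCall and callWithDeadline are hypothetical names, and the Callable stands in for whatever producer call blocks. Whether the producer reacts cleanly to the interrupt is an assumption I have not verified.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: runs a blocking call on another thread with a deadline. */
class BoundedCall {
    static <T> T callWithDeadline(ExecutorService executor, Callable<T> blockingCall,
                                  long timeout, TimeUnit unit) throws Exception {
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Interrupt the worker so it does not linger past the deadline.
            future.cancel(true);
            throw e;
        }
    }
}
```

This does not make the client notice the authentication failure; it only stops the calling code from waiting for the full configured timeout.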

On Thu, Jan 19, 2017 at 6:09 PM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> Hi,
>
> I refer to the broker behaviour, for most part without SASL the brokers do
> respond but as soon as you put SASL into the mix it does hang whenever
> there is something that goes wrong, i.e the broker doesn't provide a
> response on the socket and the client waits reading a response that it
> never gets and timeout.
>
> e.g
>
> kafka 0.9 SASL protocol is [kerberos specific interchange]  protocol interchange>
> kafka 0.10   is [len][kafka protocol][kerberos specific
> interchange] 
>
> the kerberos specific interchange parts is where things can hang (meaning
> the client or broker are waiting for information from either that never
> arrives), and if you write [len][kafka protocol] to a SASL configured 0.9
> kafka broker or vice versa the client timeout eventually not on retrying
> but on waiting on a response from the broker.
>
> I would file a bug but don't think this behaviour can be helped here.
>
> Regards,
> Gerrit
>
>
>
>
> On Fri, Jan 20, 2017 at 12:05 AM, Ismael Juma  wrote:
>
> > Hi Gerrit,
> >
> > I think it's important to distinguish broker and client behaviour. The
> > clients can hang because they keep retrying when they get certain errors.
> > When it comes to the broker, it should give you errors as a general rule.
> > If you are aware of certain scenarios where it should give an error and
> > it doesn't, then please file a bug with steps to reproduce.
> >
> > Ismael
> >


Kafka SASL and custom LoginModule and Authorizer

2017-02-25 Thread Christian
We have implemented our own LoginModule and Authorizer. The LoginModule
does an authentication on the client side, obtains a token and passes that
token down to our custom SaslServer which then verifies that this token is
valid. Our Authorizer gets that token and asks another custom service if
the necessary topic permissions are there. This is a very simplified
description, but it should suffice for my question.

I've found that the LoginModule only authenticates once and passes that
token down once as well. Our service requires a heartbeat to keep the token
alive. I would like the SaslServer to call our authentication service
every once in a while, and if the token ever times out (it times out after
24 hours even with heartbeats; heartbeats every so many minutes can only
extend the session up to 24 hours), then I'd like it to respond back to the
LoginModule with some sort of failed-to-authorize message or code.
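
To pin down the token lifetime rule I mean, here is a small model of it. It is only a sketch: HeartbeatSession is a made-up class, and the idle timeout is a parameter because I have not stated the real heartbeat interval. The 24-hour cap is passed in as maxLifetime.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model: heartbeats extend a token, but never past a hard cap. */
class HeartbeatSession {
    private final Instant hardExpiry;   // issue time + maximum lifetime
    private final Duration idleTimeout; // survival time without a heartbeat (assumed value)
    private Instant idleExpiry;         // invariant: never later than hardExpiry

    HeartbeatSession(Instant issuedAt, Duration idleTimeout, Duration maxLifetime) {
        this.idleTimeout = idleTimeout;
        this.hardExpiry = issuedAt.plus(maxLifetime);
        this.idleExpiry = earlier(issuedAt.plus(idleTimeout), hardExpiry);
    }

    /** Extends the idle deadline if still valid; returns false once expired. */
    boolean heartbeat(Instant now) {
        if (!isValid(now)) {
            return false;
        }
        idleExpiry = earlier(now.plus(idleTimeout), hardExpiry);
        return true;
    }

    boolean isValid(Instant now) {
        return now.isBefore(idleExpiry);
    }

    private static Instant earlier(Instant a, Instant b) {
        return a.isBefore(b) ? a : b;
    }
}
```

With a 10-minute idle timeout and a 24-hour cap, a client that heartbeats every 5 minutes stays valid until the 24-hour mark and is rejected from then on.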

Once this gets passed to the Authorizer, we can extend the session by
querying our internal Authentication/Authorization service. I was hoping,
as a fallback plan, that the Authorizer could do this by simply throwing an
exception or failing the request when the authorization finally returns
false (due to session timeout). But I don't see anywhere in the
documentation where a certain kind of failure in the Authorizer can bubble
up to the authenticator, and I don't see how I can configure the LoginModule
to periodically redo authentication. Can anyone out there help me? Is the
Kafka SASL implementation not meant for such a complicated scenario, or am I
just thinking about it all wrong?

Thanks,
Christian


Re: Kafka SASL and custom LoginModule and Authorizer

2017-02-26 Thread Christian
Thank you Harsha!

On Sun, Feb 26, 2017 at 10:27 AM, Harsha Chintalapani 
wrote:

> Hi Christian,
> Kafka client connections are long-lived connections, hence the
> authentication part comes up during connection establishment, and once we
> authenticate, regular Kafka protocol requests can be exchanged.
> Doing a heartbeat to keep the token alive in an Authorizer is not a good
> idea. The Authorizer's role is to tell if user A has permission on topic X
> etc., not to invalidate a user's session. Hence it won't propagate an
> exception into the LoginModule. What you are trying to do seems similar to
> DelegationToken. Have you checked this KIP?
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka
>
> Thanks,
> Harsha
>


Re: Kafka take too long to update the client with metadata when a broker is gone

2016-06-03 Thread Christian
Hi Gerard,

When trying to reproduce this, did you use the go sarama client Safique
mentioned?


On Fri, Jun 3, 2016 at 5:10 AM, Gerard Klijs 
wrote:

> I assume you use a replication factor of 3 for the topics? When I ran some
> tests with producers/consumers in a dockerized setup, there were only a few
> failures before the producer switched to the correct new broker again. I
> don't know the exact time, but it seemed like a few seconds at most; this
> was with 0.9.0.0.
>
> On Fri, Jun 3, 2016 at 9:00 AM safique ahemad 
> wrote:
>
> > Hi Steve,
> >
> > There is no way to access that from the public side, so I won't be able
> > to do that. Sorry for that.
> > But the steps are quite simple. The only difference is that we have
> > deployed the Kafka cluster using a mesos url.
> >
> > 1) Launch a 3-broker Kafka cluster and create a topic with multiple
> > partitions (at least 3) so that at least one partition lands on each
> > broker.
> > 2) Launch the consumer/producer client.
> > 3) Kill a broker.
> > 4) Observe the behavior of the producer client.
> >
> >
> >
> > On Thu, Jun 2, 2016 at 8:15 PM, Steve Tian 
> > wrote:
> >
> > > I see.  I'm not sure if this is a known issue.  Do you mind sharing the
> > > brokers/topics setup and the steps to reproduce this issue?
> > >
> > > Cheers, Steve
> > >
> > > On Fri, Jun 3, 2016, 9:45 AM safique ahemad 
> > wrote:
> > >
> > > > You got it right...
> > > >
> > > > But DialTimeout is not a concern here. The client tries fetching
> > > > metadata from the Kafka brokers, but Kafka gives it stale metadata for
> > > > nearly 30-40 sec.
> > > > It tries to fetch 3-4 times in between until it gets updated metadata.
> > > > This is a completely different problem from
> > > > https://github.com/Shopify/sarama/issues/661
> > > >
> > > >
> > > >
> > > > On Thu, Jun 2, 2016 at 6:05 PM, Steve Tian 
> > > > wrote:
> > > >
> > > > > So you are coming from https://github.com/Shopify/sarama/issues/661,
> > > > > right? I'm not sure if anything on the broker side can help, but it
> > > > > looks like you already found that DialTimeout on the client side can
> > > > > help?
> > > > >
> > > > > Cheers, Steve
> > > > >
> > > > > On Fri, Jun 3, 2016, 8:33 AM safique ahemad 
> > > > wrote:
> > > > >
> > > > > > kafka version:0.9.0.0
> > > > > > go sarama client version: 1.8
> > > > > >
> > > > > > On Thu, Jun 2, 2016 at 5:14 PM, Steve Tian <
> > steve.cs.t...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Client version?
> > > > > > >
> > > > > > > On Fri, Jun 3, 2016, 4:44 AM safique ahemad <
> > saf.jnu...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > We are using a Kafka broker cluster in our data center.
> > > > > > > > Recently, we realized that when a Kafka broker goes down, the
> > > > > > > > client tries to refresh the metadata but gets stale metadata
> > > > > > > > for up to nearly 30 seconds.
> > > > > > > >
> > > > > > > > After nearly 30-35 seconds, the client obtains updated
> > > > > > > > metadata. This is a really long time for the client to
> > > > > > > > continuously get send failures.
> > > > > > > >
> > > > > > > > Kindly reply if any configuration may help here, or if
> > > > > > > > something else is required.
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Safique Ahemad
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Regards,
> > > > > > Safique Ahemad
> > > > > > GlobalLogic | Leaders in software R&D services
> > > > > > P :+91 120 4342000-2990 | M:+91 9953533367
> > > > > > www.globallogic.com
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > >
> > >
> >
> >
> >
> >
>


Kafka and no guarantee that every published message is actually received by the broker

2014-02-04 Thread Christian Schuhegger

Hello all,

I was reading in the following paper:

http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

the following paragraph:

-- snip start --
There are a few reasons why Kafka performed much better. First,
the Kafka producer currently doesn’t wait for acknowledgements
from the broker and sends messages as faster as the broker can
handle. This significantly increased the throughput of the
publisher. With a batch size of 50, a single Kafka producer almost
saturated the 1Gb link between the producer and the broker. This
is a valid optimization for the log aggregation case, as data must
be sent asynchronously to avoid introducing any latency into the
live serving of traffic. We note that without acknowledging the
producer, there is no guarantee that every published message is
actually received by the broker. For many types of log data, it is
desirable to trade durability for throughput, as long as the number
of dropped messages is relatively small. However, we do plan to
address the durability issue for more critical data in the future.
-- snip end --

And I was wondering if this is still true or if the plans for the future 
as described above to address the durability issue for more critical 
data were realized?


Many thanks,
--
Christian Schuhegger




0.8.1 Java Producer API Callbacks

2014-05-01 Thread Christian Csar
I'm looking at using the java producer api for 0.8.1 and I'm slightly
confused by this passage from section 4.4 of
https://kafka.apache.org/documentation.html#theproducer
"Note that as of Kafka 0.8.1 the async producer does not have a
callback, which could be used to register handlers to catch send errors.
Adding such callback functionality is proposed for Kafka 0.9, see
[Proposed Producer
API](https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI)."

org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to have
public Future<RecordMetadata> send(ProducerRecord record, Callback
callback) which looks like the mentioned callback.

How do the callbacks work with the async producer? Is it as described in the
comment on the send method (see
https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
for reference)?

Looking around it seems plausible the language in the documentation
might refer to a separate sort of callback that existed in 0.7 but not
0.8. In our use case we have something useful to do if we can detect
messages failing to be sent.

Christian





Re: 0.8.1 Java Producer API Callbacks

2014-05-01 Thread Christian Csar

It appears that I was looking at the Java client rather than the Scala
Java API referenced by the documentation
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala

Are both of these currently suited for use from Java and still
supported? Given the support for callbacks in the event of failure I am
inclined to use the Java one despite the currently limited support for
specifying partitioners (though it supports specifying the partition) or
encoders.

Any guidance on this would be appreciated.

Christian





Re: Which producer to use?

2014-06-23 Thread Christian Csar
I ended up coding against the new one,
org.apache.kafka.clients.producer.Producer, though it is not yet in
production here. It might be slightly more painful to select a partition
since there isn't a place to plug in a partitioner class, but overall it
was quite easy and had the key feature of an Async callback.
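
Since there is no partitioner plug-in point, the partition has to be computed by the caller and passed along with the record. A minimal sketch of one way to do that follows; KeyPartitionChooser and its hashing scheme are assumptions for illustration and are not Kafka's own partitioning logic.

```java
/** Hypothetical helper: maps a key to a partition index in [0, numPartitions). */
class KeyPartitionChooser {
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask the sign bit instead of Math.abs, which fails for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

The same key always maps to the same partition as long as the partition count does not change.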

Christian

On 06/23/2014 04:54 PM, Guozhang Wang wrote:
> Hi Kyle,
> 
> We have not fully completed the test in production yet for the new
> producer, currently some improvement jiras like KAFKA-1498 are still open.
> Once we have it stabilized in production at LinkedIn we plan to update the
> wiki in favor of the new producer.
> 
> Guozhang
> 
> 
> On Mon, Jun 23, 2014 at 3:39 PM, Kyle Banker  wrote:
> 
>> As of today, the latest Kafka docs show kafka.javaapi.producer.Producer in
>> their example of the producer API (
>> https://kafka.apache.org/documentation.html#producerapi).
>>
>> Is there a reason why the latest producer client
>> (org.apache.kafka.clients.producer.Producer)
>> isn't mentioned? Is this client not preferred or production-ready?
>>
> 
> 
> 






Re: EBCDIC support

2014-08-25 Thread Christian Csar
Having been spared any EBCDIC experience whatsoever (i.e. from a position
of thorough ignorance), if you are transmitting text or things with a
designated textual form (presumably) I would recommend that your
conversion be to Unicode rather than ASCII if you don't already have
consumers expecting a given conversion. That way you will avoid losing
information, particularly if you expect any of your conversion tools to
be of more general use.
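
As a rough illustration of converting to Unicode, here is a sketch using the JDK's charset support. It assumes the data is in code page 037 (charset name "IBM037"), which is only one of several EBCDIC code pages and may not be present in a trimmed-down Java runtime; EbcdicConverter is a made-up class name.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical converter from EBCDIC bytes to Unicode text and UTF-8 bytes. */
class EbcdicConverter {
    // Assumption: code page 037; the real mainframe code page must be confirmed.
    static final String ASSUMED_CODE_PAGE = "IBM037";

    static String decode(byte[] ebcdic) {
        return new String(ebcdic, Charset.forName(ASSUMED_CODE_PAGE));
    }

    static byte[] toUtf8(byte[] ebcdic) {
        return decode(ebcdic).getBytes(StandardCharsets.UTF_8);
    }
}
```

A producer could send the UTF-8 bytes as the message value, while the raw EBCDIC bytes go to a separate topic if they need to be preserved.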

Christian

On 08/25/2014 05:36 PM, Gwen Shapira wrote:
> Personally, I like converting data before writing to Kafka, so I can
> easily support many consumers who don't know about EBCDIC.
> 
> A third option is to have a consumer that reads EBCDIC data from one
> Kafka topic and writes ASCII to another Kafka topic. This has the
> benefits of preserving the raw data in Kafka, in case you need it for
> troubleshooting, and also supporting non-EBCDIC consumers.
> 
> The cost is a more complex architecture, but if you already have a
> stream processing system around (Storm, Samza, Spark), it can be an
> easy addition.
> 
> 
> On Mon, Aug 25, 2014 at 5:28 PM,   wrote:
>> Thanks Gwen! makes sense. So I'll have to weigh the pros and cons of doing 
>> an EBCDIC to ASCII conversion before sending to Kafka Vs. using an ebcdic 
>> library after in the consumer
>>
>> Thanks!
>> S
>>
>> -Original Message-
>> From: Gwen Shapira [mailto:gshap...@cloudera.com]
>> Sent: Monday, August 25, 2014 5:22 PM
>> To: users@kafka.apache.org
>> Subject: Re: EBCDIC support
>>
>> Hi Sonali,
>>
>> Kafka doesn't really care about EBCDIC or any other format -  for Kafka bits 
>> are just bits. So they are all supported.
>>
>> Kafka does not "read" data from a socket though. Well, it does, but the data 
>> has to be sent by a Kafka producer. Most likely you'll need to implement a 
>> producer that will get the data from the socket and send it as a message to 
>> Kafka. The content of the message can be anything, including EBCDIC.
>>
>> Then  you'll need a consumer to read the data from Kafka and do something 
>> with this - the consumer will need to know what to do with a message that 
>> contains EBCDIC data. Perhaps you have EBCDIC libraries you can reuse there.
>>
>> Hope this helps.
>>
>> Gwen
>>
>> On Mon, Aug 25, 2014 at 5:14 PM,   wrote:
>>> Hey all,
>>>
>>> This might seem like a silly question, but does kafka have support for 
>>> EBCDIC? Say I had to read data from an IBM mainframe via a TCP/IP socket 
>>> where the data resides in EBCDIC format, can Kafka read that directly?
>>>
>>> Thanks,
>>> Sonali
>>>






Re: Handling send failures with async producer

2014-08-26 Thread Christian Csar
TLDR: I use one Callback per job I send to Kafka and include that sort
of information by reference in the Callback instance.

Our system is currently moving data from beanstalkd to Kafka due to
historical reasons so we use the callback to either delete or release
the message depending on success. The
org.apache.kafka.clients.producer.Callback I give to the send method is
an instance of a class that stores all the additional information I need
to process the callback. Remember that the callback runs in the Kafka
producer's own thread, so it must be fast to avoid constraining
throughput. My callback ends up handing information about the call to
beanstalk to another executor service for later processing.
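A minimal sketch of that one-callback-per-job pattern, written in Python with stand-ins rather than the real Java client. The `on_completion(metadata, exception)` shape is modeled on `org.apache.kafka.clients.producer.Callback`; `FakeQueueClient` and `demo` are hypothetical names introduced only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


class JobCallback:
    """Carries a reference to the originating job so that the callback
    knows which job a send result belongs to."""

    def __init__(self, job_id, queue_client, executor):
        self.job_id = job_id
        self.queue_client = queue_client
        self.executor = executor

    def on_completion(self, metadata, exception):
        # This would run on the producer's thread, so it only hands the
        # follow-up work to another executor and returns immediately.
        if exception is None:
            self.executor.submit(self.queue_client.delete, self.job_id)
        else:
            self.executor.submit(self.queue_client.release, self.job_id)


class FakeQueueClient:
    """Hypothetical stand-in for a beanstalkd client; records the calls."""

    def __init__(self):
        self.deleted = []
        self.released = []

    def delete(self, job_id):
        self.deleted.append(job_id)

    def release(self, job_id):
        self.released.append(job_id)


def demo():
    queue_client = FakeQueueClient()
    with ThreadPoolExecutor(max_workers=1) as executor:
        ok = JobCallback(101, queue_client, executor)
        failed = JobCallback(102, queue_client, executor)
        # A real producer would invoke these when each send completes.
        ok.on_completion({"offset": 7}, None)
        failed.on_completion(None, RuntimeError("send failed"))
    # Leaving the `with` block waits for the submitted work to finish.
    return queue_client.deleted, queue_client.released
```

Because each callback instance holds its own job reference, nothing about the original message needs to come back from the producer in the metadata.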

Christian

On 08/26/2014 12:35 PM, Ryan Persaud wrote:
> Hello,
> 
> I'm looking to insert log lines from log files into kafka, but I'm concerned 
> with handling asynchronous send() failures.  Specifically, if some of the log 
> lines fail to send, I want to be notified of the failure so that I can 
> attempt to resend them.
> 
> Based on previous threads on the mailing list 
> (http://comments.gmane.org/gmane.comp.apache.kafka.user/1322), I know that 
> the trunk version of kafka supports callbacks for dealing with failures.  
> However, the callback function is not passed any metadata that can be used by 
> the producer end to reference the original message.  Including the key of the 
> message in the RecordMetadata seems like it would be really useful for 
> recovery purposes.  Is anyone using the callback functionality to trigger 
> resends of failed messages?  If so, how are they tying the callbacks to 
> messages?  Is anyone using other methods for handling async errors/resending 
> today?  I can’t imagine that I am the only one trying to do this.  I asked 
> this question on the IRC channel today, and it sparked some discussion, but I 
> wanted to hear from a wider audience.
> 
> Thanks for the information,
> -Ryan
> 
> 






Re: Use case

2014-09-05 Thread Christian Csar
The thought experiment I did ended up having a set of front end servers
corresponding to a given chunk of the user id space, each of which was a
separate subscriber to the same set of partitions. Then you have one or
more partitions corresponding to that same chunk of users. You want the
chunk/set of partitions to be of a size where each of those front end
servers can process all the messages in it and send out the
chats/notifications/status change notifications perhaps/read receipts,
to those users who happen to be connected to the particular front end node.

You would need to handle some deduplication on the consumers/FE servers
and would need to decide where to produce. Producing from every front
end server to potentially every broker could be expensive in terms of
connections and you might want to first relay the messages to the
corresponding front end cluster, but since we don't use large numbers of
producers it's hard for me to say.

For persistence and offline delivery you can probably accept a delay in
user receipt so you can use another set of consumers that persist the
messages to the longer latency datastore on the backend and then get the
last 50 or so messages with a bit of lag when the user first looks at
history (see hipchat and hangouts lag).

This gives you a smaller number of partitions and avoids the issue of
having to keep too much history on the Kafka brokers. There are
obviously a significant number of complexities to deal with. For example,
the default consumer code that commits offsets into ZooKeeper may be
inadvisable at very large scales, though you probably don't need to worry
about reaching those. And remember I had done this only as a thought
experiment not a proper technical evaluation. I expect Kafka, used
correctly, can make aspects of building such a chat system much much
easier (you can avoid writing your own message replication system) but
it is definitely not plug and play using topics for users.

Christian


On 09/05/2014 09:46 AM, Jonathan Weeks wrote:
> +1
> 
> Topic Deletion with 0.8.1.1 is extremely problematic, and coupled with the 
> fact that rebalance/broker membership changes pay a cost per partition today, 
> whereby excessive partitions extend downtime in the case of a failure; this 
> means fewer topics (e.g. hundreds or thousands) is a best practice in the 
> published version of kafka. 
> 
> There are also secondary impacts on topic count — e.g. useful operational 
> tools such as: http://quantifind.com/KafkaOffsetMonitor/ start to become 
> problematic in terms of UX with a massive number of topics.
> 
> Once topic deletion is a supported feature, the use-case outlined might be 
> more tenable.
> 
> Best Regards,
> 
> -Jonathan
> 
> On Sep 5, 2014, at 4:20 AM, Sharninder  wrote:
> 
>> I'm not really sure about your exact use-case but I don't think having a
>> topic per user is very efficient. Deleting topics in kafka, at the moment,
>> isn't really straightforward. You should rethink your data pipeline a bit.
>>
>> Also, just because kafka has the ability to store messages for a certain
>> time, don't think of it as a data store. Kafka is a streaming system, think
>> of it as a fast queue that gives you the ability to move your pointer back.
>>
>> --
>> Sharninder
>>
>>
>>
>> On Fri, Sep 5, 2014 at 4:27 PM, Aris Alexis 
>> wrote:
>>
>>> Thanks for the reply. If I use it only for activity streams like twitter:
>>>
>>> I would want a topic for each #tag and a topic for each user and maybe
>>> foreach city. Would that be too many topics or it doesn't matter since most
>>> of them will be deleted in a specified interval.
>>>
>>>
>>>
>>> Best Regards,
>>> Aris Giachnis
>>>
>>>
>>> On Fri, Sep 5, 2014 at 6:57 AM, Sharninder  wrote:
>>>
>>>> Since you want all chats and mail history persisted all the time, I
>>>> personally wouldn't recommend kafka for your requirement. Kafka is more
>>>> suitable as a streaming system where events expire after a certain time.
>>>> Look at something more general purpose like hbase for persisting data
>>>> indefinitely.
>>>>
>>>> So, for example all activity streams can go into kafka from where
>>> consumers
>>>> will pick up messages to parse and put them to hbase or other clients.
>>>>
>>>> --
>>>> Sharninder
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Sep 5, 2014 at 12:05 AM, Aris Alexis 
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I 

Re: Is kafka suitable for our architecture?

2014-10-09 Thread Christian Csar
Apart from your data locality problem it sounds like what you want is a
workqueue. Kafka's consumer structure doesn't lend itself too well to
that use case as a single partition of a topic should only have one
consumer instance per logical subscriber of the topic, and that consumer
would not be able to mark jobs as completed except in a strict order
(while maintaining an at-least-once guarantee of successful processing).
This is not to say it cannot be done, but I believe your workqueue would
end up working a bit strangely if built with Kafka.
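The strict-order constraint can be sketched as follows. This is a simplified model, assuming a single partition where "committing" means recording one offset up to which everything is done; the function name and the convention that the committed value is the last finished offset are illustrative choices, not Kafka's API.

```python
def committable_offset(last_committed, completed):
    """Return the highest offset that can be marked done without skipping
    an unfinished job.

    last_committed: the last offset already marked as done.
    completed: set of offsets whose jobs have finished, in any order.
    """
    offset = last_committed
    # Only a contiguous run of finished jobs can be committed; a single
    # slow job holds back everything after it.
    while offset + 1 in completed:
        offset += 1
    return offset
```

With offsets 5, 6, 8 and 9 finished and 7 still running, only 6 can be committed, so a restart would reprocess 8 and 9. That is the "working a bit strangely" effect for a workqueue.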

Christian

On 10/09/2014 06:13 AM, William Briggs wrote:
> Manually managing data locality will become difficult to scale. Kafka is
> one potential tool you can use to help scale, but by itself, it will not
> solve your problem. If you need the data in near-real time, you could use a
> technology like Spark or Storm to stream data from Kafka and perform your
> processing. If you can batch the data, you might be better off pulling it
> into a distributed filesystem like HDFS, and using MapReduce, Spark or
> another scalable processing framework to handle your transformations. Once
> you've paid the initial price for moving the document into HDFS, your
> network traffic should be fairly manageable; most clusters built for this
> purpose will schedule work to be run local to the data, and typically have
> separate, high-speed network interfaces and a dedicated switch in order to
> optimize intra-cluster communications when moving data is unavoidable.
> 
> -Will
> 
> On Thu, Oct 9, 2014 at 7:57 AM, Albert Vila  wrote:
> 
>> Hi
>>
>> I just came across Kafka when I was trying to find solutions to scale our
>> current architecture.
>>
>> We are currently downloading and processing 6M documents per day from
>> online and social media. We have a different workflow for each type of
>> document, but some of the steps are keyword extraction, language detection,
>> clustering, classification, indexation,  We are using Gearman to
>> dispatch the job to workers and we have some queues on a database.
>>
>> I'm wondering if we could integrate Kafka on the current workflow and if
>> it's feasible. One of our main discussions are if we have to go to a fully
>> distributed architecture or to a semi-distributed one. I mean, distribute
>> everything or process some steps on the same machine (crawling, keyword
>> extraction, language detection, indexation). We don't know which one scales
>> more, each one has pros and cons.
>>
>> Now we have a semi-distributed one as we had network problems taking into
>> account the amount of data we were moving around. So now, all documents
>> crawled on server X, later on are dispatched through Gearman to the same
>> server. What we dispatch on Gearman is only the document id, and the
>> document data remains on the crawling server on a Memcached, so the network
>> traffic is kept at a minimum.
>>
>> What do you think?
>> It's feasible to remove all database queues and Gearman and move to Kafka?
>> As Kafka is mainly based on messages I think we should move the messages
>> around, should we take into account the network? We may face the same
>> problems?
>> If so, there is a way to isolate some steps to be processed on the same
>> machine, to avoid network traffic?
>>
>> Any help or comment will be appreciated. And if someone has had a similar
>> problem and has knowledge about the architecture approach, it will be more
>> than welcome.
>>
>> Thanks
>>
> 



Re: Is kafka suitable for our architecture?

2014-10-10 Thread Christian Csar
Kafka might well be equivalent to a simple
>> linear Storm topology.
>>
> Exactly, that's why we are evaluating if only with Kafka is enough.
> Because if Storm gives us the same benefits than Kafka it's better to stick
> with only one technology to keep everything as simple as possible.
> 

I think it is more a question of whether using Storm will make managing
your various consumers easier. Since I haven't used Storm in a production
environment I can't speak to that. I don't think there is any reason you
*need* to use Storm rather than just Kafka to achieve your needs though.

Christian

> 
>> Christian
>>
> 
> Thanks
> 
> 
> 
>>
>> On Thu, Oct 9, 2014 at 11:57 PM, Albert Vila 
>> wrote:
>>
>>> Hi
>>>
>>> We process data in real time, and we are taking a look at Storm and Spark
>>> streaming too, however our actions are atomic, done at a document level
>> so
>>> I don't know if it fits on something like Storm/Spark.
>>>
>>> Regarding what you Christian said, isn't Kafka used for scenarios like
>> the
>>> one I described? I mean, we do have work queues right now with Gearman,
>> but
>>> with a bunch of workers on each step. I thought we could change that to a
>>> producer and a bunch of consumers (where the message should only reach
>> one
>>> and exact one consumer).
>>>
>>> And what I said about the data locally, it was only an optimization we
>> did
>>> some time ago because we was moving more data back then. Maybe now its
>> not
>>> necessary and we could move messages around the system using Kafka, so it
>>> will allow us to simplify the architecture a little bit. I've seen people
>>> saying they move Tb of data every day using Kafka.
>>>
>>> Just to be clear on the size of each document/message, we are talking
>> about
>>> tweets, blog posts, ... (on 90% of cases the size is less than 50Kb)
>>>
>>> Regards
>>>
>>> On 9 October 2014 20:02, Christian Csar  wrote:
>>>
>>>> Apart from your data locality problem it sounds like what you want is a
>>>> workqueue. Kafka's consumer structure doesn't lend itself too well to
>>>> that use case as a single partition of a topic should only have one
>>>> consumer instance per logical subscriber of the topic, and that
>> consumer
>>>> would not be able to mark jobs as completed except in a strict order
>>>> (while maintaining a processed successfully at least once guarantee).
>>>> This is not to say it cannot be done, but I believe your workqueue
>> would
>>>> end up working a bit strangely if built with Kafka.
>>>>
>>>> Christian
>>>>
>>>> On 10/09/2014 06:13 AM, William Briggs wrote:
>>>>> Manually managing data locality will become difficult to scale. Kafka
>>> is
>>>>> one potential tool you can use to help scale, but by itself, it will
>>> not
>>>>> solve your problem. If you need the data in near-real time, you could
>>>> use a
>>>>> technology like Spark or Storm to stream data from Kafka and perform
>>> your
>>>>> processing. If you can batch the data, you might be better off
>> pulling
>>> it
>>>>> into a distributed filesystem like HDFS, and using MapReduce, Spark
>> or
>>>>> another scalable processing framework to handle your transformations.
>>>> Once
>>>>> you've paid the initial price for moving the document into HDFS, your
>>>>> network traffic should be fairly manageable; most clusters built for
>>> this
>>>>> purpose will schedule work to be run local to the data, and typically
>>>> have
>>>>> separate, high-speed network interfaces and a dedicated switch in
>> order
>>>> to
>>>>> optimize intra-cluster communications when moving data is
>> unavoidable.
>>>>>
>>>>> -Will
>>>>>
>>>>> On Thu, Oct 9, 2014 at 7:57 AM, Albert Vila 
>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I just came across Kafta when I was trying to find solutions to
>> scale
>>>> our
>>>>>> current architecture.
>>>>>>
>>>>>> We are currently downloading and processing 6M documents per day
>> from
>>>>>> online and soc

Re: Node-Kafka Client Review and Question

2013-04-24 Thread Christian Carollo
Hi Everyone,

I have been experimenting with the libraries listed below and experienced the
same problems.
I have not found any other Node clients, and I am interested in finding a
Node solution as well.
Happy to contribute to a common solution.

Christian Carollo

On Apr 24, 2013, at 10:19 AM, Christopher Alexander  
wrote:

> Hi Everyone,
> 
> I just wanted to follow-up on a previous thread concerning our investigation 
> of identifying a stable Node-Kafka client. To date we have tested the 
> following:
> 
> 1. Franz-Kafka (https://github.com/dannycoates/franz-kafka)
> 2. Node-Kafka (v2.1, https://github.com/radekg/node-kafka)
> 3. Node-Kafka (v2.3, https://github.com/marcuswestin/node-kafka)
> 4. Prozess (v0.3.5, https://github.com/cainus/Prozess)
> 
> Results:
> 
> 1. Could not get Franz-Kafka and Prozess to work. Requires funky dependencies.
> 2. Node-Kafka, v2.1 was successfully setup but performed less stable than #3.
> 3. Node-Kafka, v2.3 was successfully setup, exhibited the best performance 
> profile but the consumer is highly inconsistent - specifically, consumer 
> object remained in-memory regardless what we did (i.e. var consumer = 
> undefined after receiving message). Nothing appears to mitigate this and ALL 
> consumed messaged get replayed on reception of a new message.
> 
> With this said, is there a Node-Kafka client people are actually using in 
> production that doesn't exhibit the profiles we have seen? We have 
> back-tracked using Node-Kafka (v2.3) to only produce messages and rely on 
> Redis PubSub channels for asynchronous acking of these messages. We would be 
> willing to roll-up our sleeves with the community to develop a much more 
> stable Node-Kafka client.
> 
> Kind regards,
> 
> Chris Alexander
> Chief Technical Architect and Engineer
> Gravy, Inc.



questtion about log.retention.bytes

2013-08-16 Thread Paul Christian
According to the Kafka 0.8 documentation, under broker configuration, there
are these parameters and their definitions:

log.retention.bytes (default: -1)
    The maximum size of the log before deleting it
log.retention.bytes.per.topic (default: "")
    The maximum size of the log for some specific topic before deleting it

I'm curious what the first value 'log.retention.bytes' is for if the second
one is for per topic logs, because aren't all logs generated per topic? Is
this an aggregate value across topics?

Related question: is there a parameter for Kafka where you can say only
hold this much TOTAL data across all topics (logs and indexes together)? I.e.
our hosts have this much available space, so set a value such as
log.retention.whatever.aggregate == 75% of total disk space.


re: questtion about log.retention.bytes

2013-08-19 Thread Paul Christian
Hi Jun,

Thank you for your reply. I'm still a little fuzzy on the concept.

Are you saying I can have topic A, B and C and with

log.retention.bytes.per.topic.A = 15MB
log.retention.bytes.per.topic.B = 20MB
log.retention.bytes = 30MB

And thus topic C will get the value 30MB? Since it's not defined like the
others' 'per topic'?

log.retention.bytes is for all topics that are not included in
log.retention.bytes.per.topic
(which defines a map of topic -> size).
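The fallback rule described here can be sketched as a plain lookup. This only illustrates the semantics (per-topic entry if present, global value otherwise); it says nothing about the actual property syntax, and the function and variable names are hypothetical.

```python
MB = 1024 * 1024


def retention_bytes_for(topic, per_topic, default_bytes):
    """Return the size limit that applies to `topic`.

    per_topic: mapping of topic name -> size limit in bytes, standing in
               for log.retention.bytes.per.topic.
    default_bytes: standing in for log.retention.bytes, used for every
                   topic that has no per-topic entry.
    """
    return per_topic.get(topic, default_bytes)


# The example from this message: A and B have their own limits, C does not.
per_topic_limits = {"A": 15 * MB, "B": 20 * MB}
```

Under that reading, `retention_bytes_for("C", per_topic_limits, 30 * MB)` falls through to the 30MB global value, while A and B keep 15MB and 20MB.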

Otherwise, log.retention.bytes.per.topic and log.retention.bytes seem very
similar to me.

Additionally, we've experimented with this value on our test cluster where
we set the log.retention.bytes to 11MB as a test. Below is a snippet from
our server.properties.

# A size-based retention policy for logs. Segments are pruned from the log
# as long as the remaining segments don't drop below log.retention.bytes.
log.retention.bytes=11534336

Here is a ls -lh from one of the topics

-rw-r--r-- 1 kafka service  10M Aug 19 15:45 07021913.index
-rw-r--r-- 1 kafka service 114M Aug 19 15:45 07021913.log

The index file size appears to follow the log.index.size.max.bytes
property, but the log just keeps growing.
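One possible reading of the comment in server.properties is that retention only ever removes whole segments, oldest first, and stops as soon as removing another would take the log below the limit. The sketch below models just that rule; it is an interpretation of the comment, not the broker's actual code, and the names are illustrative.

```python
MB = 1024 * 1024


def prune_segments(segment_sizes, retention_bytes):
    """Return the segments that survive size-based retention.

    segment_sizes: sizes in bytes, oldest segment first.
    A segment is dropped only if the remaining segments would still hold
    at least `retention_bytes` afterwards.
    """
    remaining = list(segment_sizes)
    excess = sum(remaining) - retention_bytes
    while remaining and excess - remaining[0] >= 0:
        excess -= remaining.pop(0)
    return remaining
```

Under this model a log made of a single 114MB segment is never pruned with an 11MB limit, because dropping that one segment would leave nothing. If that is what is happening here, the segment size (log.segment.bytes, if I recall the property name correctly) would need to be smaller than the retention limit before size-based pruning could take effect.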


re: questtion about log.retention.bytes

2013-08-20 Thread Paul Christian
Jun,

For my first example is that syntax correct? I.e.

log.retention.bytes.per.topic.A = 15MB
log.retention.bytes.per.topic.B = 20MB

I totally guessed there and was wondering if I guessed right? Otherwise is
there a document with the proper formatting to fill out this map?

Thank you,

Paul


Re: questtion about log.retention.bytes

2013-08-20 Thread Paul Christian
Neha,

Correct, that is my question. We want to investigate capping our disk usage
so we don't fill up our hard drives. If you have any recommended
configurations or documents on these settings, please let us know.

Thank you,

Paul



On Tue, Aug 20, 2013 at 6:16 AM, Paul Christian
wrote:

> Jun,
>
> For my first example is that syntax correct? I.e.
>
> log.retention.bytes.per.topic.A = 15MB
> log.retention.bytes.per.topic.B = 20MB
>
> I totally guessed there and was wondering if I guessed right? Otherwise is
> there a document with the proper formatting to fill out this map?
>
> Thank you,
>
> Paul
>
>
>
>


Re: Proper Relationship Between Partition and Threads

2015-01-28 Thread Christian Csar
Ricardo,
   The parallelism of each logical consumer (consumer group) is the number
of partitions. So with four partitions it could make sense to have one
logical consumer (application) run two processes on different machines,
each with two threads, or one process with four. With two logical
consumers (two different applications), you would want each to have 4
threads (4*2 = 8 threads total).
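A small sketch of why the partition count caps the useful thread count per consumer group. It uses a simple round-robin spread purely for illustration; this is not Kafka's actual assignment algorithm, and the function name is hypothetical.

```python
def assign_partitions(num_partitions, num_threads):
    """Spread the partitions of one topic over the threads of ONE
    consumer group. Each partition goes to exactly one thread, so any
    threads beyond the partition count receive nothing."""
    assignment = {thread: [] for thread in range(num_threads)}
    for partition in range(num_partitions):
        assignment[partition % num_threads].append(partition)
    return assignment
```

With 4 partitions, 4 threads each get one partition, 2 threads each get two, and with 6 threads two of them sit idle. A second application is a separate group and gets its own full assignment, which is where the 4*2 = 8 threads come from.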

There are also considerations depending on which consumer code you are
using (which I'm decidedly not someone with good information on).

Christian

On Wed, Jan 28, 2015 at 1:28 PM, Ricardo Ferreira <
jricardoferre...@gmail.com> wrote:

> Hi experts,
>
> I'm newbie in the Kafka world, so excuse me for such basic question.
>
> I'm in the process of designing a client for Kafka, and after few hours of
> study, I was told that to achieve a proper level of parallelism, it is a
> best practice having one thread for each partition of an topic.
>
> My question is that this rule-of-thumb also applies for multiple consumer
> applications. For instance:
>
> Considering a topic with 4 partitions, it is OK to have one consumer
> application with 4 threads, just like would be OK to have two consumer
> applications with 2 threads each. But what about having two consumer
> applications with 4 threads each? It would break any load-balancing made by
> Kafka brokers?
>
> Anyway, I'd like to understand if the proper number of threads that should
> match the number of partitions is per application or if there is some other
> best practice.
>
> Thanks in advance,
>
> Ricardo Ferreira
>


Re: How to mark a message as needing to retry in Kafka?

2015-01-28 Thread Christian Csar
noodles,
   Without an external mechanism you won't be able to mark individual
messages/offsets as needing to be retried at a later time. Guozhang is
describing a way to get the offset of a message that's been received so
that you can find it later. You would need to save that into a 'failed
messages' store somewhere else and have code that looks in there to make
retries happen (assuming you want the failure/retry to persist beyond the
lifetime of the process).
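A minimal sketch of such a 'failed messages' store. It is kept in memory here only so the example is self-contained; for failures to outlive the process it would need to be backed by something external such as a database table. The class and method names are hypothetical.

```python
class FailedMessageStore:
    """Remembers which (topic, partition, offset) to retry and when."""

    def __init__(self):
        self._entries = []  # tuples of (retry_at, topic, partition, offset)

    def record_failure(self, topic, partition, offset, retry_at):
        """Save the position of a message that could not be handled."""
        self._entries.append((retry_at, topic, partition, offset))

    def due(self, now):
        """Remove and return the positions whose retry time has arrived,
        earliest first."""
        ready = sorted(e for e in self._entries if e[0] <= now)
        self._entries = [e for e in self._entries if e[0] > now]
        return [(topic, partition, offset)
                for _, topic, partition, offset in ready]
```

A timer loop would call `due(now)` periodically, fetch each returned offset again from Kafka, and either deliver the payload or call `record_failure` again with a later retry time.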

Christian


On Wed, Jan 28, 2015 at 7:00 PM, Guozhang Wang  wrote:

> I see. If you are using the high-level consumer, once the message is
> returned to the application it is considered "consumed", and currently it
> is not supported to "re-wind" to a previously consumed message.
>
> With the new consumer coming in 0.8.3 release, we have an api for you to
> get the offset of each message and do the rewinding based on offsets. For
> example, you can do something like
>
> 
>
>   message = // get one message from consumer
>
>   try {
>     // process message
>   } catch {
>     consumer.seek(message.offset)
>   }
> 
>
> Guozhang
>
> On Wed, Jan 28, 2015 at 6:26 PM, noodles  wrote:
>
> > I did not describe my problem clearly. In my case, I got the message from
> > Kafka, but I could not handle this message because of some reason, for
> > example the external server is down. So I want to mark the message as not
> > being consumed directly.
> >
> > 2015-01-28 23:26 GMT+08:00 Guozhang Wang :
> >
> > > Hi,
> > >
> > > Which consumer are you using? If you are using a high level consumer
> then
> > > retry would be automatic upon network exceptions.
> > >
> > > Guozhang
> > >
> > > On Wed, Jan 28, 2015 at 1:32 AM, noodles 
> wrote:
> > >
> > > > Hi group:
> > > >
> > > > I'm working for building a webhook notification service based on
> > Kafka. I
> > > > produce all of the payloads into Kafka, and consumers consume these
> > > > payloads by offset.
> > > >
> > > > Sometimes some payloads cannot be consumed because of network
> exception
> > > or
> > > > http server exception. So I want to mark the failed payloads and
> retry
> > > them
> > > > by timers. But I have no idea if I don't use a storage (like MySQL)
> > > except
> > > > kafka and zookeeper.
> > > >
> > > >
> > > > --
> > > > *noodles!*
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > *Yeah, I'm noodles!*
> >
>
>
>
> --
> -- Guozhang
>


Re: Tips for working with Kafka and data streams

2015-02-25 Thread Christian Csar
I wouldn't say no to some discussion of encryption. We're running on Azure
EventHubs (with preparations for Kinesis for EC2, and Kafka for deployments
in customer datacenters when needed) so can't just use disk level
encryption (which would have its own overhead). We're putting all of our
messages inside of encrypted envelopes before sending them to the stream
which limits our opportunities for schema verification of the underlying
messages to the declared type of the message.

Encryption at rest mostly works out to a sales point for customers who want
assurances, and in a Kafka focused discussion might be dealt with by
covering disk encryption and how the conversations between Kafka instances
are protected.

Christian


On Wed, Feb 25, 2015 at 11:51 AM, Jay Kreps  wrote:

> Hey guys,
>
> One thing we tried to do along with the product release was start to put
> together a practical guide for using Kafka. I wrote this up here:
> http://blog.confluent.io/2015/02/25/stream-data-platform-1/
>
> I'd like to keep expanding on this as good practices emerge and we learn
> more stuff. So two questions:
> 1. Anything you think other people should know about working with data
> streams? What did you wish you knew when you got started?
> 2. Anything you don't know about but would like to hear more about?
>
> -Jay
>


Re: Tips for working with Kafka and data streams

2015-02-25 Thread Christian Csar
The questions we get from customers typically end up being general so we
break out our answer into network level and on disk scenarios.

For the on disk/at rest scenario, the answer may just be to use full disk
encryption at the OS level so Kafka doesn't need to worry about it. But
documenting any issues around it would be good: for example, what sort of
Kafka specific performance impact it has, e.g. budgeting for better processors.

The security story right now is to run on a private network, but I believe
some of our customers like to be told that within datacenter transmissions
are encrypted on the wire. Based on
https://cwiki.apache.org/confluence/display/KAFKA/Security that might mean
waiting for TLS support, or using a VPN/ssh tunnel for the network
connections.

Since we're in hosted stream land we can't do either of the above, so we
encrypt the messages themselves. For those enterprises that are like our
customers but would run Kafka or use Confluent, having a story like the
above so they don't give up the benefits of your schema management layers
would be good.

Since I didn't mention it before I did find your blog posts handy (though
I'm already moving us towards stream centric land).

Christian

On Wed, Feb 25, 2015 at 3:57 PM, Jay Kreps  wrote:

> Hey Christian,
>
> That makes sense. I agree that would be a good area to dive into. Are you
> primarily interested in network level security or encryption on disk?
>
> -Jay
>
> On Wed, Feb 25, 2015 at 1:38 PM, Christian Csar  wrote:
>
> > I wouldn't say no to some discussion of encryption. We're running on
> Azure
> > EventHubs (with preparations for Kinesis for EC2, and Kafka for
> deployments
> > in customer datacenters when needed) so can't just use disk level
> > encryption (which would have its own overhead). We're putting all of our
> > messages inside of encrypted envelopes before sending them to the stream
> > which limits our opportunities for schema verification of the underlying
> > messages to the declared type of the message.
> >
> > Encryption at rest mostly works out to a sales point for customers who
> want
> > assurances, and in a Kafka focused discussion might be dealt with by
> > covering disk encryption and how the conversations between Kafka
> instances
> > are protected.
> >
> > Christian
> >
> >
> > On Wed, Feb 25, 2015 at 11:51 AM, Jay Kreps  wrote:
> >
> > > Hey guys,
> > >
> > > One thing we tried to do along with the product release was start to
> put
> > > together a practical guide for using Kafka. I wrote this up here:
> > > http://blog.confluent.io/2015/02/25/stream-data-platform-1/
> > >
> > > I'd like to keep expanding on this as good practices emerge and we
> learn
> > > more stuff. So two questions:
> > > 1. Anything you think other people should know about working with data
> > > streams? What did you wish you knew when you got started?
> > > 2. Anything you don't know about but would like to hear more about?
> > >
> > > -Jay
> > >
> >
>


Re: Tips for working with Kafka and data streams

2015-02-25 Thread Christian Csar
Yeah, we do have scenarios where we use customer specific keys so our
envelopes end up containing key identification information for accessing
our key repository. I'll certainly follow any changes you propose in this
area with interest, but I'd expect that sort of centralized key thing to be
fairly separate from Kafka even if there's a handy optional layer that
integrates with it.

Christian

On Wed, Feb 25, 2015 at 5:34 PM, Julio Castillo <
jcasti...@financialengines.com> wrote:

> Although full disk encryption appears to be an easy solution, in our case
> that may not be sufficient. For cases where the actual payload needs to be
> encrypted, the cost of encryption is paid by the consumer and producers.
> Further complicating the matter would be the handling of encryption keys,
> etc. I think this is the area where enhancements to Kafka may facilitate
> that key exchange between consumers and producers, still leaving it up to
> the clients, but facilitating the key handling.
>
> Julio
>
> On 2/25/15, 4:24 PM, "Christian Csar"  wrote:
>
> >The questions we get from customers typically end up being general so we
> >break out our answer into network level and on disk scenarios.
> >
> >On disk/at rest scenario may just be use full disk encryption at the OS
> >level and Kafka doesn't need to worry about it. But documenting any issues
> >around it would be good. For example what sort of Kafka specific
> >performance impacts does it have, ie budgeting for better processors.
> >
> >The security story right now is to run on a private network, but I believe
> >some of our customers like to be told that within datacenter transmissions
> >are encrypted on the wire. Based on
> >https://cwiki.apache.org/confluence/display/KAFKA/Security that might mean
> >waiting for TLS support, or using a VPN/ssh tunnel for the network
> >connections.
> >
> >Since we're in hosted stream land we can't do either of the above and
> >encrypt the messages themselves. For those enterprises that are like our
> >customers but would run Kafka or use Confluent, having a story like the
> >above so they don't give up the benefits of your schema management layers
> >would be good.
> >
> >Since I didn't mention it before I did find your blog posts handy (though
> >I'm already moving us towards stream centric land).
> >
> >Christian
> >
> >On Wed, Feb 25, 2015 at 3:57 PM, Jay Kreps  wrote:
> >
> >> Hey Christian,
> >>
> >> That makes sense. I agree that would be a good area to dive into. Are
> >>you
> >> primarily interested in network level security or encryption on disk?
> >>
> >> -Jay
> >>
> >> On Wed, Feb 25, 2015 at 1:38 PM, Christian Csar 
> >>wrote:
> >>
> >> > I wouldn't say no to some discussion of encryption. We're running on
> >> Azure
> >> > EventHubs (with preparations for Kinesis for EC2, and Kafka for
> >> deployments
> >> > in customer datacenters when needed) so can't just use disk level
> >> > encryption (which would have its own overhead). We're putting all of
> >>our
> >> > messages inside of encrypted envelopes before sending them to the
> >>stream
> >> > which limits our opportunities for schema verification of the
> >>underlying
> >> > messages to the declared type of the message.
> >> >
> >> > Encryption at rest mostly works out to a sales point for customers who
> >> want
> >> > assurances, and in a Kafka focused discussion might be dealt with by
> >> > covering disk encryption and how the conversations between Kafka
> >> instances
> >> > are protected.
> >> >
> >> > Christian
> >> >
> >> >
> >> > On Wed, Feb 25, 2015 at 11:51 AM, Jay Kreps  wrote:
> >> >
> >> > > Hey guys,
> >> > >
> >> > > One thing we tried to do along with the product release was start to
> >> put
> >> > > together a practical guide for using Kafka. I wrote this up here:
> >> > >
> >>
> https://urldefense.proofpoint.com/v2/url?u=http-3A__blog.confluent.io_201
> >>5_02_25_stream-2Ddata-2Dplat

Re: kafka producer does not distribute messages to partitions evenly?

2015-03-02 Thread Christian Csar
I believe you are seeing the behavior where the random partitioner is
sticky.
http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3ccahwhrrxax5ynimqnacsk7jcggnhjc340y4qbqoqcismm43u...@mail.gmail.com%3E
has details. So with the default 10-minute refresh, if your test runs for only
an hour or two with a single producer, you would not expect to see all
partitions hit.
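
To make the arithmetic concrete, here is a rough simulation of that sticky behavior. It is only a sketch under my own assumptions: the class and method names are made up, it sends one null-key message per second, and it models "pick a random partition, keep it until the next refresh" rather than reproducing the real producer classes.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Simulation only: models a cached ("sticky") partition choice for null-key
// messages. It does not use or reproduce the real Kafka producer classes.
class StickyPartitionDemo {
    private final int numPartitions;
    private final long refreshIntervalMs;
    private final Random random;
    private int cachedPartition = -1;   // -1 means nothing cached yet
    private long lastRefreshMs = 0L;

    StickyPartitionDemo(int numPartitions, long refreshIntervalMs, long seed) {
        this.numPartitions = numPartitions;
        this.refreshIntervalMs = refreshIntervalMs;
        this.random = new Random(seed);
    }

    // Returns the partition for a null-key message sent at time nowMs.
    // A new random partition is chosen only when the cache is empty or stale.
    int partitionFor(long nowMs) {
        if (cachedPartition < 0 || nowMs - lastRefreshMs >= refreshIntervalMs) {
            cachedPartition = random.nextInt(numPartitions);
            lastRefreshMs = nowMs;
        }
        return cachedPartition;
    }

    // Sends one message per second for durationMs and returns the partitions hit.
    static Set<Integer> partitionsHit(int numPartitions, long refreshIntervalMs,
                                      long durationMs, long seed) {
        StickyPartitionDemo producer =
                new StickyPartitionDemo(numPartitions, refreshIntervalMs, seed);
        Set<Integer> hit = new HashSet<>();
        for (long t = 0; t < durationMs; t += 1000L) {
            hit.add(producer.partitionFor(t));
        }
        return hit;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long oneHour = 60 * 60 * 1000L;
        Set<Integer> hit = partitionsHit(10, tenMinutes, oneHour, 42L);
        System.out.println("Partitions hit in one hour: " + hit.size() + " of 10");
    }
}
```

In a one-hour run with a 10-minute refresh the partition is chosen only six times, so a single producer can touch at most six of the ten partitions, and fewer if the random picks repeat.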

Christian

On Mon, Mar 2, 2015 at 4:23 PM, Yang  wrote:

> Thanks. I just checked the code below. The line that calls
> Random.nextInt() seems to run only *a few times*; on all the other calls
> to getPartition(), the cached sendPartitionPerTopicCache.get(topic) value
> is used, so apparently you won't get an even partition distribution?
>
> the code I got is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7
>
>
> core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (336 lines):
>
>
>   private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
>     val numPartitions = topicPartitionList.size
>     if(numPartitions <= 0)
>       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
>     val partition =
>       if(key == null) {
>         // If the key is null, we don't really need a partitioner
>         // So we look up in the send partition cache for the topic to decide the target partition
>         val id = sendPartitionPerTopicCache.get(topic)
>         id match {
>           case Some(partitionId) =>
>             // directly return the partitionId without checking availability of the leader,
>             // since we want to postpone the failure until the send operation anyways
>             partitionId
>           case None =>
>             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
>             if (availablePartitions.isEmpty)
>               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
>             val index = Utils.abs(Random.nextInt) % availablePartitions.size
>             val partitionId = availablePartitions(index).partitionId
>             sendPartitionPerTopicCache.put(topic, partitionId)
>             partitionId
>         }
>       } else
>         partitioner.partition(key, numPartitions)
>     if(partition < 0 || partition >= numPartitions)
>       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
>         "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
>     trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
>     partition
>   }
>
>
> On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > Probably your keys are getting hashed to only those partitions. I don't
> > think anything is wrong here.
> > You can check how the default hashPartitioner is used in the code, do
> > the same computation on your keys before you send them, and see which
> > partitions they map to.
> >
> > The default hashpartitioner does something like this :
> >
> > hash(key) % numPartitions.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Mar 2, 2015 at 3:52 PM, Yang  wrote:
> >
> > > we have 10 partitions for a topic, and omit the explicit partition param in
> > > the message creation:
> > >
> > > KeyedMessage data = new KeyedMessage(mytopic, myMessageContent);   // partition key need to be polished
> > > producer.send(data);
> > >
> > >
> > >
> > > but on average 3--5 of the partitions are empty.
> > >
> > >
> > >
> > > what went wrong?
> > >
> > > thanks
> > > Yang
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>


Re: Kafka Poll: Version You Use?

2015-03-04 Thread Christian Csar
Do you have anything on the number of voters, or an audience breakdown?

Christian

On Wed, Mar 4, 2015 at 8:08 PM, Otis Gospodnetic  wrote:

> Hello hello,
>
> Results of the poll are here!
> Any guesses before looking?
> What % of Kafka users are on 0.8.2.x already?
> What % of people are still on 0.7.x?
>
>
> http://blog.sematext.com/2015/03/04/poll-results-kafka-version-distribution/
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Thu, Feb 26, 2015 at 3:32 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> > Hi,
> >
> > With 0.8.2 out I thought it might be useful for everyone to see which
> > version(s) of Kafka people are using.
> >
> > Here's a quick poll:
> > http://blog.sematext.com/2015/02/23/kafka-poll-version-you-use/
> >
> > We'll publish the results next week.
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
>


Re: Kafka behind AWS ELB

2015-05-04 Thread Christian Csar
Dillian,

On Mon, May 4, 2015 at 1:52 PM, Dillian Murphey 
wrote:
>
> I'm interested in this topic as well.  If you put kafka brokers inside an
> autoscaling group, then AWS will automatically add brokers if demand
> increases, and the ELB will automatically round-robin across all of your
> kafka instances.  So in your config files and code, you only need to
> provide a single DNS name (the load balancer). You don't need to specify
> all your kafka brokers inside your config file.  If a broker dies, the ELB
> will only route to healthy nodes.
>
> So you get a lot of robustness, scalability, and fault-tolerance by using
> the AWS services. Kafka Brokers will automatically load balance, but the
> question is whether it is ok to put all your brokers behind an ELB and
> expect the system to work properly.
>
You should not expect it to work properly. Broker nodes are data-bearing,
which means that any operation to scale down would need to be aware of the
data distribution. Clients connect to specific nodes to send them data,
so even layer 4 load balancing wouldn't work.

> What alternatives are there to dynamic/scalable broker clusters?  I don't
> want to have to modify my config files or code if I add more brokers, and I
> want to be able to handle a broker going down. So these are the reasons AWS
> questions like this come up.
>

The clients already let you specify only a subset of brokers
(https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example),
one of which must be alive to discover the rest of the cluster. The main
clients handle node failures (you'll still have some operational work).
Kafka and other data storage systems do not work the same way as an
HTTP-driven web application. While Kafka can certainly be scaled, and
automation could do so in response to load, it's going to be more
complicated. AWS's off-the-shelf, low-operations offering for some
(definitely not all) of Kafka's use cases is Kinesis; Azure's is EventHubs.
Before using Kafka or any system in production you'll want to be sure you
understand the operational aspects of it.
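
As a rough illustration of the "subset of brokers" point, a 0.8-style producer configuration might look something like the sketch below. The host names are placeholders I made up, and the helper class is hypothetical; the only thing that matters is that `metadata.broker.list` names a couple of brokers to bootstrap from rather than the whole cluster.

```java
import java.util.Properties;

// Sketch of building 0.8-style producer properties. Only java.util is used,
// so this compiles without Kafka on the classpath; the resulting Properties
// would be handed to the producer configuration in a real application.
class ProducerBootstrapConfig {

    // brokerList is a comma-separated "host:port" list used only for the
    // initial metadata lookup; it does not need to name every broker.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical host names: two bootstrap brokers out of a larger cluster.
        Properties props = producerProps(
                "kafka-1.example.internal:9092,kafka-2.example.internal:9092");
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```

Adding a broker later does not require touching this list, as long as at least one of the listed brokers is still reachable when the client starts.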

Christian

>
> Thanks for any comments too. :)
>
>
>
>
> On Mon, May 4, 2015 at 9:03 AM, Mayuresh Gharat <gharatmayures...@gmail.com>
> wrote:
>
> > Ok. You can deploy kafka in AWS. You can have brokers on AWS servers.
> > Kafka is not a push system. So you will need someone writing to kafka and
> > consuming from kafka. It will work. My suggestion will be to try it out on
> > a smaller instance in AWS and see the effects.
> >
> > As I do not know the actual use case you want to use kafka for, I
> > cannot comment on whether it will work for your particular use case.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, May 4, 2015 at 8:55 AM, Chandrashekhar Kotekar <
> > shekhar.kote...@gmail.com> wrote:
> >
> > > I am sorry but I cannot reveal those details due to confidentiality
> > issues.
> > > I hope you understand.
> > >
> > >
> > > Regards,
> > > Chandrash3khar Kotekar
> > > Mobile - +91 8600011455
> > >
> > > On Mon, May 4, 2015 at 9:18 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Hi Chandrashekar,
> > > >
> > > > Can you please elaborate the use case for Kafka here, like how you are
> > > > planning to use it.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Sat, May 2, 2015 at 9:08 PM, Chandrashekhar Kotekar <
> > > > shekhar.kote...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am new to Apache Kafka. I have played with it on my laptop.
> > > > >
> > > > > I want to use Kafka in AWS. Currently we have tomcat web servers based
> > > > > REST API. We want to replace REST API with Apache Kafka, web servers are
> > > > > behind ELB.
> > > > >
> > > > > I would like to know if we can keep Kafka brokers behind ELB? Will it
> > > > > work?
> > > > >
> > > > > Regards,
> > > > > Chandrash3khar Kotekar
> > > > > Mobile - +91 8600011455
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
> > > >
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >


Re: Kafka security

2017-04-11 Thread Christian Csar
Don't hard code it. Martin's suggestion allows it to be read from a
configuration file or injected from another source such as an environment
variable at runtime.
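
A minimal sketch of what I mean, assuming an environment variable and a properties file whose names I have made up here (`KAFKA_KEYSTORE_PASSWORD`, `client-secrets.properties`, `keystore.password`); the system property name is the one from Martin's message.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;

// Sketch: resolve the keystore password at runtime instead of hard coding it.
class KeystorePasswordLoader {
    static final String ENV_VAR = "KAFKA_KEYSTORE_PASSWORD"; // hypothetical name
    static final String FILE_KEY = "keystore.password";      // hypothetical name

    // Prefers the environment variable, then falls back to the properties file.
    static String resolve(Map<String, String> env, Properties fileProps) {
        String fromEnv = env.get(ENV_VAR);
        if (fromEnv != null && !fromEnv.isEmpty()) {
            return fromEnv;
        }
        String fromFile = fileProps.getProperty(FILE_KEY);
        if (fromFile != null && !fromFile.isEmpty()) {
            return fromFile;
        }
        throw new IllegalStateException("No keystore password configured");
    }

    // Loads a properties file if it exists; returns empty Properties otherwise.
    static Properties load(Path path) throws IOException {
        Properties props = new Properties();
        if (Files.exists(path)) {
            try (InputStream in = Files.newInputStream(path)) {
                props.load(in);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties fileProps = load(Paths.get("client-secrets.properties")); // hypothetical path
        String password = resolve(System.getenv(), fileProps);
        // Set before creating any consumer or producer, as Martin suggested.
        System.setProperty("zookeeper.ssl.keyStore.password", password);
    }
}
```

Either source keeps the secret out of the code and out of version control; which one is acceptable is a policy question rather than a Kafka one.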

If neither of these is acceptable under your corporate policy, I suggest
asking how it has been handled before at your company.

Christian


On Apr 11, 2017 11:10, "IT Consultant" <0binarybudd...@gmail.com> wrote:

Thanks for your response.

We aren't allowed to hard-code passwords in any of our programs.

On Apr 11, 2017 23:39, "Mar Ian"  wrote:

> Since it is a Java property, you could set the property (keystore password)
> programmatically
> before you connect to Kafka (i.e., before creating a consumer or producer):
>
> System.setProperty("zookeeper.ssl.keyStore.password", password);
>
> martin
>
> 
> From: IT Consultant <0binarybudd...@gmail.com>
> Sent: April 11, 2017 2:01 PM
> To: users@kafka.apache.org
> Subject: Kafka security
>
> Hi All
>
> How can I avoid using a password for keystore creation?
>
> Our corporate policies don't allow us to hard-code passwords. We are
> currently passing the keystore password while accessing a TLS-enabled Kafka
> instance.
>
> I would like to either use a passwordless keystore or avoid a password for
> the client accessing Kafka.
>
>
> Please help
>


How to reduce time for consumer to take over?

2017-10-09 Thread Christian Schneider
I have the following case:

1. I have 1 consumer that starts processing a message, then crashes
(Java-VM shuts down)
2. I start a second consumer that should process the message instead.

It seems that it takes about 60 seconds for the second consumer to take
over the processing.
Can this be sped up? I use this in a test and would like to make that
test faster.

Christian

-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com


Kafka server in cloud very slow to reply

2017-11-26 Thread Christian Schneider
I have deployed a kafka server to the azure cloud using the spotify docker
image.

When I use the docker image locally it works fine.
When I access it from another cloud service and send messages to it then it
seems each message takes very long (more than a minute).

Any idea what might be wrong? From the long delay it looks a bit like a
reverse DNS issue, but I don't know if these can happen with Kafka or what to
configure to avoid the issue.

Christian

-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com


Re: "Topic Created" event

2016-02-12 Thread Christian Posta
Wonder if you can listen to the zkPath for topics via a zk watch (
https://zookeeper.apache.org/doc/r3.3.3/api/org/apache/zookeeper/Watcher.html)
to let you know when the structure of the tree changes (ie, add/remove)?

The zkPath for topics is "/brokers/topics"

https://github.com/christian-posta/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L59-59
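
One wrinkle: as far as I know a child watch only tells you that the children changed, not which one, and it fires once, so you have to re-read the list (re-registering the watch) and compare it with what you had before. Here is a sketch of just that comparison step using plain Java collections. The class name is made up, and in a real program the two lists would come from ZooKeeper's getChildren call on "/brokers/topics" rather than the hard-coded values in main.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Sketch: work out which topics were created or deleted between two
// snapshots of the children of "/brokers/topics".
class TopicChangeDetector {

    // Topics present in the new snapshot but not in the old one.
    static Set<String> created(Collection<String> before, Collection<String> after) {
        Set<String> result = new TreeSet<>(after);
        result.removeAll(before);
        return result;
    }

    // Topics present in the old snapshot but not in the new one.
    static Set<String> deleted(Collection<String> before, Collection<String> after) {
        Set<String> result = new TreeSet<>(before);
        result.removeAll(after);
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical snapshots taken before and after a watch fired.
        List<String> before = Arrays.asList("orders", "payments");
        List<String> after = Arrays.asList("orders", "payments", "shipments");
        System.out.println("created: " + created(before, after));
        System.out.println("deleted: " + deleted(before, after));
    }
}
```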

HTH!

On Fri, Feb 12, 2016 at 9:07 AM, Karan Gupta  wrote:

> Hello,
>
> I am learning how to use kafka as my personal project.
> I was wondering if there is a "topic created" event that kafka or zookeeper
> sends such that a simple scala/java program can listen to. I was not able
> to find any documentation on that.
>
> Thanks for any advice :)
> --
>
> Best regards
> Karan
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: 0.9.0.1 RC1

2016-02-17 Thread Christian Posta
BTW, what's the etiquette for votes (non-binding) for this community?
welcomed? noise?
happy to see the non-binding votes, I'd like to contribute, just don't want
to pollute the vote call. thoughts?
thanks!

On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:

> Thanks everyone for voting. The results are:
>
> +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel Koshy and
> Jun Rao)
> +1 non-binding = 3 votes
> -1 = 0 votes
> 0 = 0 votes
>
> The vote passes.
>
> I will release artifacts to maven central, update the dist svn and download
> site. Will send out an announce after that.
>
> Thanks,
>
> Jun
>
> On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
>
> > This is the first candidate for release of Apache Kafka 0.9.0.1. This is a
> > bug fix release that fixes 70 issues.
> >
> > Release Notes for the 0.9.0.1 release
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > and sha2 (SHA256) checksum.
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> >
> > * Maven artifacts to be voted upon prior to release:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * scala-doc
> > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> >
> > * java-doc
> > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> >
> > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
> >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> >
> > * Documentation
> > http://kafka.apache.org/090/documentation.html
> >
> > Thanks,
> >
> > Jun
> >
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Christian Posta
Yep, assuming you haven't completely partitioned that client from the
cluster, ZK should automatically try to connect/reconnect to other peers in
the server list. Otherwise, it's as Alexis said -- your session would
expire, and you'd have to recreate the session once you have connectivity.

On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
alexis.mi...@airbnb.com.invalid> wrote:

> By "re-connect", I'm assuming that the ZK session is expired, not
> disconnected.
> For details see
>
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
>
> In that case, the high level consumer is basically dead, and the
> application should create a new instance of it.
>
>
> On Mon, Feb 15, 2016 at 12:22 PM Joe San  wrote:
>
> > Any ideas as to which property I should set to enable Zookeeper
> > re-connection? I have the following properties defined for my consumer
> > (High Level Consumer API). Is this enough for an automatic Zookeeper
> > re-connect?
> >
> > val props = new Properties()
> > props.put("zookeeper.connect", zookeeper)
> > props.put("group.id", groupId)
> > props.put("auto.commit.enabled", "false")
> > // this timeout is needed so that we do not block on the stream!
> > props.put("consumer.timeout.ms", "1")
> > props.put("zookeeper.sync.time.ms", "200")
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Christian Posta
I believe reconnect is handled automatically by the client... is that what
you're asking?
Peek here to see how it does that and when:

https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java#L1153

On Wed, Feb 17, 2016 at 12:14 PM, Joe San  wrote:

> It is all pretty strange. Here is what I see in my logs as soon as I
> voluntarily shutdown Zookeeper!
>
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_60]
> at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> 20160217-20:12:44.960+0100 [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(127.0.0.1:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
> 20160217-20:12:44.960+0100 [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(127.0.0.1:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session 0x152ea19656b005c for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_60]
> at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
>
> It just keeps trying to reconnect forever! So I just wanted to
> know which property from the settings in my email above is responsible for
> this auto-reconnect mechanism?
>
> On Wed, Feb 17, 2016 at 8:04 PM, Christian Posta <
> christian.po...@gmail.com>
> wrote:
>
> > Yep, assuming you haven't completely partitioned that client from the
> > cluster, ZK should automatically try to connect/reconnect to other peers in
> > the server list. Otherwise, it's as Alexis said -- your session would
> > expire; you'd have to recreate the session once you have connectivity
> >
> > On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
> > alexis.mi...@airbnb.com.invalid> wrote:
> >
> > > By "re-connect", I'm assuming that the ZK session is expired, not
> > > disconnected.
> > > For details see
> > >
> > >
> >
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
> > >
> > > In that case, the high level consumer is basically dead, and the
> > > application should create a new instance of it.
> > >
> > >
> > > On Mon, Feb 15, 2016 at 12:22 PM Joe San 
> > wrote:
> > >
> > > > > Any ideas as to which property I should set to enable Zookeeper
> > > > > re-connection? I have the following properties defined for my consumer
> > > > > (High Level Consumer API). Is this enough for an automatic Zookeeper
> > > > > re-connect?
> > > >
> > > > val props = new Properties()
> > > > props.put("zookeeper.connect", zookeeper)
> > > > props.put("group.id", groupId)
> > > > props.put("auto.commit.enabled", "false")
> > > > // this timeout is needed so that we do not block on the stream!
> > > > props.put("consumer.timeout.ms", "1")
> > > > props.put("zookeeper.sync.time.ms", "200")
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Resetting Kafka Offsets -- and What are offsets.... exactly?

2016-02-17 Thread Christian Posta
The number is a position in the partition's log. In the original design
(described in the paper linked below) it was literally a byte position: 0
means read the log from the beginning, and the second message sits at 0 + the
size of the first. Since Kafka 0.8, though, offsets are logical: each message
in a partition simply gets the next sequence number.

For example, if I have three messages at offsets 0, 1, and 2 and set the
consumer offset to 0, I'll read everything. If I set the offset to 1,
I'll read the second and third messages, and so on. The committed offset can
be larger than the number of messages you can still read if older messages
have been removed by retention.
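
Here is a toy model of "read from an offset", assuming Kafka 0.8 or later, where each message in a partition is numbered sequentially (in 0.7 the offset was a byte position instead). The class is invented purely for illustration and has nothing to do with Kafka's actual log implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition: messages are numbered 0, 1, 2, ... in append order.
class PartitionLogDemo {
    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns the offset it was assigned.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Returns every message at or after the given offset.
    List<String> readFrom(long offset) {
        int start = (int) Math.max(0L, Math.min(offset, (long) messages.size()));
        return new ArrayList<>(messages.subList(start, messages.size()));
    }

    public static void main(String[] args) {
        PartitionLogDemo log = new PartitionLogDemo();
        log.append("first");
        log.append("second");
        log.append("third");
        System.out.println(log.readFrom(0)); // everything
        System.out.println(log.readFrom(1)); // second and third only
    }
}
```

Resetting a consumer group's stored offset to 0 corresponds to calling readFrom(0) here: the consumer starts again from the oldest message still in the log.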

see more here:
http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
and here: http://kafka.apache.org/documentation.html#introduction

HTH!

On Wed, Feb 17, 2016 at 12:16 PM, John Bickerstaff  wrote:

> *Use Case: Disaster Recovery & Re-indexing SOLR*
>
> I'm using Kafka to hold messages from a service that prepares "documents"
> for SOLR.
>
> A second micro service (a consumer) requests these messages, does any final
> processing, and fires them into SOLR.
>
> The whole thing is (in part) designed to be used for disaster recovery -
> allowing the rebuild of the SOLR index in the shortest possible time.
>
> To do this (and to be able to use it for re-indexing SOLR while testing
> relevancy) I need to be able to "play all messages from the beginning" at
> will.
>
> I find I can use the zkCli.sh tool to delete the Consumer Group Name like
> this:
>  rmr /kafka/consumers/myGroupName
>
> After which my microservice will get all the messages again when it runs.
>
> I was trying to find a way to do this programmatically without actually
> using the "low level" consumer api since the high level one is so simple
> and my code already works.  So I started playing with Zookeeper api for
> duplicating "rmr /kafka/consumers/myGroupName"
>
> *The Question: What does that offset actually represent?*
>
> It was at this point that I discovered the offset must represent something
> other than what I thought it would.  Things obviously work, but I'm
> wondering what - exactly do the offsets represent?
>
> To clarify - if I run this command on a zookeeper node, after the
> microservice has run:
>  get /kafka/consumers/myGroupName/offsets/myTopicName/0
>
> I get the following:
>
> 30024
> cZxid = 0x360355
> ctime = Fri Feb 12 07:27:50 MST 2016
> mZxid = 0x360357
> mtime = Fri Feb 12 07:29:50 MST 2016
> pZxid = 0x360355
> cversion = 0
> dataVersion = 2
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 5
> numChildren = 0
>
> Now - I have exactly 3500 messages in this Kafka topic.  I verify that by
> running this command:
>  bin/kafka-console-consumer.sh --zookeeper 192.168.56.5:2181/kafka
> --topic myTopicName --from-beginning
>
> When I hit Ctrl-C, it tells me it consumed 3500 messages.
>
> So - what does that 30024 actually represent?  If I reset that number to 1
> or 0 and re-run my consumer microservice, I get all the messages again -
> and the number again goes to 30024.  However, I'm not comfortable to trust
> that because my assumption that the number represents a simple count of
> messages that have been sent to this consumer is obviously wrong.
>
> (I reset the number like this -- to 1 -- and assume there's an API command
> that will do it too.)
>  set /kafka/consumers/myGroupName/offsets/myTopicName/0 1
>
> Can someone help me clarify or point me at a doc that explains what is
> getting counted here?  You can shoot me if you like for attempting the
> hack-ish solution of re-setting the offset through the Zookeeper API, but I
> would still like to understand what, exactly, is represented by that number
> 30024.
>
> I need to hand off to IT for the Disaster Recovery portion and saying
> "trust me, it just works" isn't going to fly very far...
>
> Thanks.
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Christian Posta
I believe so. Happy to be corrected.

On Wed, Feb 17, 2016 at 12:31 PM, Joe San  wrote:

> So if I use the High Level Consumer API, using the ConsumerConnector, I get
> this automatic zookeeper connection for free?
>
> On Wed, Feb 17, 2016 at 8:25 PM, Christian Posta <
> christian.po...@gmail.com>
> wrote:
>
> > I believe reconnect is handled automatically by the client... is that what
> > you're asking?
> > Peek here to see how it does that and when:
> >
> > https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java#L1153
> >
> > On Wed, Feb 17, 2016 at 12:14 PM, Joe San 
> wrote:
> >
> > > It is all pretty strange. Here is what I see in my logs as soon as I
> > > voluntarily shutdown Zookeeper!
> > >
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
> > > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_60]
> > > at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > 20160217-20:12:44.960+0100 [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(127.0.0.1:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
> > > 20160217-20:12:44.960+0100 [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(127.0.0.1:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session 0x152ea19656b005c for server null, unexpected error, closing socket connection and attempting reconnect
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
> > > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_60]
> > > at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > >
> > > It just keeps trying to reconnect forever! So I just wanted to
> > > know which property from the settings in my email above is responsible for
> > > this auto-reconnect mechanism?
> > >
> > > On Wed, Feb 17, 2016 at 8:04 PM, Christian Posta <
> > > christian.po...@gmail.com>
> > > wrote:
> > >
> > > > > Yep, assuming you haven't completely partitioned that client from the
> > > > > cluster, ZK should automatically try to connect/reconnect to other peers in
> > > > > the server list. Otherwise, it's as Alexis said -- your session would
> > > > > expire; you'd have to recreate the session once you have connectivity
> > > >
> > > > On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
> > > > alexis.mi...@airbnb.com.invalid> wrote:
> > > >
> > > > > By "re-connect", I'm assuming that the ZK session is expired, not
> > > > > disconnected.
> > > > > For details see
> > > > >
> > > > >
> > > >
> > >
> >
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
> > > > >
> > > > > In that case, the high level consumer is basically dead, and the
> > > > > application should create a new instance of it.
> > > > >
> > > > >
> > > > > On Mon, Feb 15, 2016 at 12:22 PM Joe San 
> > > > wrote:
> > > > >
> > > > > > > Any ideas as to which property I should set to enable Zookeeper
> > > > > > > re-connection? I have the following properties defined for my consumer
> > > > > > > (High Level Consumer API). Is this enough for an automatic Zookeeper
> > > > > > > re-connect?
> > > > > >
> > > > > > val props = new Properties()
> > > > > > props.put("zookeeper.connect", zookeeper)
> > > > > > props.put("group.id", groupId)
> > > > > > props.put("auto.commit.enabled", "false")
> > > > > > // this timeout is needed so that we do not block on the stream!
> > > > > > props.put("consumer.timeout.ms", "1")
> > > > > > props.put("zookeeper.sync.time.ms", "200")
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *Christian Posta*
> > > > twitter: @christianposta
> > > > http://www.christianposta.com/blog
> > > > http://fabric8.io
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: 0.9.0.1 RC1

2016-02-17 Thread Christian Posta
Awesome, glad to hear. Thanks Jun!

On Wed, Feb 17, 2016 at 12:57 PM, Jun Rao  wrote:

> Christian,
>
> Similar to other Apache projects, a vote from a committer is considered
> binding. During the voting process, we encourage non-committers to vote as
> well. We will cancel the release even if a critical issue is reported from
> a non-committer.
>
> Thanks,
>
> Jun
>
> On Tue, Feb 16, 2016 at 11:05 PM, Christian Posta <
> christian.po...@gmail.com
> > wrote:
>
> > BTW, what's the etiquette for votes (non-binding) for this community?
> > welcomed? noise?
> > happy to see the non-binding votes, I'd like to contribute, just don't want
> > to pollute the vote call. thoughts?
> > thanks!
> >
> > On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:
> >
> > > Thanks everyone for voting. The results are:
> > >
> > > +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel Koshy and
> > > Jun Rao)
> > > +1 non-binding = 3 votes
> > > -1 = 0 votes
> > > 0 = 0 votes
> > >
> > > The vote passes.
> > >
> > > I will release artifacts to maven central, update the dist svn and download
> > > site. Will send out an announce after that.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
> > >
> > > > > This is the first candidate for release of Apache Kafka 0.9.0.1. This is a
> > > > > bug fix release that fixes 70 issues.
> > > >
> > > > Release Notes for the 0.9.0.1 release
> > > >
> > >
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> > > >
> > > > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> > > >
> > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > > and sha2 (SHA256) checksum.
> > > >
> > > > * Release artifacts to be voted upon (source and binary):
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> > > >
> > > > * Maven artifacts to be voted upon prior to release:
> > > >
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > >
> > > > * scala-doc
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> > > >
> > > > * java-doc
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> > > >
> > > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> > > >
> > > > * Documentation
> > > > http://kafka.apache.org/090/documentation.html
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: 0.9.0.1 RC1

2016-02-17 Thread Christian Posta
yep! http://www.apache.org/dev/release-publishing.html#voted

On Wed, Feb 17, 2016 at 1:05 PM, Gwen Shapira  wrote:

> Actually, for releases, committers are non-binding. PMC votes are the only
> binding ones for releases.
>
> On Wed, Feb 17, 2016 at 11:57 AM, Jun Rao  wrote:
>
> > Christian,
> >
> > Similar to other Apache projects, a vote from a committer is considered
> > binding. During the voting process, we encourage non-committers to vote
> as
> > well. We will cancel the release even if a critical issue is reported
> from
> > a non-committer.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Feb 16, 2016 at 11:05 PM, Christian Posta <
> > christian.po...@gmail.com
> > > wrote:
> >
> > > BTW, what's the etiquette for votes (non-binding) for this community?
> > > welcomed? noise?
> > > happy to see the non-binding votes, I'd like to contribute, just don't
> > want
> > > to pollute the vote call. thoughts?
> > > thanks!
> > >
> > > On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:
> > >
> > > > Thanks everyone for voting. The results are:
> > > >
> > > > +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel
> Koshy
> > > and
> > > > Jun
> > > > Rao)
> > > > +1 non-binding = 3 votes
> > > > -1 = 0 votes
> > > > 0 = 0 votes
> > > >
> > > > The vote passes.
> > > >
> > > > I will release artifacts to maven central, update the dist svn and
> > > download
> > > > site. Will send out an announce after that.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
> > > >
> > > > > This is the first candidate for release of Apache Kafka 0.9.0.1.
> > This a
> > > > > bug fix release that fixes 70 issues.
> > > > >
> > > > > Release Notes for the 0.9.0.1 release
> > > > >
> > > >
> > >
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> > > > >
> > > > > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> > > > >
> > > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > > > and sha2 (SHA256) checksum.
> > > > >
> > > > > * Release artifacts to be voted upon (source and binary):
> > > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> > > > >
> > > > > * Maven artifacts to be voted upon prior to release:
> > > > >
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > > >
> > > > > * scala-doc
> > > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> > > > >
> > > > > * java-doc
> > > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> > > > >
> > > > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1
> tag
> > > > >
> > > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> > > > >
> > > > > * Documentation
> > > > > http://kafka.apache.org/090/documentation.html
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > *Christian Posta*
> > > twitter: @christianposta
> > > http://www.christianposta.com/blog
> > > http://fabric8.io
> > >
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Wiki Karma

2016-02-18 Thread Christian Posta
Can someone add Karma to my user id for contributing to the wiki/docs?
userid is 'ceposta'

thanks!

-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Memory leak in new client API (Java) in 0.8.2.1?

2016-02-28 Thread Christian Posta
Do you eventually run out of memory (OOM)?

On Sunday, February 28, 2016, Asaf Mesika  wrote:

> Hi,
>
> I'm seeing a slow off-heap memory leak in production.
> I've managed to recreate the scenario in a testing environment: I have
> one ZooKeeper node and one Kafka broker (running on the same machine). I have
> one Java process that runs a thread which constantly writes to Kafka using
> 16 KafkaProducer instances.
> I see that the resident memory of my Java process is slowly increasing
> (roughly 250 MB every 15 min). Running GC cleans the heap down to roughly
> 730 MB, but resident memory remains high: 2.3 GB and increasing steadily. So
> from that I presume it's a leak of off-heap memory.
>
> I'm a little puzzled here since I haven't seen any usage of
> ByteBuffer.allocateDirect() in the new client code, but I thought I'd check
> with the audience in case you are aware of any issue that might cause it.
> I've been chasing this leak for several days and managed to track it down to
> the code writing to Kafka, so I'm a little desperate :) Any help will do.
>
> Thanks!
>
> Asaf
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: monitor rebalanace events

2016-03-01 Thread Christian Posta
Is this for the new consumer or the old high-level consumer?

For the old high-level consumer, rebalance happens on the consumer side for
the following events:

* Opening new streams
* ZK Session expiration
* When partition information changes or consumer information changes

I don't think there is a place within the broker that broadcasts when a
rebalance happens. There are listeners you can set in the consumer to get
that information, but as you pointed out in your question, this happens on
the consumer side. I guess you could set watches on the following paths and
expect a rebalance when those watches get triggered, but you'd miss out on
ZK session expirations etc.

* a ZK watch for /consumers//ids
* a ZK data watch for /brokers/topics/




On Tue, Mar 1, 2016 at 6:37 AM, craig w  wrote:

> Is there some way to monitor when a rebalance occurs other than at the
> consumer level?
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


migrating the main-page docs to gitbook format

2016-03-02 Thread Christian Posta
Would love to have the docs in gitbook/markdown format so they can easily
be viewed from the source repo (or mirror, technically) on github.com. They
can also be easily converted to HTML, have a side-navigation ToC, and still
be versioned along with the src code.

Thoughts?

-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: migrating the main-page docs to gitbook format

2016-03-02 Thread Christian Posta
For sure! Will take a look!

On Wednesday, March 2, 2016, Gwen Shapira  wrote:

> Hey!
>
> Yes! We'd love that too! Maybe you want to help us out with
> https://issues.apache.org/jira/browse/KAFKA-2967 ?
>
> Gwen
>
> On Wed, Mar 2, 2016 at 2:39 PM, Christian Posta
> > wrote:
> > Would love to have the docs in gitbook/markdown format so they can easily
> > be viewed from the source repo (or mirror, technically) on github.com.
> They
> > can also be easily converted to HTML, have a side-navigation ToC, and
> still
> > be versioned along with the src code.
> >
> > Thoughts?
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Kafka over Satellite links

2016-03-02 Thread Christian Csar
I would not do that. I admit I may be a bit biased due to working for
Buddy Platform (IoT backend stuff including telemetry collection), but
you want to send the data via some protocol (HTTP? MQTT? CoAP?) to the
central hub and then have those servers put the data into Kafka. Now
if you want to use Kafka there are the various HTTP front ends that
will basically put the data into Kafka for you without the client
needing to deal with the partition management part. But putting data
into Kafka directly really seems like a bad idea even if it's a large
number of messages per second per node, even if the security parts
work out for you.

Christian

On Wed, Mar 2, 2016 at 9:52 PM, Jan  wrote:
> Hi folks;
> does anyone know of Kafka's ability to work over Satellite links. We have a 
> IoT Telemetry application that uses Satellite communication to send data from 
> remote sites to a Central hub.
> Any help/ input/ links/ gotchas would be much appreciated.
> Regards,Jan


Re: migrating the main-page docs to gitbook format

2016-03-07 Thread Christian Posta
All,

Here's a first spike. Would love feedback; happy to continue the convo
here (https://issues.apache.org/jira/browse/KAFKA-2967)

http://christianposta.com/kafka-docs/_book/index.html

I have the gitbook build integrated with the Kafka Gradle builds; the rest of
the WIP is here:

https://github.com/christian-posta/kafka/tree/ceposta-doco

On Thu, Mar 3, 2016 at 6:28 AM, Marcos Luis Ortiz Valmaseda <
marcosluis2...@gmail.com> wrote:

> I love that too
> +1
>
> 2016-03-02 21:15 GMT-05:00 Christian Posta :
>
> > For sure! Will take a look!
> >
> > On Wednesday, March 2, 2016, Gwen Shapira  wrote:
> >
> > > Hey!
> > >
> > > Yes! We'd love that too! Maybe you want to help us out with
> > > https://issues.apache.org/jira/browse/KAFKA-2967 ?
> > >
> > > Gwen
> > >
> > > On Wed, Mar 2, 2016 at 2:39 PM, Christian Posta
> > > > wrote:
> > > > Would love to have the docs in gitbook/markdown format so they can
> > easily
> > > > be viewed from the source repo (or mirror, technically) on
> github.com.
> > > They
> > > > can also be easily converted to HTML, have a side-navigation ToC, and
> > > still
> > > > be versioned along with the src code.
> > > >
> > > > Thoughts?
> > > >
> > > > --
> > > > *Christian Posta*
> > > > twitter: @christianposta
> > > > http://www.christianposta.com/blog
> > > > http://fabric8.io
> > >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Retry Message Consumption On Database Failure

2016-03-09 Thread Christian Posta
So you have to decide how long you're willing to "wait" for the mongo
db to come back, and what you'd like to do with that message. So for
example, do you just retry inserting to Mongo for a predefined period of
time? Do you try forever? If you try forever, are you okay with the
consumer threads blocking indefinitely? Or maybe you implement a "circuit
breaker" to shed load to mongo? Or are you willing to stash the message
into a DLQ and move on and try the next message?

You don't need to "re-consume" the message, do you? Can you just retry
and/or backoff-retry with the message you have, and only "commit" the
offset once the insert succeeds?
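
A minimal sketch of the retry-then-commit idea, in case it helps. It uses only the JDK: the BooleanSupplier is a hypothetical stand-in for the Mongo insert, and the class and method names are mine, not from any Kafka or Mongo API. The caller would commit the offset only when this returns true, and otherwise fall back to whatever policy was picked (DLQ, circuit breaker, stop without committing).

```java
import java.util.function.BooleanSupplier;

public class RetryThenCommitSketch {

    /**
     * Runs the attempt up to maxAttempts times, doubling the delay between
     * tries (capped at maxDelayMs). Returns true as soon as an attempt succeeds.
     */
    static boolean retryWithBackoff(BooleanSupplier attempt, int maxAttempts,
                                    long initialDelayMs, long maxDelayMs) {
        long delay = initialDelayMs;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;                 // safe to commit the offset now
            }
            if (i == maxAttempts) {
                break;                       // no point sleeping after the last try
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;                // shutting down: do not commit
            }
            delay = Math.min(delay * 2, maxDelayMs);
        }
        return false;                        // caller decides: DLQ, give up, etc.
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical insert that fails twice and then succeeds.
        boolean stored = retryWithBackoff(() -> ++calls[0] >= 3, 5, 10, 100);
        System.out.println(stored
                ? "stored after " + calls[0] + " attempts, commit the offset"
                : "gave up, do not commit");
    }
}
```

Note that while this loop sleeps, the consumer thread is not calling poll(), so a long retry window can still run past the session timeout.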



On Wed, Mar 9, 2016 at 2:00 PM, Michael Freeman 
wrote:

> Hey,
>My team is new to Kafka and we are using the examples found at.
>
>
> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
>
> We process messages from kafka and persist them to Mongo.
> If Mongo is unavailable we are wondering how we can re-consume the messages
> while we wait for Mongo to come back up.
>
> Right now we commit after the messages for each partition are processed
> (Following the example).
> I have tried a few approaches.
>
> 1. Catch the application exception and skip the kafka commit. However the
> next poll does not re consume the messages.
> 2. Allow the consumer to fail and restart the consumer. This works but
> causes a rebalance.
>
> Should I attempt to store the offset and partition (in memory) instead and
> attempt to re-seek in order to re-consume the messages?
>
> What's the best-practice approach in this kind of situation? My priority is
> to never lose a message and to ensure it makes it to Mongo. (Redelivery is
> ok)
>
> Thanks for any help or pointers in the right direction.
>
> Michael
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Connect bug in 0.9.0.1 client

2016-03-09 Thread Christian Posta
Open a JIRA here: https://issues.apache.org/jira/browse/KAFKA
and open a github.com pull request here: https://github.com/apache/kafka

May wish to peek at this too:
https://github.com/apache/kafka/blob/trunk/CONTRIBUTING.md

I think you need an apache ICLA too https://www.apache.org/licenses/icla.txt

HTH

On Wed, Mar 9, 2016 at 3:35 PM, Larkin Lowrey  wrote:

> There is a bug in the 0.9.0.1 client which causes consumers to get stuck
> waiting for a connection to be ready to complete.
>
> The root cause is in the connect(...) method of
>
> clients/src/main/java/org/apache/kafka/common/network/Selector.java
>
> Here's the trouble item:
>
> try {
> socketChannel.connect(address);
> } catch (UnresolvedAddressException e) {
>
> The assumption is that socketChannel.connect(address) always returns false
> when in non-blocking mode. A good assumption... but, sadly, wrong.
>
> When spinning up several dozen consumers at the same time we see a small
> number (one or two) where socketChannel.connect(...) returns true. When
> that happens the connection is valid and SelectionKey.OP_CONNECT will never
> be triggered. The poll(long timeout) method in the same class will wait for
> the channel to become ready with key.isConnectable() but that will never
> happen since the channel is already fully connected before the select is
> called.
>
> I implemented a sloppy fix which was able to demonstrate that addressing
> this case solves my stuck consumer problem.
>
> How do I submit a bug report for this issue, or does this email constitute
> a bug report?
>
> --Larkin
>
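
For reference, the case Larkin describes can be sketched with plain java.nio. This is not the Kafka Selector code and not the actual fix, just an illustration under my own assumptions (class name, method names, and the loopback listener are all made up for the demo): if connect() returns true on a non-blocking channel, OP_CONNECT never fires, so the channel has to be treated as ready right away.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ImmediateConnectSketch {

    /** Non-blocking connect that also handles connect() returning true right away. */
    static boolean connect(Selector selector, SocketChannel channel,
                           InetSocketAddress address, long timeoutMs) throws IOException {
        channel.configureBlocking(false);
        if (channel.connect(address)) {
            // Already connected: OP_CONNECT will never fire for this channel,
            // so treat it as ready now instead of waiting for isConnectable().
            channel.register(selector, SelectionKey.OP_READ);
            return true;
        }
        // Usual case: the connect is still in progress, wait for OP_CONNECT.
        channel.register(selector, SelectionKey.OP_CONNECT);
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            selector.select(50);
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isConnectable() && channel.finishConnect()) {
                    key.interestOps(SelectionKey.OP_READ);
                    return true;
                }
            }
        }
        return false;
    }

    /** Connects to a throwaway listener on the loopback interface. */
    static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            return connect(selector, client, address, 5000) && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + demo());
    }
}
```

Whether connect() returns true or false on a given run depends on the OS and timing, which is why both branches have to end in the same "connected" state.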



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Retry Message Consumption On Database Failure

2016-03-10 Thread Christian Posta
Yah that's a good point. That was brought up in another thread.

The granularity of what poll() does needs to be addressed. It tries to do too
many things at once, including heartbeating. Not so sure that's entirely
necessary.

On Thu, Mar 10, 2016 at 1:40 AM, Michael Freeman 
wrote:

> Thanks Christian,
> We would want to retry indefinitely, or at
> least for, say, x minutes. If we don't poll, how do we keep the heartbeat
> alive to Kafka? We never want to lose this message and only want to commit
> to Kafka when the message is in Mongo. That's either as a successful
> message in a collection or an unsuccessful message in an error collection.
>
> Right now I let the consumer die and don't create a new one for x minutes.
> This causes a lot of rebalancing.
>
> Michael
>
> > On 9 Mar 2016, at 21:12, Christian Posta 
> wrote:
> >
> > So can you have to decide how long you're willing to "wait" for the mongo
> > db to come back, and what you'd like to do with that message. So for
> > example, do you just retry inserting to Mongo for a predefined period of
> > time? Do you try forever? If you try forever, are you okay with the
> > consumer threads blocking indefinitely? Or maybe you implement a "circuit
> > breaker" to shed load to mongo? Or are you willing to stash the message
> > into a DLQ and move on and try the next message?
> >
> > You don't need to "re-consume" the message do you? Can you just retry
> > and/or backoff-retry with the message you have? And just do the "commit"
> of
> > the offset if successfully?
> >
> >
> >
> > On Wed, Mar 9, 2016 at 2:00 PM, Michael Freeman 
> > wrote:
> >
> >> Hey,
> >>   My team is new to Kafka and we are using the examples found at.
> >>
> >>
> >>
> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> >>
> >> We process messages from kafka and persist them to Mongo.
> >> If Mongo is unavailable we are wondering how we can re-consume the
> messages
> >> while we wait for Mongo to come back up.
> >>
> >> Right now we commit after the messages for each partition are processed
> >> (Following the example).
> >> I have tried a few approaches.
> >>
> >> 1. Catch the application exception and skip the kafka commit. However
> the
> >> next poll does not re consume the messages.
> >> 2. Allow the consumer to fail and restart the consumer. This works but
> >> causes a rebalance.
> >>
> >> Should I attempt to store the offset and parition (in memory) instead
> and
> >> attempt to reseek in order to re consume the messages?
> >>
> >> Whats the best practice approach in this kind of situation? My priority
> is
> >> to never loose a message and to ensure it makes it to Mongo.
> (Redelivery is
> >> ok)
> >>
> >> Thanks for any help or pointers in the right direction.
> >>
> >> Michael
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: New to Kafka

2016-03-11 Thread Christian Posta
What exactly is outdated? Are you not able to grok Kafka with the current
presentations?

On Fri, Mar 11, 2016 at 1:01 AM, prabhu v  wrote:

> Hi,
>
> Can anyone please help me with the video presentations from Kafka experts?
>
> Seems the link provided in Kafka home page
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations
> is
> outdated..
>
> Thanks in advance..
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Retry Message Consumption On Database Failure

2016-03-14 Thread Christian Posta
Jason,

Can you link to the proposal so I can take a look? Would the "sticky"
proposal prefer to keep partitions assigned to consumers who currently have
them and have not failed?
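
For anyone following along, here is a rough sketch of the pause()/resume() bookkeeping from the numbered steps in Jason's message quoted below. It is JDK-only: PartitionControl is a hypothetical stand-in for the consumer's pause()/resume() calls, partitions are plain ints, and all names are mine. The one design point I'd call out is that resume is only ever issued from the polling thread, on the assumption that the consumer itself is not safe to call from the background thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PauseResumeSketch {

    /** Hypothetical stand-in for pause()/resume() on the real consumer. */
    interface PartitionControl {
        void pause(int partition);
        void resume(int partition);
    }

    private final PartitionControl consumer;
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Map<Integer, Future<?>> inFlight = new HashMap<>();

    PauseResumeSketch(PartitionControl consumer) {
        this.consumer = consumer;
    }

    /** Steps 2-3: pause the partition and hand the message to a background thread. */
    Future<?> onRecord(int partition, Runnable processing) {
        consumer.pause(partition);
        Future<?> future = worker.submit(processing);
        inFlight.put(partition, future);
        return future;
    }

    /** Step 4: call from the polling thread after each poll(). */
    void resumeFinished() {
        Iterator<Map.Entry<Integer, Future<?>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Future<?>> entry = it.next();
            if (entry.getValue().isDone()) {
                // A real version would check the outcome and commit here.
                consumer.resume(entry.getKey());
                it.remove();
            }
        }
    }

    /** Step 5: call from the rebalance callback with the new assignment. */
    void onPartitionsAssigned(Set<Integer> assigned) {
        Iterator<Map.Entry<Integer, Future<?>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Future<?>> entry = it.next();
            if (assigned.contains(entry.getKey())) {
                consumer.pause(entry.getKey());   // 5a: still ours, pause again
            } else {
                entry.getValue().cancel(true);    // 5b: lost it, abort processing
                it.remove();
            }
        }
    }

    /** Walks through steps 1-5 against a recording fake and returns the calls made. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        PauseResumeSketch sketch = new PauseResumeSketch(new PartitionControl() {
            public void pause(int p) { calls.add("pause " + p); }
            public void resume(int p) { calls.add("resume " + p); }
        });
        try {
            sketch.onRecord(0, () -> { }).get();   // quick task, wait until done
            sketch.resumeFinished();               // resumes partition 0
            sketch.onRecord(1, () -> {             // slow task, still running
                try { Thread.sleep(60_000); } catch (InterruptedException e) { }
            });
            sketch.onPartitionsAssigned(Collections.singleton(1));      // 5a
            sketch.onPartitionsAssigned(Collections.<Integer>emptySet()); // 5b
            sketch.resumeFinished();               // nothing left in flight
            return calls;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            sketch.worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```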

On Mon, Mar 14, 2016 at 10:16 AM, Jason Gustafson 
wrote:

> Hey Michael,
>
> I don't think a policy of retrying indefinitely is generally possible with
> the new consumer even if you had a heartbeat API. The problem is that the
> consumer itself doesn't control when the group needs to rebalance. If
> another consumer joins or leaves the group, then all consumers will need to
> rebalance, regardless whether they are in the middle of message processing
> or not. Once the rebalance completes, the consumer may or may not get
> assigned the same partition that the message came from. That said, if a
> rebalance is unlikely because the group is stable, then you could use the
> pause() API to move the message processing to a background thread. What
> this would look like is basically this:
>
> 1. Receive message from poll() from partition 0.
> 2. Pause partition 0 using pause().
> 3. Send the message to a background thread for processing and continue
> calling poll().
> 4. When the processing finishes, resume() the partition.
> 5. If the group rebalances before processing finishes, there are two cases:
>   a) if partition 0 is reassigned, pause() it again in the
> onPartitionsAssigned() callback (and you may also want to verify that the
> last committed offset is still what you expect)
>   b) otherwise, abort the background processing thread.
>
> Would that work for your case? It's also worth mentioning that there's a
> proposal to add a sticky partition assignor to Kafka, which would make 5.b
> less likely.
>
> -Jason
>
>
>
> On Fri, Mar 11, 2016 at 1:03 AM, Michael Freeman 
> wrote:
>
> > Thanks Christian,
> >   Sending a heartbeat without having to poll
> > would also be useful when using a large max.partition.fetch.bytes.
> >
> > For now I'm just going to shut the consumer down and restart after x
> > period of time.
> >
> > Thanks for your insights.
> >
> > Michael
> >
> > > On 10 Mar 2016, at 18:33, Christian Posta 
> > wrote:
> > >
> > > Yah that's a good point. That was brought up in another thread.
> > >
> > > The granularity of what poll() needs to be addressed. It tries to do
> too
> > > many things at once, including heartbeating. Not so sure that's
> entirely
> > > necessary.
> > >
> > > On Thu, Mar 10, 2016 at 1:40 AM, Michael Freeman  >
> > > wrote:
> > >
> > >> Thanks Christian,
> > >>   We would want to retry indefinitely. Or
> at
> > >> least for say x minutes. If we don't poll how do we keep the heart
> beat
> > >> alive to Kafka. We never want to loose this message and only want to
> > commit
> > >> to Kafka when the message is in Mongo. That's either as a successful
> > >> message in a collection or an unsuccessful message in an error
> > collection.
> > >>
> > >> Right now I let the consumer die and don't create a new one for x
> > minutes.
> > >> This causes a lot of rebalancing.
> > >>
> > >> Michael
> > >>
> > >>>> On 9 Mar 2016, at 21:12, Christian Posta  >
> > >>> wrote:
> > >>>
> > >>> So can you have to decide how long you're willing to "wait" for the
> > mongo
> > >>> db to come back, and what you'd like to do with that message. So for
> > >>> example, do you just retry inserting to Mongo for a predefined period
> > of
> > >>> time? Do you try forever? If you try forever, are you okay with the
> > >>> consumer threads blocking indefinitely? Or maybe you implement a
> > "circuit
> > >>> breaker" to shed load to mongo? Or are you willing to stash the
> message
> > >>> into a DLQ and move on and try the next message?
> > >>>
> > >>> You don't need to "re-consume" the message do you? Can you just retry
> > >>> and/or backoff-retry with the message you have? And just do the
> > "commit"
> > >> of
> > >>> the offset if successfully?
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Mar 9, 2016 at 2:00 PM, Michael Freeman <
> mikfree...@gmail.com>
> > >>> wrote:
> > >&

Re: Retry Message Consumption On Database Failure

2016-03-14 Thread Christian Posta
Awesome, thanks! I'll take a look!


On Mon, Mar 14, 2016 at 11:27 AM, Jason Gustafson 
wrote:

> Yeah, that's the idea. Here's the JIRA I was thinking of:
> https://issues.apache.org/jira/browse/KAFKA-2273. I'm guessing this will
> need a KIP after 0.10 is out.
>
> -Jason
>
> On Mon, Mar 14, 2016 at 11:21 AM, Christian Posta <
> christian.po...@gmail.com
> > wrote:
>
> > Jason,
> >
> > Can you link to the proposal so I can take a look? Would the "sticky"
> > proposal prefer to keep partitions assigned to consumers who currently
> have
> > them and have not failed?
> >
> > On Mon, Mar 14, 2016 at 10:16 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey Michael,
> > >
> > > I don't think a policy of retrying indefinitely is generally possible
> > with
> > > the new consumer even if you had a heartbeat API. The problem is that
> the
> > > consumer itself doesn't control when the group needs to rebalance. If
> > > another consumer joins or leaves the group, then all consumers will
> need
> > to
> > > rebalance, regardless whether they are in the middle of message
> > processing
> > > or not. Once the rebalance completes, the consumer may or may not get
> > > assigned the same partition that the message came from. That said, if a
> > > rebalance is unlikely because the group is stable, then you could use
> the
> > > pause() API to move the message processing to a background thread. What
> > > this would look like is basically this:
> > >
> > > 1. Receive message from poll() from partition 0.
> > > 2. Pause partition 0 using pause().
> > > 3. Send the message to a background thread for processing and continue
> > > calling poll().
> > > 4. When the processing finishes, resume() the partition.
> > > 5. If the group rebalances before processing finishes, there are two
> > cases:
> > >   a) if partition 0 is reassigned, pause() it again in the
> > > onPartitionsAssigned() callback (and you may also want to verify that
> the
> > > last committed offset is still what you expect)
> > >   b) otherwise, abort the background processing thread.
> > >
> > > Would that work for your case? It's also worth mentioning that there's
> a
> > > proposal to add a sticky partition assignor to Kafka, which would make
> > 5.b
> > > less likely.
> > >
> > > -Jason
> > >
> > >
> > >
> > > On Fri, Mar 11, 2016 at 1:03 AM, Michael Freeman  >
> > > wrote:
> > >
> > > > Thanks Christian,
> > > >   Sending a heartbeat without having to
> > poll
> > > > would also be useful when using a large max.partition.fetch.bytes.
> > > >
> > > > For now I'm just going to shut the consumer down and restart after x
> > > > period of time.
> > > >
> > > > Thanks for your insights.
> > > >
> > > > Michael
> > > >
> > > > > On 10 Mar 2016, at 18:33, Christian Posta <
> christian.po...@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > > Yah that's a good point. That was brought up in another thread.
> > > > >
> > > > > The granularity of what poll() needs to be addressed. It tries to
> do
> > > too
> > > > > many things at once, including heartbeating. Not so sure that's
> > > entirely
> > > > > necessary.
> > > > >
> > > > > On Thu, Mar 10, 2016 at 1:40 AM, Michael Freeman <
> > mikfree...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > >> Thanks Christian,
> > > > >>   We would want to retry indefinitely.
> > Or
> > > at
> > > > >> least for say x minutes. If we don't poll how do we keep the heart
> > > beat
> > > > >> alive to Kafka. We never want to loose this message and only want
> to
> > > > commit
> > > > >> to Kafka when the message is in Mongo. That's either as a
> successful
> > > > >> message in a collection or an unsuccessful message in an error
> > > > collection.
> > > > >>
> > > > >> Right now I let the consumer die and don't create a new one for x
> > > > minutes.
> > > > &

Re: Subscribe request

2016-03-15 Thread Christian Posta
Your best bet is to send a mail to subscr...@kafka.apache.org as outlined
in the mailing list page http://kafka.apache.org/contact.html

On Tue, Mar 15, 2016 at 11:48 AM, Punyavathi Ambur Krishnamurthy <
akpunyava...@athenahealth.com> wrote:

> Hi,
>
> I would like to subscribe to this list to learn and implement Kafka.
>
> Thanks,
> Punya
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Not seeing disk writes on all replica brokers

2016-03-19 Thread Christian Posta
What exactly are you monitoring in terms of the "disk writes"? Kafka writes
to its log files but, by default, does not fsync on every write, so you may
or may not see activity at the disk while writing; it's up to the OS to
write the page cache back to the disk.
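
To illustrate the write-versus-fsync distinction in general terms, here is a small JDK-only sketch. It is not Kafka's log code; the class name, method names, and temp file are made up for the example. A plain write() returns once the bytes are in the OS page cache, and only force() asks the OS to push them to the device.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class WriteVsFsyncSketch {

    /** Appends the message and returns the file size as seen by the process. */
    static long append(Path file, String message, boolean fsync) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                channel.write(buffer);   // lands in the OS page cache
            }
            if (fsync) {
                channel.force(true);     // explicit flush to the device
            }
            return channel.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes once without and once with fsync; both are visible to readers. */
    static long[] demo() {
        try {
            Path file = Files.createTempFile("write-vs-fsync", ".log");
            try {
                long afterWrite = append(file, "abc", false);
                long afterFsync = append(file, "de", true);
                return new long[] {afterWrite, afterFsync};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] sizes = demo();
        System.out.println("size after write: " + sizes[0] + ", after fsync: " + sizes[1]);
    }
}
```

Both appends are immediately visible to readers of the file, so a disk-level monitor can show nothing for a while even though data is flowing.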

On Wed, Mar 16, 2016 at 5:17 PM, Kiran Nagaraja 
wrote:

> Hi,
>
> I'm running version kafka_2.11-0.9.0.1 in a 2 broker configuration with 10
> partitions that are replicated - replication.factor=2
>
> Here's the output from 'kafkat partitions':
>
> Topic      Partition  Leader  Replicas  ISRs
> bookmarks  0          0       [0, 1]    [0, 1]
> bookmarks  1          1       [1, 0]    [1, 0]
> bookmarks  2          0       [0, 1]    [0, 1]
> bookmarks  3          1       [1, 0]    [1, 0]
> bookmarks  4          0       [0, 1]    [0, 1]
> bookmarks  5          1       [1, 0]    [1, 0]
> bookmarks  6          0       [0, 1]    [0, 1]
> bookmarks  7          1       [1, 0]    [1, 0]
> bookmarks  8          0       [0, 1]    [0, 1]
> bookmarks  9          1       [1, 0]    [1, 0]
>
> We monitor the disk writes and I only see writes at broker 0, and broker 1
> sees none (not comparable at all). I do see comparable network traffic at
> the 2 brokers.
>
> Anything obvious I'm missing.
>
> Thanks,
> - Kiran
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Kafka list

2016-03-19 Thread Christian Posta
You need to send a mail to users-subscr...@kafka.apache.org
http://kafka.apache.org/contact.html

On Sat, Mar 19, 2016 at 4:14 AM, Andreas Thoelke 
wrote:

> Hi,
>
> please add me to the Kafka list.
>
> Andreas
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Kafka LTS release

2016-03-21 Thread Christian Posta
+1. Things are quite fast-moving, to be honest. An LTS would slow things down
and potentially drag along too much technical debt. I agree we should revisit
this discussion when 1.0 goes GA.

On Mon, Mar 21, 2016 at 4:19 AM, Gerard Klijs 
wrote:

> I think Kafka at the moment is not mature enough to support a LTS release.
> I think it will take a lot of effort to 'guarantee' a back-port will be
> more safe to use in production then the new release. For example, when you
> will manage the release of 0.9.0.2, with the fixes from 0.10.0.0, you need
> to make sure all the 0.9.0.1 clients still work with it, and you don't
> introduce new bugs by the partial merge.
> I do think once there will be a 1.0.0.0 release it would be great to have a
> lts release.
>
> On Mon, Mar 21, 2016 at 11:54 AM Achanta Vamsi Subhash <
> achanta.va...@flipkart.com> wrote:
>
> > *bump*
> >
> > Any opinions on this?
> >
> > On Mon, Mar 14, 2016 at 4:37 PM, Achanta Vamsi Subhash <
> > achanta.va...@flipkart.com> wrote:
> >
> > > Hi all,
> > >
> > > We find that there are many releases of Kafka and not all the bugs are
> > > back ported to the older releases. Can we have a LTS (Long Term
> Support)
> > > release which can be supported for 2 years with all the bugs
> back-ported?
> > >
> > > This will be very helpful as during the last 2-3 releases, we often
> have
> > > the cases where the api of producers/consumers changes and the bugs are
> > > only fixed in the newer components. Also, many people who are on the
> > older
> > > versions treat the latest release of that series is the stable one and
> > end
> > > up with bugs in production.
> > >
> > > ​I can volunteer for the release management of the LTS release but as a
> > > community, can we follow the rigour of back-porting the bug-fixes to
> the
> > > LTS branch?​
> > >
> > > --
> > > Regards
> > > Vamsi Subhash
> > >
> >
> >
> >
> > --
> > Regards
> > Vamsi Subhash
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Kafka Connection Pool?

2016-03-21 Thread Christian Posta
Can you explain more? Have you measured the overhead of opening the
connections?

If I'm not mistaken, Kafka manages the connections under the covers to each
of the brokers that lead partitions of the topics you're consuming from. The
connection(s) to each partition leader will stay around as long as you're
consuming from them (i.e., calling the poll() method). If you aren't polling
the broker for messages within the session timeout period, the heartbeat
mechanism will actually kick the consumer out and trigger a rebalance. I'd be
curious to understand more about your use case to see how/when you're
experiencing overhead from the connection handshaking.

On Mon, Mar 21, 2016 at 7:33 PM, BYEONG-GI KIM  wrote:

> Hello. I have a question that the latest kafka, 0.9.0.1, provides any APIs
> for managing connection pool of kafka on both consumer and producer sides.
>
> I think the overhead which happens while establishing connection from
> consumer/producer to kafka broker(s) seems a little heavy.
>
> Thanks in advance!
>
> Best regards
>
> bgkim
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Consumer keeps committing to a partition it no longer holds after a rebalance

2016-03-23 Thread Christian Posta
How long does this last? Maybe there are some async commit-offset frames
left in the send buffer to that node? Can you reliably reproduce this?

On Wed, Mar 23, 2016 at 6:29 AM, Saurabh Daftary 
wrote:

> I am running broker 0.9.0 with consumer 0.9.0.1.
>
> Seeing an issue wherein after a consumer rebalance (when I add a new
> consumer) my old consumer still keeps committing offsets to partition it no
> longer holds after a rebalance.
>
> The rebalance seem to work ok - On run ConsumerGroupCommand I see that the
> partitions are redistributed after I add a new consumer. The respective
> consumers do their fetches only on their assigned partitions but the old
> consumer still commits the same last offset it had when it was holding on
> to that partition At the same time the added consumer commits the correct
> offsets.
>
> Any inputs on what could be going on?
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: dumping JMX data

2016-03-31 Thread Christian Posta
What if we added something like this to Kafka? https://jolokia.org
I've added a JIRA to do that, just haven't gotten to it yet. Will soon
though, especially if it'd be useful for others.

https://issues.apache.org/jira/browse/KAFKA-3377
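
For anyone who hasn't used it: Jolokia exposes JMX MBeans over HTTP as JSON,
so a read turns into a small POST body instead of an RMI connection. A rough
sketch of building that body, assuming Jolokia's JSON "read" request form.
The endpoint URL is a hypothetical placeholder, nothing here is part of
Kafka, and the MBean name is the one from David's query quoted below:

```python
import json

# Hypothetical agent endpoint; adjust to wherever a Jolokia agent would listen.
JOLOKIA_URL = "http://localhost:8778/jolokia/"

# MBean name copied from the JmxTool query quoted below.
MBEAN = (
    "kafka.consumer:type=FetchRequestAndResponseMetrics,"
    "name=FetchRequestRateAndTimeMs,"
    "clientId=ReplicaFetcherThread-0-5,"
    "brokerHost=hostname05.cluster.com,brokerPort=9092"
)


def jolokia_read_request(mbean, attribute=None):
    """Build the JSON body of a Jolokia 'read' request.

    Leaving out the attribute asks for every attribute of the MBean.
    """
    body = {"type": "read", "mbean": mbean}
    if attribute is not None:
        body["attribute"] = attribute
    return json.dumps(body, sort_keys=True)


payload = jolokia_read_request(MBEAN)
print("POST", JOLOKIA_URL)
print(payload)
```

The request is only built and printed here, not sent, so it runs without a
broker or an agent.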

On Thu, Mar 31, 2016 at 2:55 PM, David Sidlo  wrote:

> The Kafka JmxTool works fine although it is not user friendly, in that you
> cannot perform a query of the Kafka Server mbeans to determine content and
> to determine the path-string that you need to place into the --object-name
> option.
>
> Here's how I solved the problem...
>
> First, make sure that Kafka is running with JMX options enabled...
>
> -  The following opens up the JMX port with no authentication (for
> testing)...
> -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=
> -Dcom.sun.management.jmxremote.ssl=false
> -Dcom.sun.management.jmxremote.authenticate=false
>
> Second, get jstatd running on the same server so that you can use VisualVM
> to look into what is going on inside.
>
> Then, use VisualVM along with its jmx-plugin to view the mbeans and
> contents.
>
> When using VisualVM, you will first connect to jstatd, then you have to
> right click on the host to request a JMX connection, where you will get a
> dialog, where you have to add the jmx port number to the host name (after
> the colon).
>
> When you view the mbeans, you will see the path to the given attribute if
> you hover over it with the mouse pointer, but only for a moment... The
> string can be quite long and... Unfortunately, you don't have the option to
> capture that string into the cut-buffer via the interface. I used a screen
> capture utility to capture the string and typed it into the terminal.
>
> So here is what my first working query looked like...
>
> /opt/kafka/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name
> "kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=ReplicaFetcherThread-0-5,brokerHost=
> hostname05.cluster.com,brokerPort=9092" --jmx-url
> service:jmx:rmi:///jndi/rmi://`hostname`:/jmxrmi
>
> That command will output all of the attributes for the given object-name,
> and it's a long object-name.
> With some experimentation, I found that you can use wild-cards on portions
> of the object-name that were clearly dynamic, such as the host-name or the
> thread-name. So you can get the attribute for all similar objects by using
> the following query...
>
> /opt/kafka/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name
> "kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=ReplicaFetcherThread*,brokerHost=hostname*.
> cluster.com,brokerPort=*" --jmx-url
> service:jmx:rmi:///jndi/rmi://`hostname`:/jmxrmi
>
> There may be simpler tools to use, but I really do like the GUI goodness
> in VisualVM, (and it is free).
>
> I hope that helps.
>
>
>
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: dumping JMX data

2016-04-11 Thread Christian Posta
Yeah, +1. I was considering making it an option.

And wrapping it with https://github.com/fabric8io/agent-bond if you want to
run it alongside other agents.

On Thu, Mar 31, 2016 at 9:21 PM, Gerard Klijs 
wrote:

> Don't know if adding it to Kafka is a good thing. I assume you need some
> java opts settings for it to work, and with other solutions these would be
> different. It could be enabled with an option of course, then it's not in
> the way if you use something else.
> We use zabbix, this is a single tool which can be used to read in jmx data,
> store the data for a certain time, configurable for each item, and create
> triggers and graphs for those.
>
> To see and copy jmx items we use the Oracle Java mission control, it has a
> tab with info on each jmx item, which can be copied to clipboard.
>
> On Fri, Apr 1, 2016, 02:03 Sean Clemmer  wrote:
>
> > Another +1 for Jolokia. We've got a pretty cool setup here that deploys
> > Jolokia alongside Kafka, and we wrote a small Sensu plugin to grab all
> the
> > stats from Jolokia's JSON API and reformat them for Graphite.
> >
> > On Thu, Mar 31, 2016 at 4:36 PM, craig w  wrote:
> >
> > > Including jolokia would be great, I've used for kafka and it worked
> well.
> > > On Mar 31, 2016 6:54 PM, "Christian Posta" 
> > > wrote:
> > >
> > > > What if we added something like this to Kafka? https://jolokia.org
> > > > I've added a JIRA to do that, just haven't gotten to it yet. Will
> soon
> > > > though, especially if it'd be useful for others.
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3377
> > > >
> > > > On Thu, Mar 31, 2016 at 2:55 PM, David Sidlo 
> > > wrote:
> > > >
> > > > > The Kafka JmxTool works fine although it is not user friendly, in
> > that
> > > > you
> > > > > cannot perform a query of the Kafka Server mbeans to determine
> > content
> > > > and
> > > > > to determine the path-string that you need to place into the
> > > -object-name
> > > > > option.
> > > > >
> > > > > Here's how I solved the problem...
> > > > >
> > > > > > First, make sure that Kafka is running with JMX options enabled...
> > > > >
> > > > > > -  The following opens up the JMX port with no
> authentication
> > > > (for
> > > > > testing)...
> > > > > -Dcom.sun.management.jmxremote
> > -Dcom.sun.management.jmxremote.port=
> > > > > -Dcom.sun.management.jmxremote.ssl=false
> > > > > -Dcom.sun.management.jmxremote.authenticate=false
> > > > >
> > > > > Second, get jstatd running on the same server so that you can use
> > > > VisualVM
> > > > > to look into what is going on inside.
> > > > >
> > > > > Then, use VisualVM along with its jmx-plugin to view the mbeans and
> > > > > contents.
> > > > >
> > > > > When using VisualVM, you will first connect to jstatd, then you
> have
> > to
> > > > > right click on the host to request a JMX connection, where you will
> > > get a
> > > > > dialog, where you have to add the jmx port number to the host name
> > > (after
> > > > > the colon).
> > > > >
> > > > > When you view the mbeans, you will see the path to the given
> > attribute
> > > if
> > > > > you hover over it with the mouse pointer, but only for a moment...
> > The
> > > > > string can be quite long and... Unfortunately, you don't have the
> > > option
> > > > to
> > > > > capture that string into the cut-buffer via the interface. I used a
> > > > screen
> > > > > capture utility to capture the string and typed it into the
> terminal.
> > > > >
> > > > > So here is what my first working query looked like...
> > > > >
> > > > > /opt/kafka/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool
> > > --object-name
> > > > >
> > > >
> > >
> >
> "kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=ReplicaFetcherThread-0-5,brokerHost=
> > > > > hostname05.cluster.com,brokerPort=9092" --jmx-url
> > > > >

Re: Encryption at Rest

2016-04-21 Thread Christian Csar
From what I know of previous discussions encryption at rest can be
handled with transparent disk encryption. When that's sufficient it's
nice and easy.

Christian

On Thu, Apr 21, 2016 at 2:31 PM, Tauzell, Dave
 wrote:
> Has there been any discussion or work on at rest encryption for Kafka?
>
> Thanks,
>   Dave
>


Re: Encryption at Rest

2016-05-02 Thread Christian Csar
"We need to be capable of changing encryption keys on regular
intervals and in case of expected key compromise." is achievable with
full disk encryption particularly if you are willing to add and remove
Kafka servers so that you replicate the data to new machines/disks
with new keys and take the machines with old keys out of use and wipe
them.

For the second part of it I would suggest reevaluating your threat
model since you are looking at a machine that is compromised but not
compromised enough to be able to read the key from Kafka or to use
Kafka to read the data.

While you could add support to encrypt data on the way in and out of
compression I believe you would need either substantial work in Kafka
to support rewriting/reencrypting the logfiles (with performance
penalties) or rotate machines in and out as with full disk encryption.
Though I'll let someone with more knowledge of the implementation
comment further on what would be required.
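
To make the compression concern quoted below concrete, here is a rough
sketch. It assumes a toy XOR keystream built from SHA-256 as a stand-in for a
real cipher (it is not secure and not anything Kafka ships), made-up JSON
messages, and zlib in place of Kafka's batch compression. It compares
encrypting each message before compression with compressing the batch first
and encrypting once:

```python
import hashlib
import zlib


def toy_encrypt(data: bytes, key: bytes, nonce: int) -> bytes:
    """XOR data with a SHA-256 counter keystream. Toy stand-in, NOT secure."""
    stream = bytearray()
    block = 0
    while len(stream) < len(data):
        stream.extend(
            hashlib.sha256(
                key + nonce.to_bytes(8, "big") + block.to_bytes(8, "big")
            ).digest()
        )
        block += 1
    # zip() stops at len(data), so the extra keystream bytes are ignored.
    return bytes(a ^ b for a, b in zip(data, stream))


key = b"demo-key"
# Made-up, repetitive payloads, similar in spirit to device telemetry.
messages = [
    b'{"device":"sensor-%03d","status":"ok","temp":21}' % (i % 10)
    for i in range(500)
]
raw = b"".join(messages)

# Option 1: encrypt every message (fresh nonce each), then compress the batch.
encrypted_first = zlib.compress(
    b"".join(toy_encrypt(m, key, i) for i, m in enumerate(messages))
)

# Option 2: compress the whole batch, then encrypt the result once.
compressed_first = toy_encrypt(zlib.compress(raw), key, 0)

print(len(raw), len(encrypted_first), len(compressed_first))
```

On repetitive payloads like these, the second number printed should stay
close to the raw size and the third should be far smaller, which is the
overhead Bruno describes.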

Christian

On Mon, May 2, 2016 at 9:41 PM, Bruno Rassaerts
 wrote:
> We did try indeed the last scenario you describe as encrypted disks do not 
> fulfil our requirements.
> We need to be capable of changing encryption keys on regular intervals and in 
> case of expected key compromise.
> Also, when a running machine is hacked, disk based or file system based 
> encryption doesn’t offer any protection.
>
> Our goal is indeed to have the content in the broker files encrypted. The 
> problem is that the only way to achieve this is through custom serialisers.
> This works, but the overhead is quite dramatic as the messages are no longer 
> efficiently compressed (in batch).
> Compression in the serialiser, before the encryption, doesn’t really solve 
> the performance problem.
>
> The best thing for us would be able to encrypt after the batch compression 
> offered by kafka.
> The hook to do this is missing in the current implementation.
>
> Bruno
>
>> On 02 May 2016, at 22:46, Tom Brown  wrote:
>>
>> I'm trying to understand your use-case for encrypted data.
>>
>> Does it need to be encrypted only over the wire? This can be accomplished
>> using TLS encryption (v0.9.0.0+). See
>> https://issues.apache.org/jira/browse/KAFKA-1690
>>
>> Does it need to be encrypted only when at rest? This can be accomplished
>> using full disk encryption as others have mentioned.
>>
>> Does it need to be encrypted during both? Use both TLS and full disk
>> encryption.
>>
>> Does it need to be encrypted fully from end-to-end so even Kafka can't read
>> it? Since Kafka shouldn't be able to know the contents, the key should not
>> be known to Kafka. What remains is manually encrypting each message before
>> giving it to the producer (or by implementing an encrypting serializer).
>> Either way, each message is still encrypted individually.
>>
>> Have I left out a scenario?
>>
>> --Tom
>>
>>
>> On Mon, May 2, 2016 at 2:01 PM, Bruno Rassaerts >> wrote:
>>
>>> Hello,
>>>
>>> We tried encrypting the data before sending it to kafka, however this
>>> makes the compression done by kafka almost impossible.
>>> Also the performance overhead of encrypting the individual messages was
>>> quite significant.
>>>
>>> Ideally, a pluggable “compression” algorithm would be best. Where message
>>> can first be compressed, then encrypted in batch.
>>> However, the current kafka implementation does not allow this.
>>>
>>> Bruno
>>>
>>>> On 26 Apr 2016, at 19:02, Jim Hoagland 
>>> wrote:
>>>>
>>>> Another option is to encrypt the data before you hand it to Kafka and
>>> have
>>>> the downstream decrypt it.  This takes care of on-disk and on-wire
>>>> encryption.  We did a proof of concept of this:
>>>>
>>>>
>>> http://www.symantec.com/connect/blogs/end-end-encryption-though-kafka-our-p
>>>> roof-concept
>>>>
>>>> ( http://symc.ly/1pC2CEG )
>>>>
>>>> -- Jim
>>>>
>>>> On 4/25/16, 11:39 AM, "David Buschman"  wrote:
>>>>
>>>>> Kafka handles messages which are composed of an array of bytes. Kafka
>>> does
>>>>> not care what is in those byte arrays.
>>>>>
>>>>> You could use a custom Serializer and Deserializer to encrypt and
>>> decrypt
>>>>> the data from within your application(s) easily enough.
>>>>>
>>>>> This gives the benefit of having encryption at rest and over the wire.
>>> Two
>>>

Re: kafka-run-class.sh

2016-05-06 Thread Christian Posta
Well, to find the ones that are used by the Kafka tools:

find ./bin -type f | xargs grep kafka-run-class.sh | awk ' { print $4 }'


kafka-run-class.sh basically just sets up the JVM classpath to call any of
the Main classes inside kafka. What exactly are you looking to do?

On Fri, May 6, 2016 at 9:41 AM, Mudit Kumar  wrote:

> How can i get the list for all the class names i can run through
> ./kafka-run-class.sh [class-name] command?
>
> Thanks,
> Mudit




-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Zookeeper dies ... Kafka server unable to connect

2016-05-10 Thread Christian Posta
confidential and/or legally privileged.
> > > > If it has come to you in error you must take no action based on it,
> nor must you copy or show it to anyone; please delete/destroy and inform
> the sender immediately.
> > > >
> > > > On May 10, 2016 at 5:47:24 PM, Paolo Patierno (ppatie...@live.com)
> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > experimenting with Kafka on Kubernetes I have the following error on
> Kafka server reconnection ...
> > > >
> > > > A cluster with one zookeeper and two kafka servers ... I turn off the
> zookeeper pod but kubernetes restarts it and guarantees the same IP address
> for it but the kafka server starts to retry the connection, failing with
> the following trace:
> > > >
> > > > [2016-05-10 15:40:55,046] WARN Session 0x1549b308dd20002 for server
> 10.0.0.184/10.0.0.184:2181, unexpected error, closing socket connection
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > > > java.io.IOException: Connection reset by peer
> > > > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > > > at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> > > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> > > > at
> org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> > > > at
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> > > > at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > > > [2016-05-10 15:40:55,149] INFO zookeeper state changed
> (Disconnected) (org.I0Itec.zkclient.ZkClient)
> > > > [2016-05-10 15:40:57,093] INFO Opening socket connection to server
> 10.0.0.184/10.0.0.184:2181. Will not attempt to authenticate using SASL
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:40:57,093] INFO Socket connection established to
> 10.0.0.184/10.0.0.184:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:40:57,158] INFO Unable to read additional data from
> server sessionid 0x1549b308dd20002, likely server has closed socket,
> closing socket connection and attempting reconnect
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:40:58,936] INFO Opening socket connection to server
> 10.0.0.184/10.0.0.184:2181. Will not attempt to authenticate using SASL
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:40:58,936] INFO Socket connection established to
> 10.0.0.184/10.0.0.184:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:40:58,937] INFO Unable to read additional data from
> server sessionid 0x1549b308dd20002, likely server has closed socket,
> closing socket connection and attempting reconnect
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:00,845] INFO Opening socket connection to server
> 10.0.0.184/10.0.0.184:2181. Will not attempt to authenticate using SASL
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:00,845] INFO Socket connection established to
> 10.0.0.184/10.0.0.184:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:00,846] INFO Unable to read additional data from
> server sessionid 0x1549b308dd20002, likely server has closed socket,
> closing socket connection and attempting reconnect
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:02,071] INFO Opening socket connection to server
> 10.0.0.184/10.0.0.184:2181. Will not attempt to authenticate using SASL
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:02,071] INFO Socket connection established to
> 10.0.0.184/10.0.0.184:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:02,072] INFO Unable to read additional data from
> server sessionid 0x1549b308dd20002, likely server has closed socket,
> closing socket connection and attempting reconnect
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:03,336] INFO Opening socket connection to server
> 10.0.0.184/10.0.0.184:2181. Will not attempt to authenticate using SASL
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:03,336] INFO Socket connection established to
> 10.0.0.184/10.0.0.184:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> > > > [2016-05-10 15:41:

Re: Increase number of topic in Kafka leads zookeeper fail

2016-05-16 Thread Christian Posta
+1 what Tom said.

Curious though, Anas: what motivated you to try a topic per device? Was
there something regarding management or security that you believe you can
achieve with a topic per device?
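
For reference, Tom's suggestion below (one topic, partitions keyed by device
id) can be sketched like this. The partition count and device names are made
up, and CRC32 is only a stand-in hash; as far as I know the Java producer's
default partitioner hashes the key bytes with murmur2, so real partition
numbers would differ:

```python
import zlib

NUM_PARTITIONS = 12  # assumed partition count for the single shared topic


def partition_for(device_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a device id to a partition (CRC32 stand-in, not Kafka's hash)."""
    return zlib.crc32(device_id.encode("utf-8")) % num_partitions


# Ten thousand hypothetical devices share one topic instead of 10k topics.
devices = ["device-%d" % i for i in range(10000)]
assignment = {d: partition_for(d) for d in devices}
used = set(assignment.values())

print("devices:", len(devices), "partitions in use:", len(used))
print("device-42 ->", assignment["device-42"])
```

All messages for one device land in the same partition, so per-device
ordering is kept without needing a topic per device.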

On Mon, May 16, 2016 at 4:11 AM, Tom Crayford  wrote:

> Hi there,
>
> Generally you don't use a single topic per device in this use case, but one
> topic with some number of partitions and the key distribution based on
> device id. Kafka isn't designed for millions of low volume topics, but a
> few high volume ones.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>
> On Mon, May 16, 2016 at 5:23 AM, Anas A  wrote:
>
> > We plan to use Kafka as a message broker for an IoT use case, where each
> > device is treated as its own topic. When I simulated 10 messages per
> > second to 10 thousand topics, ZooKeeper became a bottleneck, and all Kafka
> > monitoring tools failed to read the throughput values and number of topics
> > from the JMX port because of that. Will tuning ZooKeeper solve the
> > issue? In the IoT use case there will be millions of devices polling data
> > to millions of topics. I want to make sure the approach is sound before
> > going ahead. Please advise.
> >
> >
> > *Thanks & Regards,*
> >
> >
> > Anas A
> > DBA, Trinity Mobility
> > +917736368236
> > anas.2...@gmail.com
> > Bangalore
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Producer offset commit API

2016-05-16 Thread Christian Posta
If you're using KafkaConnect, it does it for you!

Basically, you set the SourceRecord's "sourcePartition" and "sourceOffset"
fields (
https://github.com/christian-posta/kafka/blob/8db55618d5d5d5de97feab2bf8da4dc45387a76a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java#L51-51)
and Kafka will handle storing the offsets (by default in Kafka, I believe)
https://github.com/christian-posta/kafka/blob/1587aeed9a551c7c755666040fe26d066dba6857/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L198-198

If you need to read the offsets, just use the offsetStorageReader (
https://github.com/christian-posta/kafka/blob/417e283d643d8865aa3e79dffa373c8cc853d78f/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java#L31-31)
which is automatically available to a source task.
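
To picture what the framework does with those two fields, here is a
plain-Python model. It is not the Connect API (that is Java); OffsetStore,
poll and the "line" offset key are names invented for illustration, and the
"file" partition is a made-up example of a source:

```python
class OffsetStore:
    """Stand-in for the offset storage the framework manages for you."""

    def __init__(self):
        self._offsets = {}

    @staticmethod
    def _key(source_partition):
        # Dicts are not hashable, so key on their sorted items.
        return tuple(sorted(source_partition.items()))

    def commit(self, source_partition, source_offset):
        self._offsets[self._key(source_partition)] = dict(source_offset)

    def offset(self, source_partition):
        return self._offsets.get(self._key(source_partition))


def poll(lines, source_partition, store, batch_size=2):
    """Return the next batch of records, resuming after the stored offset."""
    last = store.offset(source_partition)
    start = 0 if last is None else last["line"] + 1
    return [
        {"sourcePartition": source_partition,
         "sourceOffset": {"line": n},
         "value": lines[n]}
        for n in range(start, min(start + batch_size, len(lines)))
    ]


lines = ["a", "b", "c"]
partition = {"file": "input.txt"}
store = OffsetStore()

first = poll(lines, partition, store)
# The framework, not the task, records the offset of the last delivered record.
store.commit(partition, first[-1]["sourceOffset"])

# After a restart the task reads the stored offset and carries on from there.
second = poll(lines, partition, store)
print([r["value"] for r in first], [r["value"] for r in second])
```

The point is only that the task attaches a partition and an offset to each
record and reads the stored offset back on startup; persisting it is the
framework's job.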

On Mon, May 16, 2016 at 9:25 AM, Kanagha  wrote:

> Hi,
>
> I am trying to find out the API for committing producer offset for Kafka
>
> I found this example:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>
> Which would work for committing consumer offsets. Is there a separate API
> for committing producer offset using Kafka connect?
>
> Thanks
> Kanagha
>
>
> --
> Kanagha
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Kafka for event sourcing architecture

2016-05-17 Thread Christian Posta
> > > Kafka can help with batching writes up to S3 (see e.g. Pinterest's secor
> > > project), and act as a very high throughput, durable, replicated
> > messaging
> > > layer for communication. In this paradigm, when you want to replay, you
> > do
> > > so out of S3 until you've consumed the last offset there, then start
> > > replaying out of and catching up with the small amount of remaining
> data
> > in
> > > Kafka. Of course the replay logic there has to be hand rolled, as Kafka
> > and
> > > its clients have no knowledge of external stores.
> > >
> > > Another potential thing to look at is Kafka's compacted topic
> mechanism.
> > > With compacted topics, Kafka keeps the latest element for a given key,
> > > making it act a little more like a database table. Note that you still
> > have
> > > to consume by offset here - there's no "get the value for key Y
> > > operation". However, this assumes that your keyspace is still tractably
> > > small, and that you're ok with keeping only the latest value.
> Compaction
> > > completely overrides time based retention, so you have to "delete" keys
> > or
> > > have a bounded keyspace if you want to retain operational sanity with
> > > Kafka. I'd recommend reading the docs on compacted topics, they cover
> the
> > > use cases quite well.
> > >
> > >
> > >
> > > >
> > > > 2) Let's say I throw away my derived view and want to re-build it
> from
> > > > scratch, is it possible to consume messages from a topic from its
> very
> > > > first message and once it has caught up, listen for new messages like
> > it
> > > > would normally do?
> > >
> > >
> > > That's entirely possible, you can catch up from the first retained
> > message
> > > and then continue from there very easily. However, see above about
> > infinite
> > > retention.
> > >
> > >
> > >
> > > >
> > > > 2) Does it support transactions? Let's say I want to push 3 messages
> > > > atomically but the producer process crashes after sending only 2
> > > messages,
> > > > is it possible to "rollback" the first 2 messages (e.g. "all or
> > nothing"
> > > > semantics)?
> > >
> > >
> > > No. Kafka at the moment only supports "at least once" semantics, and
> > there
> > > are no cross broker transactions of any kind. Implementing such a thing
> > > would likely have huge negative impacts on the current performance
> > > > characteristics of Kafka, which would be an issue for many users.
> > >
> > >
> > > >
> > > > 3) Does it support request/response style semantics or can they be
> > > > simulated? My system's primary interface with the outside world is an
> > > HTTP
> > > > API so it would be nice if I could publish an event and wait for all
> > the
> > > > internal services which need to process the event to be "done"
> > > > processing before returning a response.
> > >
> > >
> > >
> > > In theory that's possible - the producer can return the offset of the
> > > message produced, and you could check the latest offset of each
> consumer
> > in
> > > your web request handler.
> > >
> > > However, doing so is not going to work that well, unless you're ok with
> > > your web requests taking on the order of seconds to tens of seconds to
> > > fulfill. Kafka can do low latency messaging reasonably well, but
> > > coordinating the offsets of many consumers would likely have a huge
> > latency
> > > impact. Writing the code for it and getting it handling failure
> correctly
> > > would likely be a lot of work (there's nothing in any of the client
> > > libraries like this, because it is not a desirable or supported use
> > case).
> > >
> > > > Instead I'd like to ask *why* you need those semantics. What's the
> > issue
> > > > with just producing a message, telling the user HTTP 200, and
> > > > consuming it later?
> > >
> > >
> > >
> > > >
> > > > PS: I'm a Node.js/Go developer so when possible please avoid Java
> > centric
> > > > terminology.
> > >
> > >
> > > > Please note that the node and go clients are notably less mature
> than
> > > the JVM clients, and that running Kafka in production means knowing
> > enough
> > > about the JVM and Zookeeper to handle that.
> > >
> > > Thanks!
> > > Tom Crayford
> > > Heroku Kafka
> > >
> > > >
> > > > Thanks!
> > > >
> > > > - Oli
> > >
> > > >
> > > > --
> > > > - Oli
> > > >
> > > > Olivier Lalonde
> > > > http://www.syskall.com
> > > <http://www.syskall.com> <-- connect with me!
> > > >
> > >
> >
>
>
>
>
>

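
On the compacted-topic point quoted above, a toy model may help. This is
plain Python, not Kafka code; the record layout is made up, and the real log
cleaner works on segments in the background and keeps delete markers around
for a while before dropping them:

```python
def compact(log):
    """Keep only the latest record per key.

    log is a list of (offset, key, value) tuples in offset order; a value of
    None stands for a delete marker, which removes the key entirely here.
    """
    latest = {}
    for offset, key, value in log:
        latest[key] = (offset, value)
    survivors = sorted(latest.items(), key=lambda kv: kv[1][0])
    return [(off, key, val) for key, (off, val) in survivors if val is not None]


log = [
    (0, "user-1", "a"),
    (1, "user-2", "b"),
    (2, "user-1", "c"),   # supersedes offset 0
    (3, "user-2", None),  # "delete" for user-2
    (4, "user-3", "d"),
]

for record in compact(log):
    print(record)
```

Note that the surviving records keep their original offsets, which is why you
still consume by offset rather than look up a key.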


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Kafka for event sourcing architecture

2016-05-17 Thread Christian Posta
Please create a JIRA with your thoughts. I'd be happy to help out with
something like that.

On Tue, May 17, 2016 at 4:57 PM, Radoslaw Gruchalski 
wrote:

> Not as far as I'm aware. I'd be happy to contribute if there is a desire
> to have such feature. From experience with other projects, I know that
> without the initial pitch / discussion, it could be difficult to get such
> feature in. I can create a jira in the morning, no electricity again
> tonight :-/
>
> On Tue, May 17, 2016 at 4:53 PM -0700, "Christian Posta" <
> christian.po...@gmail.com> wrote:
>
> +1 to your solution of log.cleanup.policy. Other brokers (ie, ActiveMQ)
> have a feature like that.
> Is there a JIRA for this?
>
> On Tue, May 17, 2016 at 4:48 PM, Radoslaw Gruchalski
> wrote:
>
> > I have described a cold storage solution for Kafka:
> >
> https://medium.com/@rad_g/the-case-for-kafka-cold-storage-32929d0a57b2#.kf0jf8cwv
> .
> > Also described it here a couple of times. The potential solution seems
> > rather straightforward.
> >
> > _
> > From: Luke Steensen
> > Sent: Tuesday, May 17, 2016 11:22 pm
> > Subject: Re: Kafka for event sourcing architecture
> > To:
> >
> >
> > It's harder in Kafka because the unit of replication is an entire
> > partition, not a single key/value pair. Partitions are large and
> constantly
> > growing, where key/value pairs are typically much smaller and don't
> change
> > in size. There would theoretically be no difference if you had one
> > partition per key, but that's not practical. Instead, you end up trying
> to
> > pick a number of partitions big enough that they'll each be a reasonable
> > size for the foreseeable future but not so big that the cluster overhead
> is
> > untenable. Even then the clock is ticking towards the day your biggest
> > partition approaches the limit of storage available on a single machine.
> >
> > It's frustrating because, as you say, there would be enormous benefits to
> > being able to access all data through the same system. Unfortunately, it
> > seems too far away from Kafka's original use case to be practical.
> >
> >
> > On Tue, May 17, 2016 at 12:32 PM, Daniel Schierbeck <
> > da...@zendesk.com.invalid> wrote:
> >
> > > I'm not sure why Kafka at least in theory cannot be used for infinite
> > > retention – any replicated database system would need to have a new
> node
> > > ingest all the data from failed node from its replicas. Surely this is
> no
> > > different in S3 itself. Why is this harder to do in Kafka than in other
> > > systems? The benefit of having just a single message log system would
> be
> > > rather big.
> > >
> > > On Tue, May 17, 2016 at 4:44 AM Tom Crayford
> > wrote:
> > >
> > > > Hi Oli,
> > > >
> > > > Inline.
> > > >
> > > > On Tuesday, 17 May 2016, Olivier Lalonde  wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I am considering adopting an "event sourcing" architecture for a
> > > system I
> > > > > am developing and Kafka seems like a good choice of store for
> events.
> > > > >
> > > > > For those who aren't aware, this architecture style consists in
> > storing
> > > > all
> > > > > state changes of the system as an ordered log of events and
> building
> > > > > derivative views as needed for easier querying (using a SQL
> database
> > > for
> > > > > example). Those views must be completely derived from the event log
> > > alone
> > > > > so that the log effectively becomes a "single source of truth".
> > > > >
> > > > > I was wondering if anyone else is using Kafka for that purpose and
> > more
> > > > > specifically:
> > > > >
> > > > > 1) Can Kafka store messages permanently?
> > > >
> > > >
> > > > No. Whilst you can tweak config and such to get a very long retention
> > > > period, this doesn't work well with Kafka at all. Keeping data around
> > > > forever has severe impacts on the operability of your cluster. For
> > > example,
> > > > if a machine fails, a replacement would have to catch up with vast
> 

Re: Kafka for event sourcing architecture

2016-05-18 Thread Christian Posta
> > >
> > > Oli,
> > >
> > > Kafka is only a partial solution.
> > > As I describe here (
> > >
> > >
> >
> http://www.slideshare.net/chris.e.richardson/hacksummit-2016-eventdriven-microservices-events-on-the-outside-on-the-inside-and-at-the-core/57
> > > )
> > > an event store is a hybrid of a database and a message broker.
> > > It is a database because it provides an API for inserting events for an
> > > entity and retrieving them by the entity's primary key.
> > > It is a message broker because it provides an API for subscribing to
> > > events.
> > > Kafka clearly satisfies the latter but not the former.
> > >
> > > Just my two cents.
> > >
> > > Chris
> > >
> > > --
> > > Microservices application platform http://eventuate.io
> > >
> > > On Tue, May 17, 2016 at 12:18 AM, Olivier Lalonde 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I am considering adopting an "event sourcing" architecture for a
> > system I
> > > > am developing and Kafka seems like a good choice of store for events.
> > > >
> > > > For those who aren't aware, this architecture style consists in
> storing
> > > all
> > > > state changes of the system as an ordered log of events and building
> > > > derivative views as needed for easier querying (using a SQL database
> > for
> > > > example). Those views must be completely derived from the event log
> > alone
> > > > so that the log effectively becomes a "single source of truth".
> > > >
> > > > I was wondering if anyone else is using Kafka for that purpose and
> more
> > > > specifically:
> > > >
> > > > 1) Can Kafka store messages permanently?
> > > >
> > > > 2) Let's say I throw away my derived view and want to re-build it
> from
> > > > scratch, is it possible to consume messages from a topic from its
> very
> > > > first message and once it has caught up, listen for new messages like
> > it
> > > > would normally do?
> > > >
> > > > 2) Does it support transactions? Let's say I want to push 3 messages
> > > > atomically but the producer process crashes after sending only 2
> > > messages,
> > > > is it possible to "rollback" the first 2 messages (e.g. "all or
> > nothing"
> > > > semantics)?
> > > >
> > > > 3) Does it support request/response style semantics or can they be
> > > > simulated? My system's primary interface with the outside world is an
> > > HTTP
> > > > API so it would be nice if I could publish an event and wait for all
> > the
> > > > internal services which need to process the event to be "done"
> > > > processing before returning a response.
> > > >
> > > > PS: I'm a Node.js/Go developer so when possible please avoid Java
> > centric
> > > > terminology.
> > > >
> > > > Thanks!
> > > >
> > > > - Oli
> > > >
> > > > --
> > > > - Oli
> > > >
> > > > Olivier Lalonde
> > > > http://www.syskall.com <-- connect with me!
> > > >
> > >
> >
>
>
>
> --
> - Oli
>
> Olivier Lalonde
> http://www.syskall.com <-- connect with me!
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: OffSet checker.

2016-05-18 Thread Christian Posta
Maybe give it a try with the kafka-consumer-groups.sh tool and the
--new-consumer flag:



On Wed, May 18, 2016 at 3:40 AM, Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> The command line tool that comes with Kafka one seems to check offsets in
> zookeeper. This does not seem to be related to this issue<
> https://issues.apache.org/jira/browse/KAFKA-1951> - As I am sure the
> offsets are stored in Kafka for burrow seem to retrieve it properly.
> Is there any easier way to check the current offset for a given consumer
> group in Kafka – considering that different consumers within the group
> commit to either kafka or Zk ?
>
> Regards
> Sathya,
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: OffSet checker.

2016-05-18 Thread Christian Posta
On Wed, May 18, 2016 at 7:12 AM, Gerard Klijs 
wrote:

> I was just trying to do the same thing, but it does not seem to support ssl
> (or at least not in combination with the acl). I get similar errors as in
> https://issues.apache.org/jira/browse/KAFKA-3151 but when I used
> --command-config /home/kafka/consumer.properties to give the ssl properties
> it worked. It does only seem to work as long as there are client
> connections active.
>
>
Right: once no consumers in the group are still active, the group is
removed after a timeout period.


> This was my output:
> kafka@2b28607f562f:/usr/bin$ ./kafka-consumer-groups --new-consumer
> --group
> mirrormaker --bootstrap-server 192.168.99.100:9093 --describe
> --command-config /home/kafka/consumer.properties
> GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
> mirrormaker, local.payments.accountentry, 0, 26583507, 26583507, 0,
> mirrormaker-0_172.17.0.1/172.17.0.1
> mirrormaker, local.beb.time, 0, unknown, 0, unknown,
> mirrormaker-0_172.17.0.1/172.17.0.1
> mirrormaker, local.distribution.alert, 0, unknown, 0, unknown,
> mirrormaker-0_172.17.0.1/172.17.0.1
> mirrormaker, local.general.example, 0, unknown, 0, unknown,
> mirrormaker-0_172.17.0.1/172.17.0.1
>
> On Wed, May 18, 2016 at 2:36 PM Christian Posta  >
> wrote:
>
> > Maybe give it a try with the kafka-consumer-groups.sh tool and the
> > --new-consumer flag:
> >
> >
> >
> > On Wed, May 18, 2016 at 3:40 AM, Sathyakumar Seshachalam <
> > sathyakumar_seshacha...@trimble.com> wrote:
> >
> > > The command line tool that comes with Kafka one seems to check offsets
> in
> > > zookeeper. This does not seem to be related to this issue<
> > > https://issues.apache.org/jira/browse/KAFKA-1951> - As I am sure the
> > > offsets are stored in Kafka for burrow seem to retrieve it properly.
> > > Is there any easier way to check the current offset for a given
> consumer
> > > group in Kafka – considering that different consumers within the group
> > > commit to either kafka or Zk ?
> > >
> > > Regards
> > > Sathya,
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: FW: [ANNOUCE] Apache Kafka 0.10.0.0 Released

2016-05-24 Thread Christian Posta
Maybe ask the spark mailing list ;-)


Congrats on the release! It's pretty amazing!

On Tue, May 24, 2016 at 4:42 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Does anyone know if spark plans to upgrade?
>
> I think the current version is 0.8x?
>
> Kind regards
>
> Andy
>
> From:  Gwen Shapira 
> Reply-To:  
> Date:  Tuesday, May 24, 2016 at 9:24 AM
> To:  , , <
> users@kafka.apache.org>
> Subject:  [ANNOUCE] Apache Kafka 0.10.0.0 Released
>
> > The Apache Kafka community is pleased to announce the release for Apache
> > Kafka 0.10.0.0.
> > This is a major release with exciting new features, including first
> > release of KafkaStreams and many other improvements.
> >
> > All of the changes in this release can be found:
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/RELEASE_NOTES.html
> >
> > Apache Kafka is high-throughput, publish-subscribe messaging system
> > rethought of as a distributed commit log.
> >
> > ** Fast => A single Kafka broker can handle hundreds of megabytes of
> reads
> > and
> > writes per second from thousands of clients.
> >
> > ** Scalable => Kafka is designed to allow a single cluster to serve as
> the
> > central data backbone
> > for a large organization. It can be elastically and transparently
> expanded
> > without downtime.
> > Data streams are partitioned and spread over a cluster of machines to
> allow
> > data streams
> > larger than the capability of any single machine and to allow clusters of
> > co-ordinated consumers.
> >
> > ** Durable => Messages are persisted on disk and replicated within the
> > cluster to prevent
> > data loss. Each broker can handle terabytes of messages without
> performance
> > impact.
> >
> > ** Distributed by Design => Kafka has a modern cluster-centric design
> that
> > offers
> > strong durability and fault-tolerance guarantees.
> >
> > You can download the source release from
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka-0.10.0.0-src
> .
> > tgz
> >
> > and binary releases from
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.10-0.10.0.0
> > .tgz
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.11-0.10.0.0
> > .tgz
> >
> > A big thank you for the following people who have contributed to the
> > 0.10.0.0 release.
> >
> > Adam Kunicki, Aditya Auradkar, Alex Loddengaard, Alex Sherwin, Allen
> > Wang, Andrea Cosentino, Anna Povzner, Ashish Singh, Atul Soman, Ben
> > Stopford, Bill Bejeck, BINLEI XUE, Chen Shangan, Chen Zhu, Christian
> > Posta, Cory Kolbeck, Damian Guy, dan norwood, Dana Powers, David
> > Jacot, Denise Fernandez, Dionysis Grigoropoulos, Dmitry Stratiychuk,
> > Dong Lin, Dongjoon Hyun, Drausin Wulsin, Duncan Sands, Dustin Cote,
> > Eamon Zhang, edoardo, Edward Ribeiro, Eno Thereska, Ewen
> > Cheslack-Postava, Flavio Junqueira, Francois Visconte, Frank Scholten,
> > Gabriel Zhang, gaob13, Geoff Anderson, glikson, Grant Henke, Greg
> > Fodor, Guozhang Wang, Gwen Shapira, Igor Stepanov, Ishita Mandhan,
> > Ismael Juma, Jaikiran Pai, Jakub Nowak, James Cheng, Jason Gustafson,
> > Jay Kreps, Jeff Klukas, Jeremy Custenborder, Jesse Anderson, jholoman,
> > Jiangjie Qin, Jin Xing, jinxing, Jonathan Bond, Jun Rao, Ján Koščo,
> > Kaufman Ng, kenji yoshida, Kim Christensen, Kishore Senji, Konrad,
> > Liquan Pei, Luciano Afranllie, Magnus Edenhill, Maksim Logvinenko,
> > manasvigupta, Manikumar reddy O, Mark Grover, Matt Fluet, Matt
> > McClure, Matthias J. Sax, Mayuresh Gharat, Micah Zoltu, Michael Blume,
> > Michael G. Noll, Mickael Maison, Onur Karaman, ouyangliduo, Parth
> > Brahmbhatt, Paul Cavallaro, Pierre-Yves Ritschard, Piotr Szwed,
> > Praveen Devarao, Rafael Winterhalter, Rajini Sivaram, Randall Hauch,
> > Richard Whaling, Ryan P, Samuel Julius Hecht, Sasaki Toru, Som Sahu,
> > Sriharsha Chintalapani, Stig Rohde Døssing, Tao Xiao, Tom Crayford,
> > Tom Dearman, Tom Graves, Tom Lee, Tomasz Nurkiewicz, Vahid Hashemian,
> > William Thurston, Xin Wang, Yasuhiro Matsuda, Yifan Ying, Yuto
> > Kawamura, zhuchen1018
> >
> > We welcome your help and feedback. For more information on how to
> > report problems, and to get involved, visit the project website at
> > http://kafka.apache.org/
> >
> > Thanks,
> >
> > Gwen
> >
>
>
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Scalability of Kafka Consumer 0.9.0.1

2016-06-01 Thread Christian Posta
Gerard is correct.

The unit of parallelism in Kafka is the topic partition. Within a consumer
group, each partition is consumed by exactly one consumer (and therefore one
thread), even when that consumer subscribes to multiple topics. KafkaConsumer
is NOT thread-safe and should not be shared between threads.
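
To make the "not thread safe" point concrete, here is a minimal sketch of the kind of ownership check that produces a ConcurrentModificationException like the one quoted below. It uses only the JDK; ThreadConfinedClient, acquire(), release() and pollOnNewThread() are hypothetical names for illustration, not the KafkaConsumer API, although as far as I understand the real client guards its public methods in a similar spirit.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a client that must be used by one thread at a time. */
class ThreadConfinedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    /** Marks the calling thread as the current user, or fails if another thread is inside. */
    void acquire() {
        long me = Thread.currentThread().getId();
        if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "client is not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    /** Gives up ownership once the outermost call on this thread has finished. */
    void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    /** Every public operation is wrapped in acquire/release. */
    String poll() {
        acquire();
        try {
            return "records";
        } finally {
            release();
        }
    }

    /** Calls poll() on a fresh thread and reports what happened. */
    static String pollOnNewThread(ThreadConfinedClient client) {
        String[] outcome = new String[1];
        Thread t = new Thread(() -> {
            try {
                outcome[0] = client.poll();
            } catch (ConcurrentModificationException e) {
                outcome[0] = "rejected";
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        ThreadConfinedClient client = new ThreadConfinedClient();
        client.acquire();                             // main thread is in the middle of a call
        System.out.println(pollOnNewThread(client));  // a second thread is turned away
        client.release();
        System.out.println(pollOnNewThread(client));  // sequential use from another thread is fine
    }
}
```

The practical consequence is the one described above: give each thread its own consumer instance rather than handing one instance to several threads.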

On Wed, Jun 1, 2016 at 12:11 AM, Gerard Klijs 
wrote:

> If I understand it correctly each consumer should have it's 'own' thread,
> and should not be accessible from other threads. But you could
> (dynamically) create enough threads to cover all the partitions, so each
> consumer only reads from one partition. You could also let all those
> consumers access some threadsafe object if you need to combine the result.
> In your linked example the consumers just each do there part, with solves
> the multi-threaded issue, but when you want to combine data from different
> consumer threads it becomes more tricky.
>
> On Wed, Jun 1, 2016 at 2:57 AM BYEONG-GI KIM  wrote:
>
> > Hello.
> >
> > I've implemented a Kafka Consumer Application which consume large number
> of
> > monitoring data from Kafka Broker and analyze those data accordingly.
> >
> > I referred to a guide,
> >
> >
> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> > ,
> > since I thought the app needs to implement multi-threading for Kafka
> > Consumer per Topic. Actually, A topic is assigned to each open-source
> > monitoring software, e.g., Nagios, Collectd, etc., in order to
> distinguish
> > those because each of these uses its own message format, such as JSON,
> > String, and so on.
> >
> > There was, however, an Exception even though my source code for the Kafka
> > Consumer are mostly copied and pasted from the guide;
> > *java.util.ConcurrentModificationException:
> > KafkaConsumer is not safe for multi-threaded access*
> >
> > First Question. Could the implementation in the guide really prevent the
> > Exception?
> >
> > And second Question is, could the KafkaConsumer support such huge amount
> of
> > data with one thread? The KafkaConsumer seems not thread-safe, and it can
> > subscribe multi-topics at once. Do I need to change the implementation
> from
> > the multi-threaded to one-thread and subscribing multi-topics?... I'm
> just
> > wonder whether a KafkaConsumer is able to stand the bunch of data without
> > performance degradation.
> >
> > Thanks in advance!
> >
> > Best regards
> >
> > KIM
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Does the Kafka Streams DSL support non-Kafka sources/sinks?

2016-06-02 Thread Christian Posta
Hate to bring up "non-flashy" technology... but Apache Camel would be a
great fit for something like this. Two java libraries each with very strong
suits.



On Thu, Jun 2, 2016 at 6:09 PM, Avi Flax  wrote:

> On 6/2/16, 07:03, "Eno Thereska"  wrote:
>
> > Using the low-level streams API you can definitely read or write to
> arbitrary
> > locations inside the process() method.
>
> Ah, good to know — thank you!
>
> > However, back to your original question: even with the low-level streams
> > API the sources and sinks can only be Kafka topics for now. So, as Gwen
> > mentioned, Connect would be the way to go to bring the data to a Kafka
> > Topic first.
>
> Got it — thank you!
>
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: where does kafka store consumer offset

2016-06-14 Thread Christian Posta
I believe the offsets are stored in Kafka itself, in a topic
named __consumer_offsets.
The "new" consumer does not use ZK to store consumer offsets.
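
As a rough illustration of where a group's offsets end up, the sketch below computes which partition of __consumer_offsets would hold the commits for a given group.id. It assumes the broker picks the partition from the hash of the group id modulo the partition count of the offsets topic (offsets.topic.num.partitions, 50 by default); that formula is my understanding of the broker's behavior rather than a documented contract, and OffsetsPartitionSketch and partitionFor are names made up for this example.

```java
/** Hypothetical helper: which __consumer_offsets partition a group's commits land in. */
class OffsetsPartitionSketch {

    /**
     * Assumed rule: non-negative hash of the group id, modulo the number of
     * partitions of the offsets topic.
     */
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // "test" is the group.id used in the quoted consumer code; 50 is the default partition count.
        System.out.println(partitionFor("test", 50));
    }
}
```

Knowing the partition can help when inspecting the offsets topic directly, but for everyday use the kafka-consumer-groups tool mentioned elsewhere on this list is the simpler route.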

On Tue, Jun 14, 2016 at 1:04 AM, thinking  wrote:

> Hello everyone,   I have use kafka_2.11-0.10.0.0, I write a simple
> consumer code. But I can't find consumer offset in zookeeper(I have check
> /consumers、/config/clients、/config/topics/__consumer_offsets). I want to
> know where does kafka store consumer?
>
>
> here is my consumer code:
>
> package com.nfsq.fire.eye.message;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.Properties;
>
> /**
>  * Created by letianyipin on 16/6/13.
>  */
> public class KafkaSimpleConsumerDemo {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "10.4.250.5:9092,10.4.250.6:9092,10.4.250.7:9092");
>         props.put("group.id", "test");
>         props.put("enable.auto.commit", "false");
>         //props.put("auto.commit.interval.ms", "100");
>         props.put("session.timeout.ms", "3");
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("client.id", "22i am terry d342");
>         // props.put("autooffset.reset", "smallest");
>         // props.put("autocommit.enable", false);
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>         consumer.subscribe(Arrays.asList("say-hello-test1"));
>         final int minBatchSize = 200;
>         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             for (ConsumerRecord<String, String> record : records) {
>                 buffer.add(record);
>             }
>             consumer.commitSync();
>         }
>     }
> }




-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Delete Message From topic

2016-06-14 Thread Christian Posta
Might be worth describing your use case a bit to see if there's another way
to help you?

On Tue, Jun 14, 2016 at 5:29 AM, Mudit Kumar  wrote:

> Hey,
>
> How can I delete particular messages from particular topic?Is that
> possible?
>
> Thanks,
> Mudit
>
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Expired messages in kafka topic

2016-06-23 Thread Christian Posta
Sounds like something a traditional message broker (e.g., ActiveMQ) would be
able to do with a TTL setting and expiry. Expired messages get moved to a
dead-letter queue (DLQ).

On Thu, Jun 23, 2016 at 2:45 AM, Krish  wrote:

> Hi,
> I am trying to design a real-time application where message timeout can be
> as low as a minute or two (message can get stale real-fast).
>
> In the rare chance that the consumers lag too far behind in processing
> messages from the broker, is there a concept of expired message queue in
> Kafka?
>
> I would like to know if a message has expired and then park it in some
> topic till as such time that a service can dequeue, process it and/or
> investigate it.
>
> Thanks.
>
> Best,
> Krish
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Pros and cons of dockerizing kafka brokers?

2016-07-07 Thread Christian Posta
One thing I can think of is that Kafka likes lots of OS page cache.
Dockerizing from the standpoint of packaging configs is a good idea; just
make sure that if you're running many brokers together on the same host,
they've got enough resources (CPU/memory) so they don't starve each other.

On Thu, Jul 7, 2016 at 2:30 AM, Krish  wrote:

> Hi,
> I am currently testing a custom docker volume driver plugin for AWS EFS/EBS
> access and mounting. So, running kafka broker inside a container makes will
> ease up a lot of configuration issues wrt storage for me.
>
> Are there any pros and cons of dockerizing kafka broker?
> Off the top of my head, since kafka forms the base of our setup, I can
> think of making is use the host networking stack, and increase ulimits for
> the container.
> I would like to know if and when kafka becomes greedy and cannibalizes
> resources; I can also ensure that it runs on a dedicated machine.
>
> Thanks.
>
> Best,
> Krish
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


GDPR compliance

2020-08-19 Thread Apolloni, Christian
Hello,

I have some questions about implementing GDPR compliance in Kafka.

In our situation we have the requirement of removing personal data in
coordination with multiple systems. The idea is having a central "coordinator 
system" which triggers the deletion process for the individual systems in a 
specific, controlled sequence which takes into account the various system 
inter-dependencies and data flows. This means e.g. system nr. 2 will receive 
the delete order only after system nr. 1 has reported that it's done with the 
deletion on its side (and so forth).

One of the systems in question publishes data in Kafka topics for consumption 
in other systems and part of the deletion process is to remove the relevant 
personal data from these Kafka topics too. This has to happen in a relatively 
short time after the deletion order is received, to prevent a long delay before 
the systems further down the chain can start their own deletion. Furthermore, 
we need to know when the operation is completed: only at that point can we give
the "go" to the other systems.

We are unsure how to satisfy those requirements in Kafka. If anyone has ideas 
or suggestions, we would be very interested in your opinion. We are also
interested more generally in experiences with implementing GDPR compliance in
Kafka, especially when dealing with multiple, interconnected systems.

Kind regards,

-- 
Christian Apolloni

Disclaimer: The contents of this email and any attachment thereto are intended 
exclusively for the attention of the addressee(s). The email and any such 
attachment(s) may contain information that is confidential and protected on the 
strength of professional, official or business secrecy laws and regulations or 
contractual obligations. Should you have received this email by mistake, you 
may neither make use of nor divulge the contents of the email or of any 
attachment thereto. In such a case, please inform the email's sender and delete 
the message and all attachments without delay from your systems.
You can find our e-mail disclaimer statement in other languages under 
http://www.baloise.ch/email_disclaimer


Re: GDPR compliance

2020-08-19 Thread Apolloni, Christian
On 2020/08/19 16:15:40, Nemeth Sandor  wrote:
> Hi Christian,

Hi, thanks for your reply.

> depending on how your Kafka topics are configured, you have 2 different
> options:
>
> a) if you have a non-log-compacted topic, then you can set the message
> retention on the topic to the desired value. In that case the message will
> be deleted by Kafka after the retention period expires. (the config value
> is `retention.ms` I think)

That was our first idea for a solution too, but we likely cannot set the
retention low enough.

> b) if you use Kafka as a log store with topics having infinite retention,
> then one common solution is to send a so-called tombstone record (a record
> with the same key containing only GDPR compatible data with the sensitive
> information removed), and let Kafka take care of the removal using log
> compaction.

We also thought about this, but as far as we understood there is no real 
guarantee that the compaction completes in a given time for all messages in the 
topic. From what we understood compaction can be delayed by the messages still 
being in the active segment and/or the compaction thread pool being too busy.

It's also unclear to us how we can know that the compaction has completed for 
all relevant messages and that we can safely report to our "coordinator system" 
that the next system can start its own deletion process safely.
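
To illustrate the concern about the active segment, here is a toy model of log compaction using only the JDK. It is a sketch under simplifying assumptions: segments are plain lists, a tombstone is modeled as a record with a null value, only closed segments are cleaned, and tombstones are dropped immediately (a real broker keeps them around for delete.retention.ms). CompactionSketch, Rec and compact are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of compaction: closed segments are cleaned, the active one is not. */
class CompactionSketch {

    /** A keyed record; a null value marks a tombstone. */
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    /** Keeps the newest record per key from the closed segments, then appends the active segment as is. */
    static List<Rec> compact(List<Rec> closed, List<Rec> active) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec r : closed) {
            latest.remove(r.key);        // re-insert so ordering follows the newest offset
            latest.put(r.key, r.value);
        }
        List<Rec> result = new ArrayList<>();
        for (Map.Entry<String, String> e : latest.entrySet()) {
            if (e.getValue() != null) {  // a key whose newest record is a tombstone disappears
                result.add(new Rec(e.getKey(), e.getValue()));
            }
        }
        result.addAll(active);
        return result;
    }

    public static void main(String[] args) {
        List<Rec> closed = Arrays.asList(
                new Rec("alice", "v1"), new Rec("bob", "x"), new Rec("alice", "v2"));
        List<Rec> active = Collections.singletonList(new Rec("alice", null));

        // Tombstone still sits in the active segment: alice's data survives cleaning.
        System.out.println(compact(closed, active));

        // After the segment rolls, the tombstone takes part in cleaning.
        List<Rec> rolled = new ArrayList<>(closed);
        rolled.addAll(active);
        System.out.println(compact(rolled, Collections.<Rec>emptyList()));
    }
}
```

In this model the data for "alice" only goes away once the segment holding the tombstone has been closed and a cleaning pass has run, which is why the completion time is hard to report to a coordinator.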

Kind Regards,

 -- 
 Christian Apolloni




Re: GDPR compliance

2020-08-19 Thread Apolloni, Christian
Hi Sandor, thanks again for your reply.

> If you have a non-log-compacted topic, after `retention.ms` the message
> (along with the PII) gets deleted from the Kafka message store without any
> further action, which should satisfy GDPR requirements:
> - you are handling PII in Kafka for a limited amount of time
> - you are processing the data for the given purpose it was given
> - the data will automatically be deleted without any further steps
> If you have a downstream system, you should also be able to publish a
> message through Kafka so that the downstream system executes its delete
> processes - if required. We implemented a similar process where we
> published an AnonymizeOrder event, which instructed downstream systems to
> anonymize the order data in their own data store.

Our problem is, the data could have been published shortly before the system 
receives a delete order from the "coordinator". This is because the data might 
have been mutated and the update needs to be propagated to consumer systems. If 
we go with a retention-period of days we would only be able to proceed with 
subsequent systems in the coordinated chain with too much of a delay. Going 
with an even shorter retention would be problematic.

> If you have a log-compacted topic:
> - yes, I have the same understanding as you have on the active segment.
> - You can set the segment.ms
> <https://kafka.apache.org/documentation/#segment.ms> property to force the
> compaction to occur within an expected timeframe.
>
> In general what I understand is true in both cases that Kafka gives you
> good enough guarantees to either remove the old message after retention.ms
> milliseconds or execute the topic compaction after segment.ms time that it
> is unnecessary to try to figure out more specifically in what exact moment
> the data is deleted. Setting these configurations should give you enough
> guarantee that the data removal will occur - if not, that imo should be
> considered a bug and reported back to the project.

We investigated the max.compaction.lag.ms parameter which was introduced in 
KIP-354 and from our understanding the intent is exactly what we'd like to 
accomplish, but unless we missed something we have noticed new segments are 
rolled only if new messages are appended. If the topic has very low activity it 
can be that no new message is appended and the segment is left active 
indefinitely. This means the cleaning for that segment might also remain
stalled indefinitely. We are unsure whether our understanding is correct and
whether it's a bug or not.
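
For reference, the topic-level settings discussed in this thread could be collected as below, for example as the map one would hand to an admin client when creating the topic. The config keys are the standard topic settings; the class and method names are hypothetical and every value is a placeholder chosen for illustration, not a recommendation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical holder for the topic overrides discussed in the thread. */
class CompactedTopicConfigSketch {

    /** Placeholder values only; tune them against your own deletion deadline. */
    static Map<String, String> piiTopicConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("cleanup.policy", "compact");          // keep the newest record per key
        config.put("segment.ms", "3600000");              // time-based segment roll (1 hour here)
        config.put("max.compaction.lag.ms", "7200000");   // KIP-354: cap on how long a record stays uncompacted
        config.put("min.cleanable.dirty.ratio", "0.1");   // make the cleaner pick the log up sooner
        config.put("delete.retention.ms", "3600000");     // how long tombstones themselves are kept
        return config;
    }

    public static void main(String[] args) {
        piiTopicConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

None of these settings by itself addresses the low-traffic case described above, where no new append arrives to trigger the roll.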

In general, I think part of the issue is that the system receives the delete 
order at the time that it has to be performed: we don't deal with the 
processing of the required waiting periods, that's what happens in the 
"coordinator system". The system with the data to be deleted receives the order 
and has to perform the deletion immediately.

Kind regards,

 -- 
 Christian Apolloni





Re: GDPR compliance

2020-08-19 Thread Apolloni, Christian
As an alternative solution, we also investigated encryption: encrypting all
messages with an individual key and removing the key once the "deletion" needs 
to be performed.

Has anyone experience with such a solution?
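
A minimal sketch of the idea, using only the JDK's javax.crypto: each user gets an individual AES key, message payloads are encrypted with it before being produced, and "deleting" the user's data amounts to deleting the key. CryptoShredder, the in-memory key map and the IV-prefixed message layout are assumptions made for this example; a real setup would need a separate, durable key store and would have to account for key backups and caches.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-user encryption helper; forgetting a user means dropping their key. */
class CryptoShredder {
    private static final int IV_BYTES = 12;   // recommended IV size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length

    private final Map<String, SecretKey> keysByUser = new ConcurrentHashMap<>();
    private final SecureRandom random = new SecureRandom();

    /** Returns IV followed by ciphertext; this is what would be written to the topic. */
    byte[] encrypt(String userId, String plaintext) {
        try {
            SecretKey key = keysByUser.computeIfAbsent(userId, id -> newKey());
            byte[] iv = new byte[IV_BYTES];
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.allocate(iv.length + ciphertext.length).put(iv).put(ciphertext).array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Empty once the user's key has been deleted: the bytes remain but are unreadable. */
    Optional<String> decrypt(String userId, byte[] message) {
        SecretKey key = keysByUser.get(userId);
        if (key == null) {
            return Optional.empty();
        }
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, message, 0, IV_BYTES));
            byte[] plaintext = cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
            return Optional.of(new String(plaintext, StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The "deletion": synchronous, so completion can be reported right away. */
    void forget(String userId) {
        keysByUser.remove(userId);
    }

    private static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The attraction for the coordinated-deletion scenario is that removing the key is a single operation whose completion is known immediately, independent of retention or compaction timing. Whether unreadable ciphertext left in the topic counts as deletion is a legal question this sketch does not answer.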

 -- 
 Christian Apolloni





Re: GDPR compliance

2020-08-19 Thread Apolloni, Christian
> Hi all,
>
> there has been an interesting talk about this during a previous Kafka
> Summit. It talks about using crypto-shredding to 'forget' user information.
> I'm not sure if there are any slides, but it basically suggests that you'd
> encrypt user data on Kafka, and when you get a information removal request,
> the only thing you have to do is to delete the encryption key for that user.
>
> Here's the announcement of the talk:
> https://kafka-summit.org/sessions/handling-gdpr-apache-kafka-comply-without-freaking/,
> but not sure where slides or a recording can be found unfortunately.
>
> Hope it helps.
>
> BR,
> Patrick

Hi Patrick,

Thanks for your reply. We are aware of that talk; the documentation is
available here:

https://www.confluent.io/kafka-summit-lon19/handling-gdpr-apache-kafka-comply-freaking-out/

That's what sparked our interest in such a solution.

Kind regards,

 -- 
 Christian Apolloni


Possible bug? Duplicates when searching kafka stream state store with caching

2018-06-29 Thread Christian Henry
Hi all,

I'll first describe a simplified view of relevant parts of our setup (which
should be enough to repro), describe the behavior we're seeing, and then
note some information I've come across after digging in a bit.

We have a kafka stream application, and one of our transform steps keeps a
state store to filter out messages with a previously seen GUID. That is,
our transform looks like:

public KeyValue transform(byte[] key, String guid) {
    try (WindowStoreIterator iterator =
            duplicateStore.fetch(correlationId, start, now)) {
        if (iterator.hasNext()) {
            return null;
        } else {
            duplicateStore.put(correlationId, some metadata);
            return new KeyValue<>(key, message);
        }
    }
}

where the duplicateStore is a persistent windowed store with caching
enabled.
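
For readers who want the filtering logic without the Streams plumbing, here is a JDK-only sketch of the same "drop anything seen within the window" rule. It mirrors the simplified transform above (an id is recorded only the first time it is seen) and replaces the window store with an in-memory map; WindowedDeduplicator, firstSeen and expire are hypothetical names, and none of the caching or segment behavior discussed below is modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the duplicate filter; not backed by a Kafka Streams store. */
class WindowedDeduplicator {
    private final long windowMs;
    private final Map<String, Long> firstSeenMs = new HashMap<>();

    WindowedDeduplicator(long windowMs) {
        this.windowMs = windowMs;
    }

    /**
     * Returns true and records the id if it was not seen within the last windowMs;
     * returns false (the record would be dropped) otherwise.
     */
    boolean firstSeen(String id, long nowMs) {
        Long previous = firstSeenMs.get(id);
        boolean duplicate = previous != null && nowMs - previous <= windowMs;
        if (!duplicate) {
            firstSeenMs.put(id, nowMs);
        }
        return !duplicate;
    }

    /** Drops entries older than the window, roughly what store retention does. */
    void expire(long nowMs) {
        firstSeenMs.values().removeIf(seenAt -> nowMs - seenAt > windowMs);
    }

    public static void main(String[] args) {
        WindowedDeduplicator dedup = new WindowedDeduplicator(1000);
        System.out.println(dedup.firstSeen("guid-1", 0));     // first sighting is forwarded
        System.out.println(dedup.firstSeen("guid-1", 500));   // inside the window, dropped
        System.out.println(dedup.firstSeen("guid-1", 1500));  // window has passed, forwarded again
    }
}
```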

I was debugging some tests and found that sometimes when calling *all()* or
*fetchAll()* on the duplicate store and stepping through the iterator, it
would return the same guid more than once, even if it was only inserted into
the store
once. More specifically, if I had the following guids sent to the stream:
[1, 2, ... 9] (for 9 values total), sometimes it would return
10 values, with one (or more) of the values being returned twice by the
iterator. However, this would not show up with a *fetch(guid)* on that
specific guid. For instance, if 1 was being returned twice by
*fetchAll()*, calling *duplicateStore.fetch("1", start, end)* will
still return an iterator with size of 1.

I dug into this a bit more by setting a breakpoint in
*SegmentedCacheFunction#compareSegmentedKeys(cacheKey,
storeKey)* and watching the two input values as I looped through the
iterator using "*while(iterator.hasNext()) { print(iterator.next()) }*". In
one test, the duplicate value was 6, and I saw the following behavior
(trimming off the segment values from the byte input):
-- compareSegmentedKeys(cacheKey = 6, storeKey = 2)
-- next() returns 6
and
-- compareSegmentedKeys(cacheKey = 7, storeKey = 6)
-- next() returns 6
Besides those, the input values are the same and the output is as expected.
Additionally, a coworker noted that the number of duplicates always matches
the number of times *Long.compare(cacheSegmentId, storeSegmentId)* returns
a non-zero value, indicating that duplicates are likely arising due to the
segment comparison.


Re: Possible bug? Duplicates when searching kafka stream state store with caching

2018-07-02 Thread Christian Henry
We're using the latest Kafka (1.1.0). I'd like to note that when we
encounter duplicates, the window is the same as well.

My original code was a bit simplified -- we also insert into the store if
iterator.hasNext() is true, before returning null. We're using a window
store because we have a punctuator that runs every few minutes to count
GUIDs with similar metadata, and reports that in a healthcheck. Since our
healthcheck window is less than the retention period of the store
(retention period might be 1 hour, healthcheck window is ~5 min), the
window store seemed like a good way to efficiently query all of the most
recent data. Note that since the healthcheck punctuator needs to aggregate
on all the recent values, it has to do a *fetchAll(start, end)*, which is
how these duplicates are affecting us.

On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang  wrote:

> Hello Christian,
>
> Since you are calling fetch(key, start, end) I'm assuming that
> duplicateStore
> is a WindowedStore. With a windowed store, it is possible that a single key
> can fall into multiple windows, and hence be returned from the
> WindowStoreIterator,
> note its type is , V>
>
> So I'd first want to know
>
> 1) which Kafka version are you using.
> 2) why you'd need a window store, and if yes, could you consider using the
> single point fetch (added in KAFKA-6560) other than the range query (which
> is more expensive as well).
>
>
>
> Guozhang
>
>
> On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <
> christian.henr...@gmail.com> wrote:
>
> > Hi all,
> >
> > I'll first describe a simplified view of relevant parts of our setup
> (which
> > should be enough to repro), describe the behavior we're seeing, and then
> > note some information I've come across after digging in a bit.
> >
> > We have a kafka stream application, and one of our transform steps keeps
> a
> > state store to filter out messages with a previously seen GUID. That is,
> > our transform looks like:
> >
> > public KeyValue transform(byte[] key, String guid) {
> > try (WindowStoreIterator iterator =
> > duplicateStore.fetch(correlationId, start, now)) {
> > if (iterator.hasNext()) {
> > return null;
> > } else {
> > duplicateStore.put(correlationId, some metadata);
> > return new KeyValue<>(key, message);
> > }
> > }}
> >
> > where the duplicateStore is a persistent windowed store with caching
> > enabled.
> >
> > I was debugging some tests and found that sometimes when calling
> > *all()* or *fetchAll()
> > *on the duplicate store and stepping through the iterator, it would
> return
> > the same guid more than once, even if it was only inserted into the store
> > once. More specifically, if I had the following guids sent to the stream:
> > [1, 2, ... 9] (for 9 values total), sometimes it would return
> > 10 values, with one (or more) of the values being returned twice by the
> > iterator. However, this would not show up with a *fetch(guid)* on that
> > specific guid. For instance, if 1 was being returned twice by
> > *fetchAll()*, calling *duplicateStore.fetch("1", start, end)* will
> > still return an iterator with size of 1.
> >
> > I dug into this a bit more by setting a breakpoint in
> > *SegmentedCacheFunction#compareSegmentedKeys(cacheKey,
> > storeKey)* and watching the two input values as I looped through the
> > iterator using "*while(iterator.hasNext()) { print(iterator.next()) }*".
> In
> > one test, the duplicate value was 6, and saw the following behavior
> > (trimming off the segment values from the byte input):
> > -- compareSegmentedKeys(cacheKey = 6, storeKey = 2)
> > -- next() returns 6
> > and
> > -- compareSegmentedKeys(cacheKey = 7, storeKey = 6)
> > -- next() returns 6
> > Besides those, the input values are the same and the output is as
> expected.
> > Additionally, a coworker noted that the number of duplicates always
> matches
> > the number of times *Long.compare(cacheSegmentId, storeSegmentId)
> *returns
> > a non-zero value, indicating that duplicates are likely arising due to
> the
> > segment comparison.
> >
>
>
>
> --
> -- Guozhang
>


Re: Possible bug? Duplicates when searching kafka stream state store with caching

2018-07-03 Thread Christian Henry
Nope, we're setting retainDuplicates to false.

On Tue, Jul 3, 2018 at 6:55 AM, Damian Guy  wrote:

> Hi,
>
> When you create your window store do you have `retainDuplicates` set to
> `true`? i.e., assuming you use `Stores.persistentWindowStore(...)` is the
> last param `true`?
>
> Thanks,
> Damian
>
> On Mon, 2 Jul 2018 at 17:29 Christian Henry 
> wrote:
>
> > We're using the latest Kafka (1.1.0). I'd like to note that when we
> > encounter duplicates, the window is the same as well.
> >
> > My original code was a bit simplified -- we also insert into the store if
> > iterator.hasNext() as well, before returning null. We're using a window
> > store because we have a punctuator that runs every few minutes to count
> > GUIDs with similar metadata, and reports that in a healthcheck. Since our
> > healthcheck window is less than the retention period of the store
> > (retention period might be 1 hour, healthcheck window is ~5 min), the
> > window store seemed like a good way to efficiently query all of the most
> > recent data. Note that since the healthcheck punctuator needs to
> aggregate
> > on all the recent values, it has to do a *fetchAll(start, end) *which is
> > how these duplicates are affecting us.
> >
> > On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Christian,
> > >
> > > Since you are calling fetch(key, start, end) I'm assuming that
> > > duplicateStore
> > > is a WindowedStore. With a windowed store, it is possible that a single
> > key
> > > can fall into multiple windows, and hence be returned from the
> > > WindowStoreIterator,
> > > note its type is , V>
> > >
> > > So I'd first want to know
> > >
> > > 1) which Kafka version are you using.
> > > 2) why you'd need a window store, and if yes, could you consider using
> > the
> > > single point fetch (added in KAFKA-6560) other than the range query
> > (which
> > > is more expensive as well).
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <
> > > christian.henr...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'll first describe a simplified view of relevant parts of our setup
> > > (which
> > > > should be enough to repro), describe the behavior we're seeing, and
> > then
> > > > note some information I've come across after digging in a bit.
> > > >
> > > > We have a kafka stream application, and one of our transform steps
> > keeps
> > > a
> > > > state store to filter out messages with a previously seen GUID. That
> > is,
> > > > our transform looks like:
> > > >
> > > > public KeyValue transform(byte[] key, String guid) {
> > > > try (WindowStoreIterator iterator =
> > > > duplicateStore.fetch(correlationId, start, now)) {
> > > > if (iterator.hasNext()) {
> > > > return null;
> > > > } else {
> > > > duplicateStore.put(correlationId, some metadata);
> > > > return new KeyValue<>(key, message);
> > > > }
> > > > }}
> > > >
> > > > where the duplicateStore is a persistent windowed store with caching
> > > > enabled.
> > > >
> > > > I was debugging some tests and found that sometimes when calling
> > > > *all()* or *fetchAll()
> > > > *on the duplicate store and stepping through the iterator, it would
> > > return
> > > > the same guid more than once, even if it was only inserted into the
> > store
> > > > once. More specifically, if I had the following guids sent to the
> > stream:
> > > > [1, 2, ... 9] (for 9 values total), sometimes it would
> > return
> > > > 10 values, with one (or more) of the values being returned twice by
> the
> > > > iterator. However, this would not show up with a *fetch(guid)* on
> that
> > > > specific guid. For instance, if 1 was being returned twice by
> > > > *fetchAll()*, calling *duplicateStore.fetch("1", start, end)*
> will
> > > > still return an iterator with size of 1.
> > > >
> > > > I dug into this a bit more by setting a breakpoint in
> > >

Re: Possible bug? Duplicates when searching kafka stream state store with caching

2018-07-06 Thread Christian Henry
Any other ideas here? Should I create a bug?

On Tue, Jul 3, 2018 at 1:21 PM, Christian Henry  wrote:

> Nope, we're setting retainDuplicates to false.
>
> On Tue, Jul 3, 2018 at 6:55 AM, Damian Guy  wrote:
>
>> Hi,
>>
>> When you create your window store do you have `retainDuplicates` set to
>> `true`? i.e., assuming you use `Stores.persistentWindowStore(...)` is the
>> last param `true`?
>>
>> Thanks,
>> Damian
>>
>> On Mon, 2 Jul 2018 at 17:29 Christian Henry 
>> wrote:
>>
>> > We're using the latest Kafka (1.1.0). I'd like to note that when we
>> > encounter duplicates, the window is the same as well.
>> >
>> > My original code was a bit simplified -- we also insert into the store
>> if
>> > iterator.hasNext() as well, before returning null. We're using a window
>> > store because we have a punctuator that runs every few minutes to count
>> > GUIDs with similar metadata, and reports that in a healthcheck. Since
>> our
>> > healthcheck window is less than the retention period of the store
>> > (retention period might be 1 hour, healthcheck window is ~5 min), the
>> > window store seemed like a good way to efficiently query all of the most
>> > recent data. Note that since the healthcheck punctuator needs to
>> aggregate
>> > on all the recent values, it has to do a *fetchAll(start, end) *which is
>> > how these duplicates are affecting us.
>> >
>> > On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang 
>> wrote:
>> >
>> > > Hello Christian,
>> > >
>> > > Since you are calling fetch(key, start, end) I'm assuming that
>> > > duplicateStore
>> > > is a WindowedStore. With a windowed store, it is possible that a
>> single
>> > key
>> > > can fall into multiple windows, and hence be returned from the
>> > > WindowStoreIterator,
>> > > note its type is , V>
>> > >
>> > > So I'd first want to know
>> > >
>> > > 1) which Kafka version are you using.
>> > > 2) why you'd need a window store, and if yes, could you consider using
>> > the
>> > > single point fetch (added in KAFKA-6560) other than the range query
>> > (which
>> > > is more expensive as well).
>> > >
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <
>> > > christian.henr...@gmail.com> wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > I'll first describe a simplified view of relevant parts of our setup
>> > > (which
>> > > > should be enough to repro), describe the behavior we're seeing, and
>> > then
>> > > > note some information I've come across after digging in a bit.
>> > > >
>> > > > We have a kafka stream application, and one of our transform steps
>> > keeps
>> > > a
>> > > > state store to filter out messages with a previously seen GUID. That
>> > is,
>> > > > our transform looks like:
>> > > >
>> > > > public KeyValue transform(byte[] key, String guid) {
>> > > > try (WindowStoreIterator iterator =
>> > > > duplicateStore.fetch(correlationId, start, now)) {
>> > > > if (iterator.hasNext()) {
>> > > > return null;
>> > > > } else {
>> > > > duplicateStore.put(correlationId, some metadata);
>> > > > return new KeyValue<>(key, message);
>> > > > }
>> > > > }}
>> > > >
>> > > > where the duplicateStore is a persistent windowed store with caching
>> > > > enabled.
>> > > >
>> > > > I was debugging some tests and found that sometimes when calling
>> > > > *all()* or *fetchAll()
>> > > > *on the duplicate store and stepping through the iterator, it would
>> > > return
>> > > > the same guid more than once, even if it was only inserted into the
>> > store
>> > > > once. More specifically, if I had the following guids sent to the
>> > stream:
>> > > > [1, 2, ... 9] (for 9 values total), sometimes it would
>> > return
>> > > > 10 values, with o

Use case: Per tenant deployments talking to multi tenant kafka cluster

2021-12-08 Thread Christian Schneider
We have a single tenant application that we deploy to a kubernetes cluster
in many instances.
Every customer has several environments of the application. Each
application lives in a separate namespace and should be isolated from other
applications.

We plan to use kafka to communicate inside an environment (between the
different pods).
As setting up one kafka cluster per such environment is a lot of overhead
and cost we would like to just use a single multi tenant kafka cluster.

Let's assume we just have one topic with 10 partitions for simplicity.
We can now use the environment id as a key for the messages to make sure
the messages of each environment arrive in order while sharing the load on
the partitions.

Now we want each environment to only read the minimal number of messages
while consuming. Ideally we would like it to only consume its own messages.
Can we somehow filter to only
receive messages with a certain key? Can we maybe only listen to a certain
partition at least?

Additionally we ideally would like to have enforced isolation. So each
environment can only see its own messages even if it might receive messages
of other environments from the same partition.
I think in worst case we can make this happen by encrypting the messages
but it would be great if we could filter on broker side.
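
A minimal sketch of the encryption fallback mentioned above, assuming each environment holds its own AES key and encrypts payloads before handing them to the producer. The class name `EnvCrypto` and the "IV followed by ciphertext" layout are illustrative assumptions, not something Kafka provides; only the JDK's `javax.crypto` API is used.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical helper: per-environment payload encryption with AES-GCM.
class EnvCrypto {
    private static final int IV_BYTES = 12;  // commonly recommended nonce size for GCM
    private static final int TAG_BITS = 128; // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    // Generates a fresh 128-bit AES key; one such key per environment is assumed.
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Output layout: 12-byte random IV, then ciphertext with the GCM tag appended.
    static byte[] encrypt(SecretKey key, byte[] plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            return ByteBuffer.allocate(iv.length + ciphertext.length)
                    .put(iv).put(ciphertext).array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fails with IllegalStateException if the key is wrong or the data was altered.
    static byte[] decrypt(SecretKey key, byte[] message) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, message, 0, IV_BYTES));
            return cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("cannot decrypt record", e);
        }
    }
}
```

A consumer holding another environment's key gets an exception from `decrypt` rather than plaintext, so it can skip the record. This hides payloads only: record keys, partitions, and message volume stay visible to every reader of the topic.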

Christian

-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com


Re: Use case: Per tenant deployments talking to multi tenant kafka cluster

2021-12-08 Thread Christian Schneider
Hi Luke,

thanks for the hints. This helps a lot already.

We already use assign as we manage offsets on the consumer side. Currently
we only have one partition and simply assign a stored offset on partition 0.
For multiple partitions is it the correct behaviour to simply assign to
partition number:offset or do I have to provide offsets for the other
partitions too? I only want to listen to one partition.
You mentioned custom producer partitioner. We currently use a random
consumer group name for each consumer as we want each consumer to receive
all messages of the environment. In this case do we still need a custom
producer partitioner or is it enough to simply assign to the topic like
described above?
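
A minimal sketch of the env-ID-to-partition rule from Luke's reply quoted below (env ID 1 -> partition 1, and so on). The class name `EnvPartitionMapper` and the modulo rule are assumptions for illustration. The producer and the consumer would both have to apply the same rule: on the producer side it could be called from a class implementing Kafka's `Partitioner` interface, or the partition could be set explicitly on the `ProducerRecord`; on the consumer side the result would go into the `TopicPartition` passed to `assign`.

```java
// Hypothetical helper; the name and the modulo rule are assumptions for illustration.
class EnvPartitionMapper {
    // Maps a numeric environment ID onto one of numPartitions partitions.
    // Kafka partitions are numbered from 0 to numPartitions - 1.
    static int partitionFor(int envId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative IDs.
        return Math.floorMod(envId, numPartitions);
    }
}
```

With 10 partitions, env ID 1 maps to partition 1 and env ID 10 wraps around to partition 0. Once there are more environments than partitions, several environments share a partition, so consumers still have to discard foreign records. `assign` replaces the consumer's assignment with exactly the partitions passed in, so `seek` is only needed for those.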

Christian

On Wed, Dec 8, 2021 at 11:19 AM, Luke Chen wrote:

> Hi Christian,
> Answering your question below:
>
> > Let's assume we just have one topic with 10 partitions for simplicity.
> We can now use the environment id as a key for the messages to make sure
> the messages of each environment arrive in order while sharing the load on
> the partitions.
>
> > Now we want each environment to only read the minimal number of messages
> while consuming. Ideally we would like it to only consume its own messages.
> Can we somehow filter to only
> receive messages with a certain key? Can we maybe only listen to a certain
> partition at least?
>
>
> Unfortunately, Kafka doesn't have the feature to filter the messages on
> broker before sending to consumer.
> But for your 2nd question:
> > Can we maybe only listen to a certain partition at least?
>
> Actually, yes. Kafka has a way to just fetch data from a certain partition
> of a topic. You can use Consumer#assign API to achieve that. So, to do
> that, I think you also need to have a custom producer partitioner for your
> purpose. Let's say, in your example, you have 10 partitions, and 10
> environments. Your partitioner should send to the specific partition based
> on the environment ID, e.g. env ID 1 -> partition 1, env ID 2 -> partition
> 2. So, in your consumer, you can just assign to the partition containing
> its environment ID.
>
> And for the idea of encrypting the messages to achieve isolation, it's
> interesting! I've never thought about it! :)
>
> Hope it helps.
>
> Thank you.
> Luke
>
>
> On Wed, Dec 8, 2021 at 4:48 PM Christian Schneider <
> ch...@die-schneider.net>
> wrote:
>
> > We have a single tenant application that we deploy to a kubernetes
> cluster
> > in many instances.
> > Every customer has several environments of the application. Each
> > application lives in a separate namespace and should be isolated from
> other
> > applications.
> >
> > We plan to use kafka to communicate inside an environment (between the
> > different pods).
> > As setting up one kafka cluster per such environment is a lot of overhead
> > and cost we would like to just use a single multi tenant kafka cluster.
> >
> > Let's assume we just have one topic with 10 partitions for simplicity.
> > We can now use the environment id as a key for the messages to make sure
> > the messages of each environment arrive in order while sharing the load
> on
> > the partitions.
> >
> > Now we want each environment to only read the minimal number of messages
> > while consuming. Ideally we would like it to only consume its own
> messages.
> > Can we somehow filter to only
> > receive messages with a certain key? Can we maybe only listen to a
> certain
> > partition at least?
> >
> > Additionally we ideally would like to have enforced isolation. So each
> > environment can only see its own messages even if it might receive
> messages
> > of other environments from the same partition.
> > I think in worst case we can make this happen by encrypting the messages
> > but it would be great if we could filter on broker side.
> >
> > Christian
> >
> > --
> > --
> > Christian Schneider
> > http://www.liquid-reality.de
> >
> > Computer Scientist
> > http://www.adobe.com
> >
>


-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com


Custom plugin to filter on kafka server side

2021-12-08 Thread Christian Schneider
We share topics between different tenants. Would it be possible to
implement filtering on the Kafka side that allows a consumer to filter a
topic for a certain key?
The idea is that this consumer only gets messages with the specified key, to
save network bandwidth as well as (possibly) disk I/O on the Kafka broker side.

Christian


-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com


GlobalKTable with RocksDB - queries before state RUNNING?

2023-11-21 Thread Christian Zuegner
d36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) 
stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State 
transition from REBALANCING to RUNNING
2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) 
stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] Started 0 
stream threads
2023-11-21 15:28:36,774 INFO  [com.osr.serKafkaStreamsService] 
(vert.x-worker-thread-1) new file processed


Thanks for any input!
Christian





RE: GlobalKTable with RocksDB - queries before state RUNNING?

2023-11-22 Thread Christian Zuegner
Hi Sophie,

thanks a lot for your tip! I've implemented a StateListener to block queries
while the state is not RUNNING. This works perfectly for our
use-case!


In the meantime I noticed the InteractiveQuery API v2 and gave it a try.
Unfortunately it does not seem to cope with GlobalKTable. When I try to run this:

return 
streams.query(StateQueryRequest.inStore(STORE_NAME).withQuery(KeyQuery.withKey(key)));

I got: "Global stores do not yet support the KafkaStreams#query API. Use 
KafkaStreams#store instead."

From my point of view it would be great if this worked and behaved as with the
IN_MEMORY StoreType, as it is straightforward to use.

Do you see a chance to get InteractiveQueryV2 working with GlobalKTable?

Kind regards,
Christian

-Original Message-
From: Sophie Blee-Goldman  
Sent: Wednesday, November 22, 2023 1:51 AM
To: christian.zueg...@ams-osram.com.invalid
Cc: users@kafka.apache.org
Subject: Re: GlobalKTable with RocksDB - queries before state RUNNING?

Just to make sure I understand the logs, you're saying the "new file processed" 
lines represent store queries, and presumably the 
com.osr.serKafkaStreamsService is your service that's issuing these queries?

You need to wait for the app to finish restoring state before querying it.
Based on this message -- "KafkaStreams has not been started, you can retry 
after calling start()" -- I assume you're kicking off the querying service 
right away and blocking queries until after KafkaStreams#start is called.
But you need to wait for it to actually finish starting up, not just for
start() to be called. The best way to do this is by setting a state listener 
via KafkaStreams#setStateListener, and then using this to listen in on the 
KafkaStreams.State and blocking the queries until the state has changed to 
RUNNING.
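
The state-listener approach described above can be sketched as follows. To keep the example runnable without the Kafka Streams jar, `StreamsState` is a stand-in enum for `KafkaStreams.State` (only a subset of its values) and `QueryGate` is a hypothetical class. In a real application, `onChange` would be registered through `KafkaStreams#setStateListener` before `start()` is called, and the supplier would wrap the actual store lookup.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Stand-in for KafkaStreams.State so the sketch compiles with the JDK alone.
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// Hypothetical gate that rejects store queries unless the app is RUNNING.
class QueryGate {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Same shape as KafkaStreams.StateListener#onChange(newState, oldState).
    void onChange(StreamsState newState, StreamsState oldState) {
        running.set(newState == StreamsState.RUNNING);
    }

    // Runs the lookup only while RUNNING; otherwise fails fast so that
    // callers never mistake a half-restored store for a missing key.
    <T> T query(Supplier<T> storeLookup) {
        if (!running.get()) {
            throw new IllegalStateException("state store not ready yet");
        }
        return storeLookup.get();
    }
}
```

Callers can catch the `IllegalStateException` and retry later or report "not ready", instead of returning null for a key that simply has not been restored yet.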

In case you're curious about why this seems to work with in-memory stores but 
not with rocksdb, it seems like in the in-memory case, the queries that are 
attempted during restoration are blocked due to the store being closed 
(according to "(Quarkus Main Thread) the state store, store-name, is not
open.")

So why is the store closed for most of the restoration in the in-memory case 
only? This gets a bit into the weeds, but it has to do with the sequence of 
events in starting up a state store. When the global thread starts up, it'll 
first loop over all its state stores and call #init on them. Two things have to 
happen inside #init: the store is opened, and the store registers itself with 
the ProcessorContext. The #register involves various things, including a call 
to fetch the end offsets of the topic for global state stores. This is a 
blocking call, so the store might stay inside the #register call for a 
relatively long while.

For RocksDB stores, we open the store first and then call #register, so by the 
time the GlobalStreamThread is sitting around waiting on the end offsets 
response, the store is open and your queries are getting through to it. However 
the in-memory store actually registers itself *first*, before marking itself as 
open, and so it remains closed for most of the time it spends in restoration 
and blocks any query attempts during this time.

I suppose it would make sense to align the two store implementations to have 
the same behavior, and the in-memory store is probably technically more 
correct. But in the end you really should just wait for the KafkaStreams.State 
to get to RUNNING before querying the state store, as that's the only true 
guarantee.

Hope this helps!

-Sophie

On Tue, Nov 21, 2023 at 6:44 AM Christian Zuegner 
 wrote:

> Hi,
>
> we have the following problem - a Kafka topic of ~20 megabytes is made
> available as a GlobalKTable for queries. When using RocksDB, the GlobalKTable
> is ready for queries instantly, even before the data has been read
> completely - all get() requests return null. After a few seconds the
> data is queryable correctly - but this is too late for our
> application. Once we switch to IN_MEMORY we get the expected behavior:
> the store is only ready after all data has been read from the topic.
>
> How can we achieve the same behavior with the RocksDB setup?
>
> Snippet to build the KafkaStreams topology
>
> builder.globalTable(
>   "topic-name",
>   Consumed.with(Serdes.String(), Serdes.String()),
>   Materialized.as(STORE_NAME).withStoreType(Materialized.StoreType.ROCKS_DB)
> );
>
> Query the Table
>
> while (true) {
> try {
> return streams.store(
>
> StoreQueryParameters.fromNameAndType(FileCrawlerKafkaTopologyProducer.
> STORE_NAME, QueryableStoreTypes.keyValueS

Installing kafka

2024-07-08 Thread Christian Scharrer
Hi there,

When I try to install Kafka using the command tar -xzf kafka_2.13-3.7.1.tgz,
my Mac refuses and produces the following error message: tar: Error opening
archive: Failed to open 'kafka_2.13-3.7.1.tgz'

Any idea how I can still install Kafka?

Thanks for helping.
Christian 

When will Kafka without ZooKeeper be GA?

2021-12-14 Thread Christian A. Mathiesen
Hi,

In the quickstart guide on the Kafka website, the following is stated:
«Note: Soon, ZooKeeper will no longer be required by Apache Kafka»

According to release notes, «early access of replace ZooKeeper with a 
self-managed quorum» was added April 19 2021.

The question is: when will this feature be GA or considered stable?

Best regards,
Christian A. Mathiesen



My Docker Kafka image

2017-11-26 Thread Christian F. Gonzalez Di Antonio
I would like to share my @apachekafka <https://twitter.com/apachekafka>
@Docker <https://twitter.com/Docker> image with all of you! The
documentation is a work in progress.
https://hub.docker.com/r/christiangda/kafka/

Regards,
Christian


My kafka docker image

2017-11-29 Thread Christian F. Gonzalez Di Antonio
Hi everyone, I would like to share my recently built Kafka Docker image with
you.

I made this Docker image because I wanted my development environment to
focus mainly on Kafka's configuration, and also because none of the
other images available were easy to understand (how to use) beyond
Kafka's own configuration.

It has not been tested on Kubernetes yet, but I expect to do that soon.


Feel free to send me your feedback on the GitHub repository.

Regards,
Christian


Re: My kafka docker image

2017-11-29 Thread Christian F. Gonzalez Di Antonio
On Wed, Nov 29, 2017, 9:09 PM, Christian F. Gonzalez Di Antonio <
christian...@gmail.com> wrote:

> uhh, so sorry, I forgot it.
>
> Docker Hub: https://hub.docker.com/r/christiangda/kafka/
>
> GitHub: https://github.com/christiangda/docker-kafka
>
> Regards,
>
> Christian
>
>
>
> On Wed, Nov 29, 2017, 8:01 PM, Jeremy Hansen wrote:
>
>> Christian, I didn’t see your github link.
>>
>> Thanks
>>
>> On Nov 29, 2017, at 1:44 PM, Christian F. Gonzalez Di Antonio <
>> christian...@gmail.com> wrote:
>>
>> Hi everyone, I would like to share my recently built Kafka Docker image with
>> you.
>>
>> I made this Docker image because I wanted my development environment to
>> focus mainly on Kafka's configuration, and also because none of the
>> other images available were easy to understand (how to use) beyond
>> Kafka's own configuration.
>>
>> It has not been tested on Kubernetes yet, but I expect to do that soon.
>>
>>
>> Feel free to send me your feedback on the GitHub repository.
>>
>> Regards,
>> Christian
>>
>>

