Kafka Configuration Question

2017-05-24 Thread Bennett, Conrad
Hello,

I'm hoping someone could provide me with some assistance, please. I am in the
process of standing up a Kafka cluster and I have 7 nodes, all of which have
Kafka and ZooKeeper installed. I have attached my server.properties file so
you can verify whether I have anything misconfigured: each time I try to start
the Kafka service it fails with the error "timed out connecting to zookeeper",
even though the ZooKeeper process is up and running. Also, during my research
I read that in order to achieve better performance, separate drives should be
configured for Kafka data, but I didn't understand where exactly in the
configuration file that should be configured. Any assistance would be greatly
appreciated. Thanks in advance.

kafka: { version: 0.10.1.1 }

zookeeper: { version: 3.4.9 }
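
For reference, the per-broker data directories are set via log.dirs in
server.properties (a comma-separated list, typically one directory per
physical drive), and the ZooKeeper connection string and its timeout live in
the same file. A minimal sketch with hypothetical hosts and mount points:

# server.properties (sketch - hypothetical hosts and mount points)
# one or more comma-separated data directories, ideally one per drive
log.dirs=/data/disk1/kafka-logs,/data/disk2/kafka-logs
# ZooKeeper ensemble and connection timeout
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000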

Conrad Bennett Jr.


Re: SASL and SSL

2017-05-24 Thread Waleed Fateem
For completeness, I saw Ismael Juma post an answer which contains the
information I was looking for:

http://comments.gmane.org/gmane.comp.apache.kafka.user/15140

SASL_SSL -> authentication is done using SASL AND the connection is encrypted
using SSL.
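
In practical terms, a client combining Kerberos (GSSAPI) authentication with
SSL encryption would carry roughly the following properties; this is only a
minimal sketch, and the paths and password are hypothetical:

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=client-truststore-password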

On Wed, May 24, 2017 at 7:37 PM, Waleed Fateem 
wrote:

> Hello!
>
> I'm not very clear on the behavior that we should expect when we configure
> Kafka to use the protocol SASL_SSL.
>
> Are SASL and SSL mutually exclusive here, or can I authenticate with SASL and
> use SSL for encryption?
>
> If the latter is true, then is it correct to assume that encryption will
> take place using SSL if a client authenticates using a Kerberos ticket so
> long as they have a trust store configured?
>
> Thank you.
>
> Waleed
>


Re: Kafka Authorization and ACLs Broken

2017-05-24 Thread Raghav
I initially tried Kerberos, but it felt too complicated, so I gave up and
only tried SSL.

On Wed, May 24, 2017 at 7:47 PM, Mike Marzo 
wrote:

> Thanks.  We will try it.  Struggling with krb5 and acls
>
> mike marzo
> 908 209-4484 <(908)%20209-4484>
>
> On May 24, 2017 9:29 PM, "Raghav"  wrote:
>
>> Mike
>>
>> I am not using jaas file. I literally took the config Rajini gave in the
>> previous email and it worked for me. I am using ssl Kafka with ACLs. I am
>> not using Kerberos.
>>
>> Thanks.
>>
>> On Wed, May 24, 2017 at 11:29 AM, Mike Marzo <
>> precisionarchery...@gmail.com> wrote:
>>
>>> I'm also having issues getting acls to work.  Out of interest, are you
>>> starting ur brokers with a jaas file, if so do u mind sharing the client
>>> and server side jaas entries so I can validate what I'm doing.
>>>
>>> mike marzo
>>> 908 209-4484
>>>
>>> On May 24, 2017 10:54 AM, "Raghav"  wrote:
>>>
>>> > Hi Rajini
>>> >
>>> > Thank you very much. It perfectly works.
>>> >
>>> > I think in my setup I was trying to use a CA (certificate authority) to
>>> > sign the certificates from client and server, and then adding it to
>>> trust
>>> > store and keystore. I think in that process, I may have messed
>>> something. I
>>> > will try above config with a CA to sign certificates. Hopefully that
>>> would
>>> > work too.
>>> >
>>> > Thanks a lot again.
>>> >
>>> > Raghav
>>> >
>>> >
>>> >
>>> >
>>> > On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram <
>>> rajinisiva...@gmail.com>
>>> > wrote:
>>> >
>>> > > Raghav/Darshan,
>>> > >
>>> > > Can you try these steps on a clean installation of Kafka? It works
>>> for
>>> > me,
>>> > > so hopefully it will work for you. And then you can adapt to your
>>> > scenario.
>>> > >
>>> > > *Create keystores and truststores:*
>>> > >
>>> > > keytool -genkey -alias kafka -keystore server.keystore.jks -dname
>>> > > "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
>>> > > -keypass server-key-password
>>> > >
>>> > > keytool -exportcert -file server-cert-file -keystore
>>> server.keystore.jks
>>> > > -alias kafka -storepass server-keystore-password
>>> > >
>>> > > keytool -importcert -file server-cert-file -keystore
>>> > server.truststore.jks
>>> > > -alias kafka -storepass server-truststore-password -noprompt
>>> > >
>>> > > keytool -importcert -file server-cert-file -keystore
>>> > client.truststore.jks
>>> > > -alias kafkaclient -storepass client-truststore-password -noprompt
>>> > >
>>> > >
>>> > > keytool -genkey -alias kafkaclient -keystore client.keystore.jks
>>> -dname
>>> > > "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
>>> > > -keypass client-key-password
>>> > >
>>> > > keytool -exportcert -file client-cert-file -keystore
>>> client.keystore.jks
>>> > > -alias kafkaclient -storepass client-keystore-password
>>> > >
>>> > > keytool -importcert -file client-cert-file -keystore
>>> > server.truststore.jks
>>> > > -alias kafkaclient -storepass server-truststore-password -noprompt
>>> > >
>>> > > *Configure broker: Add these lines at the end of your
>>> server.properties*
>>> > >
>>> > > listeners=SSL://:9093
>>> > >
>>> > > advertised.listeners=SSL://127.0.0.1:9093
>>> > >
>>> > > ssl.keystore.location=/tmp/acl/server.keystore.jks
>>> > >
>>> > > ssl.keystore.password=server-keystore-password
>>> > >
>>> > > ssl.key.password=server-key-password
>>> > >
>>> > > ssl.truststore.location=/tmp/acl/server.truststore.jks
>>> > >
>>> > > ssl.truststore.password=server-truststore-password
>>> > >
>>> > > security.inter.broker.protocol=SSL
>>> > >
>>> > > security.protocol=SSL
>>> > >
>>> > > ssl.client.auth=required
>>> > >
>>> > > allow.everyone.if.no.acl.found=false
>>> > >
>>> > > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>>> > >
>>> > > super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
>>> > >
>>> > > *Configure producer: producer.properties*
>>> > >
>>> > > security.protocol=SSL
>>> > >
>>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>>> > >
>>> > > ssl.truststore.password=client-truststore-password
>>> > >
>>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>>> > >
>>> > > ssl.keystore.password=client-keystore-password
>>> > >
>>> > > ssl.key.password=client-key-password
>>> > >
>>> > >
>>> > > *Configure consumer: consumer.properties*
>>> > >
>>> > > security.protocol=SSL
>>> > >
>>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>>> > >
>>> > > ssl.truststore.password=client-truststore-password
>>> > >
>>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>>> > >
>>> > > ssl.keystore.password=client-keystore-password
>>> > >
>>> > > ssl.key.password=client-key-password
>>> > >
>>> > > group.id=testgroup
>>> > >
>>> > > *Create topic:*
>>> > >
>>> > > bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
>>> > > --replication-factor 1 --partitions 1
>>> > >
>>> > >
>>> > > *Configure ACLs:*
>>> > >
>>> > > bin/kafka-acls.sh --authorizer-properties
>>> zookeeper.connect=loc

Re: Kafka Authorization and ACLs Broken

2017-05-24 Thread Mike Marzo
Thanks.  We will try it.  Struggling with krb5 and acls

mike marzo
908 209-4484

On May 24, 2017 9:29 PM, "Raghav"  wrote:

> Mike
>
> I am not using jaas file. I literally took the config Rajini gave in the
> previous email and it worked for me. I am using ssl Kafka with ACLs. I am
> not using Kerberos.
>
> Thanks.
>
> On Wed, May 24, 2017 at 11:29 AM, Mike Marzo <
> precisionarchery...@gmail.com> wrote:
>
>> I'm also having issues getting acls to work.  Out of interest, are you
>> starting ur brokers with a jaas file, if so do u mind sharing the client
>> and server side jaas entries so I can validate what I'm doing.
>>
>> mike marzo
>> 908 209-4484
>>
>> On May 24, 2017 10:54 AM, "Raghav"  wrote:
>>
>> > Hi Rajini
>> >
>> > Thank you very much. It perfectly works.
>> >
>> > I think in my setup I was trying to use a CA (certificate authority) to
>> > sign the certificates from client and server, and then adding it to
>> trust
>> > store and keystore. I think in that process, I may have messed
>> something. I
>> > will try above config with a CA to sign certificates. Hopefully that
>> would
>> > work too.
>> >
>> > Thanks a lot again.
>> >
>> > Raghav
>> >
>> >
>> >
>> >
>> > On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram <
>> rajinisiva...@gmail.com>
>> > wrote:
>> >
>> > > Raghav/Darshan,
>> > >
>> > > Can you try these steps on a clean installation of Kafka? It works for
>> > me,
>> > > so hopefully it will work for you. And then you can adapt to your
>> > scenario.
>> > >
>> > > *Create keystores and truststores:*
>> > >
>> > > keytool -genkey -alias kafka -keystore server.keystore.jks -dname
>> > > "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
>> > > -keypass server-key-password
>> > >
>> > > keytool -exportcert -file server-cert-file -keystore
>> server.keystore.jks
>> > > -alias kafka -storepass server-keystore-password
>> > >
>> > > keytool -importcert -file server-cert-file -keystore
>> > server.truststore.jks
>> > > -alias kafka -storepass server-truststore-password -noprompt
>> > >
>> > > keytool -importcert -file server-cert-file -keystore
>> > client.truststore.jks
>> > > -alias kafkaclient -storepass client-truststore-password -noprompt
>> > >
>> > >
>> > > keytool -genkey -alias kafkaclient -keystore client.keystore.jks
>> -dname
>> > > "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
>> > > -keypass client-key-password
>> > >
>> > > keytool -exportcert -file client-cert-file -keystore
>> client.keystore.jks
>> > > -alias kafkaclient -storepass client-keystore-password
>> > >
>> > > keytool -importcert -file client-cert-file -keystore
>> > server.truststore.jks
>> > > -alias kafkaclient -storepass server-truststore-password -noprompt
>> > >
>> > > *Configure broker: Add these lines at the end of your
>> server.properties*
>> > >
>> > > listeners=SSL://:9093
>> > >
>> > > advertised.listeners=SSL://127.0.0.1:9093
>> > >
>> > > ssl.keystore.location=/tmp/acl/server.keystore.jks
>> > >
>> > > ssl.keystore.password=server-keystore-password
>> > >
>> > > ssl.key.password=server-key-password
>> > >
>> > > ssl.truststore.location=/tmp/acl/server.truststore.jks
>> > >
>> > > ssl.truststore.password=server-truststore-password
>> > >
>> > > security.inter.broker.protocol=SSL
>> > >
>> > > security.protocol=SSL
>> > >
>> > > ssl.client.auth=required
>> > >
>> > > allow.everyone.if.no.acl.found=false
>> > >
>> > > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>> > >
>> > > super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
>> > >
>> > > *Configure producer: producer.properties*
>> > >
>> > > security.protocol=SSL
>> > >
>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>> > >
>> > > ssl.truststore.password=client-truststore-password
>> > >
>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>> > >
>> > > ssl.keystore.password=client-keystore-password
>> > >
>> > > ssl.key.password=client-key-password
>> > >
>> > >
>> > > *Configure consumer: consumer.properties*
>> > >
>> > > security.protocol=SSL
>> > >
>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>> > >
>> > > ssl.truststore.password=client-truststore-password
>> > >
>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>> > >
>> > > ssl.keystore.password=client-keystore-password
>> > >
>> > > ssl.key.password=client-key-password
>> > >
>> > > group.id=testgroup
>> > >
>> > > *Create topic:*
>> > >
>> > > bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
>> > > --replication-factor 1 --partitions 1
>> > >
>> > >
>> > > *Configure ACLs:*
>> > >
>> > > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:
>> > 2181
>> > > --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK"
>> --producer
>> > > --topic testtopic
>> > >
>> > > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:
>> > 2181
>> > > --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK"
>> --consumer
>> > > --topic testtopic --gro

Re: Kafka Authorization and ACLs Broken

2017-05-24 Thread Raghav
Mike

I am not using jaas file. I literally took the config Rajini gave in the
previous email and it worked for me. I am using ssl Kafka with ACLs. I am
not using Kerberos.

Thanks.

On Wed, May 24, 2017 at 11:29 AM, Mike Marzo 
wrote:

> I'm also having issues getting acls to work.  Out of interest, are you
> starting ur brokers with a jaas file, if so do u mind sharing the client
> and server side jaas entries so I can validate what I'm doing.
>
> mike marzo
> 908 209-4484
>
> On May 24, 2017 10:54 AM, "Raghav"  wrote:
>
> > Hi Rajini
> >
> > Thank you very much. It perfectly works.
> >
> > I think in my setup I was trying to use a CA (certificate authority) to
> > sign the certificates from client and server, and then adding it to trust
> > store and keystore. I think in that process, I may have messed
> something. I
> > will try above config with a CA to sign certificates. Hopefully that
> would
> > work too.
> >
> > Thanks a lot again.
> >
> > Raghav
> >
> >
> >
> >
> > On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram  >
> > wrote:
> >
> > > Raghav/Darshan,
> > >
> > > Can you try these steps on a clean installation of Kafka? It works for
> > me,
> > > so hopefully it will work for you. And then you can adapt to your
> > scenario.
> > >
> > > *Create keystores and truststores:*
> > >
> > > keytool -genkey -alias kafka -keystore server.keystore.jks -dname
> > > "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
> > > -keypass server-key-password
> > >
> > > keytool -exportcert -file server-cert-file -keystore
> server.keystore.jks
> > > -alias kafka -storepass server-keystore-password
> > >
> > > keytool -importcert -file server-cert-file -keystore
> > server.truststore.jks
> > > -alias kafka -storepass server-truststore-password -noprompt
> > >
> > > keytool -importcert -file server-cert-file -keystore
> > client.truststore.jks
> > > -alias kafkaclient -storepass client-truststore-password -noprompt
> > >
> > >
> > > keytool -genkey -alias kafkaclient -keystore client.keystore.jks -dname
> > > "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
> > > -keypass client-key-password
> > >
> > > keytool -exportcert -file client-cert-file -keystore
> client.keystore.jks
> > > -alias kafkaclient -storepass client-keystore-password
> > >
> > > keytool -importcert -file client-cert-file -keystore
> > server.truststore.jks
> > > -alias kafkaclient -storepass server-truststore-password -noprompt
> > >
> > > *Configure broker: Add these lines at the end of your
> server.properties*
> > >
> > > listeners=SSL://:9093
> > >
> > > advertised.listeners=SSL://127.0.0.1:9093
> > >
> > > ssl.keystore.location=/tmp/acl/server.keystore.jks
> > >
> > > ssl.keystore.password=server-keystore-password
> > >
> > > ssl.key.password=server-key-password
> > >
> > > ssl.truststore.location=/tmp/acl/server.truststore.jks
> > >
> > > ssl.truststore.password=server-truststore-password
> > >
> > > security.inter.broker.protocol=SSL
> > >
> > > security.protocol=SSL
> > >
> > > ssl.client.auth=required
> > >
> > > allow.everyone.if.no.acl.found=false
> > >
> > > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> > >
> > > super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
> > >
> > > *Configure producer: producer.properties*
> > >
> > > security.protocol=SSL
> > >
> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
> > >
> > > ssl.truststore.password=client-truststore-password
> > >
> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
> > >
> > > ssl.keystore.password=client-keystore-password
> > >
> > > ssl.key.password=client-key-password
> > >
> > >
> > > *Configure consumer: consumer.properties*
> > >
> > > security.protocol=SSL
> > >
> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
> > >
> > > ssl.truststore.password=client-truststore-password
> > >
> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
> > >
> > > ssl.keystore.password=client-keystore-password
> > >
> > > ssl.key.password=client-key-password
> > >
> > > group.id=testgroup
> > >
> > > *Create topic:*
> > >
> > > bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
> > > --replication-factor 1 --partitions 1
> > >
> > >
> > > *Configure ACLs:*
> > >
> > > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:
> > 2181
> > > --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK"
> --producer
> > > --topic testtopic
> > >
> > > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:
> > 2181
> > > --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK"
> --consumer
> > > --topic testtopic --group testgroup
> > >
> > >
> > > *Run console producer and type in some messages:*
> > >
> > > bin/kafka-console-producer.sh  --producer.config
> > > /tmp/acl/producer.properties --topic testtopic --broker-list
> > > 127.0.0.1:9093
> > >
> > >
> > > *Run console consumer, you should see messages from above:*
> > >
> > > bin/kafka-console-consumer.sh  --co

SASL and SSL

2017-05-24 Thread Waleed Fateem
Hello!

I'm not very clear on the behavior that we should expect when we configure
Kafka to use the protocol SASL_SSL.

Are SASL and SSL mutually exclusive here, or can I authenticate with SASL and
use SSL for encryption?

If the latter is true, then is it correct to assume that encryption will
take place using SSL if a client authenticates using a Kerberos ticket so
long as they have a trust store configured?

Thank you.

Waleed


Re: Kafka Streams and AWS IOPS Credits

2017-05-24 Thread Guozhang Wang
Thanks for sharing Ian!

Guozhang

On Wed, May 24, 2017 at 3:49 AM, Damian Guy  wrote:

> Thanks for reporting back Ian. Very useful information.
>
> Cheers,
> Damian
>
> On Wed, 24 May 2017 at 10:40 Ian Duffy  wrote:
>
> > Hi All,
> >
> > In the past, we experienced lots of problems with running Kafka Stream
> > Applications on AWS.
> >
> > We've seen issues with state locking, memory spiking to 100% and the
> > instance dying, very
> > slow startup on pulling down initial rocksdbs and so on
> >
> > Today we realised that the instances were experiencing such high
> throughput
> > that the IOPS
> > credits on our EBSs storing the rocks EBS were completely used up
> resulting
> > in very slow IO
> > and most of our issues.
> >
> > Sharing this as others might have the same problems.
> >
> > We were running on EBSs of 100GB which give 300 "provisioned" IOPS (3
> > iops per 1gb).
> > Most cost effective solution for us is to just increase the EBS size to
> get
> > more "provisioned"
> > IOPS. Alternative solutions are to use i3.* instances or IO1 EBS volumes
> > but they are expensive.
> >
> > Hope this helps somebody else,
> > Ian.
> >
>



-- 
-- Guozhang


Disks full after upgrading kafka version : 0.8.1.1 to 0.10.0.0

2017-05-24 Thread Milind Vaidya
Within 24 hours the brokers started getting killed because the disks were full.

The retention period is 48 hours, and with 0.8 the disks only used to fill to ~65%.

What is going wrong here?

This is a production system. I am reducing the retention to 24 hours for the
time being.
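
For reference, retention can be capped by time and/or size per broker in
server.properties; the size cap acts as a safety net if the time-based setting
alone lets the disks fill. A minimal sketch with hypothetical values:

log.retention.hours=24
# optional per-partition size cap (here 100 GB) in addition to the time limit
log.retention.bytes=107374182400
log.retention.check.interval.ms=300000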


Re: Kafka Authorization and ACLs Broken

2017-05-24 Thread Mike Marzo
I'm also having issues getting acls to work.  Out of interest, are you
starting ur brokers with a jaas file, if so do u mind sharing the client
and server side jaas entries so I can validate what I'm doing.

mike marzo
908 209-4484

On May 24, 2017 10:54 AM, "Raghav"  wrote:

> Hi Rajini
>
> Thank you very much. It perfectly works.
>
> I think in my setup I was trying to use a CA (certificate authority) to
> sign the certificates from client and server, and then adding it to trust
> store and keystore. I think in that process, I may have messed something up. I
> will try above config with a CA to sign certificates. Hopefully that would
> work too.
>
> Thanks a lot again.
>
> Raghav
>
>
>
>
> On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram 
> wrote:
>
> > Raghav/Darshan,
> >
> > Can you try these steps on a clean installation of Kafka? It works for
> me,
> > so hopefully it will work for you. And then you can adapt to your
> scenario.
> >
> > *Create keystores and truststores:*
> >
> > keytool -genkey -alias kafka -keystore server.keystore.jks -dname
> > "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
> > -keypass server-key-password
> >
> > keytool -exportcert -file server-cert-file -keystore server.keystore.jks
> > -alias kafka -storepass server-keystore-password
> >
> > keytool -importcert -file server-cert-file -keystore
> server.truststore.jks
> > -alias kafka -storepass server-truststore-password -noprompt
> >
> > keytool -importcert -file server-cert-file -keystore
> client.truststore.jks
> > -alias kafkaclient -storepass client-truststore-password -noprompt
> >
> >
> > keytool -genkey -alias kafkaclient -keystore client.keystore.jks -dname
> > "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
> > -keypass client-key-password
> >
> > keytool -exportcert -file client-cert-file -keystore client.keystore.jks
> > -alias kafkaclient -storepass client-keystore-password
> >
> > keytool -importcert -file client-cert-file -keystore
> server.truststore.jks
> > -alias kafkaclient -storepass server-truststore-password -noprompt
> >
> > *Configure broker: Add these lines at the end of your server.properties*
> >
> > listeners=SSL://:9093
> >
> > advertised.listeners=SSL://127.0.0.1:9093
> >
> > ssl.keystore.location=/tmp/acl/server.keystore.jks
> >
> > ssl.keystore.password=server-keystore-password
> >
> > ssl.key.password=server-key-password
> >
> > ssl.truststore.location=/tmp/acl/server.truststore.jks
> >
> > ssl.truststore.password=server-truststore-password
> >
> > security.inter.broker.protocol=SSL
> >
> > security.protocol=SSL
> >
> > ssl.client.auth=required
> >
> > allow.everyone.if.no.acl.found=false
> >
> > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> >
> > super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
> >
> > *Configure producer: producer.properties*
> >
> > security.protocol=SSL
> >
> > ssl.truststore.location=/tmp/acl/client.truststore.jks
> >
> > ssl.truststore.password=client-truststore-password
> >
> > ssl.keystore.location=/tmp/acl/client.keystore.jks
> >
> > ssl.keystore.password=client-keystore-password
> >
> > ssl.key.password=client-key-password
> >
> >
> > *Configure consumer: consumer.properties*
> >
> > security.protocol=SSL
> >
> > ssl.truststore.location=/tmp/acl/client.truststore.jks
> >
> > ssl.truststore.password=client-truststore-password
> >
> > ssl.keystore.location=/tmp/acl/client.keystore.jks
> >
> > ssl.keystore.password=client-keystore-password
> >
> > ssl.key.password=client-key-password
> >
> > group.id=testgroup
> >
> > *Create topic:*
> >
> > bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
> > --replication-factor 1 --partitions 1
> >
> >
> > *Configure ACLs:*
> >
> > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:
> 2181
> > --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --producer
> > --topic testtopic
> >
> > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:
> 2181
> > --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --consumer
> > --topic testtopic --group testgroup
> >
> >
> > *Run console producer and type in some messages:*
> >
> > bin/kafka-console-producer.sh  --producer.config
> > /tmp/acl/producer.properties --topic testtopic --broker-list
> > 127.0.0.1:9093
> >
> >
> > *Run console consumer, you should see messages from above:*
> >
> > bin/kafka-console-consumer.sh  --consumer.config
> > /tmp/acl/consumer.properties --topic testtopic --bootstrap-server
> > 127.0.0.1:9093 --from-beginning
> >
> >
> >
> > On Tue, May 23, 2017 at 12:57 PM, Raghav  wrote:
> >
> >> Darshan,
> >>
> >> I have not yet successfully gotten the ACLs to work in Kafka. I am still
> >> looking for help. I will update this email thread if I do find. In case
> >> you
> >> get it working, please let me know.
> >>
> >> Thanks.
> >>
> >> R
> >>
> >> On Tue, May 23, 2017 at 8:49 AM, Darshan Purandare <
> >> purandare.dars...@gmail.com> wrote:
> >>
> >>

Difference observed during kafka-producer-perf-test and JUnit producer test

2017-05-24 Thread Mohammed Manna
Hi,

I ran a JUnit test where I delivered a ~5 MB file as the payload to a Kafka
cluster (3 brokers, 3 ZooKeeper nodes). These are all set up on my laptop. My
test config for the producer is the following:

max.request.size=5242880
> batch.size=8192
> key.serializer=org.apache.kafka.common.serialization.StringSerializer
> value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
> compression.type=gzip


Before I started the JUnit test I started all my Kafka and ZooKeeper nodes,
so the leaders and followers were healthy. After this I started a Kafka
consumer from the command line with the following consumer.properties:

zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
> group.id=test-consumer-group
> zookeeper.connection.timeout=6000


For each broker I have the following config (broker.id and log dir changed
accordingly):

> #port numbers will be 9093 and 9094 for other 2 servers
> listeners=PLAINTEXT://localhost:9092
> broker.id=0
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=9
> log.dirs=/tmp/kafka-logs
> #the below partition info will be overwritten when I create topic with
> partition and
> #replication factor
> num.partitions=1
> compression.type=gzip
> delete.topic.enable=true


For each zookeeper (in my ensemble), I have the following (and respective
myid files are setup in the zookeeper logDir)

> dataDir=/tmp/zookeeper
> clientPort=2181
> autopurge.purgeInterval=120
> tickTime=5000
> initLimit=5
> syncLimit=2
> server.1=localhost:2666:3666
> server.2=localhost:2667:3667
> server.3=localhost:2668:3668


When I run my JUnit test, I can see that my kafka-console-consumer is
displaying the file contents correctly, no issues. But when I run
kafka-producer-perf-test using the same producer config, I am getting the
following error:

>
> >kafka-producer-perf-test.bat --topic testtopic11 --num-records 10
> --record-size 500 --throughput -1 --producer.config
> C:/kafka/config/producer_metrics.properties
> org.apache.kafka.common.errors.RecordTooLargeException The request
> included a message larger than the max message size the server will accept.


Yes, I can see from the Kafka documentation the definition of
message.max.bytes (default ~1 MB):


The maximum size of a message that the server can receive. It is important
that this property be in sync with the maximum fetch size your consumers
use or else an unruly producer will be able to publish messages too large
for consumers to consume.
If that's the case, how was it possible that I managed to receive the ~5 MB
file (transferred by the JUnit test) on kafka-console-consumer, but cannot do
the test when I specify the record number and record size? Is there a
configuration that is applied differently for this performance test?
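
For reference, the settings that govern maximum record size sit on the broker,
the producer, and the consumer; a minimal sketch with hypothetical values
sized for ~5 MB records (only what is actually configured on the broker
applies to the perf tool as well):

# broker (server.properties)
message.max.bytes=5242880
replica.fetch.max.bytes=5242880

# producer
max.request.size=5242880

# consumer
max.partition.fetch.bytes=5242880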


Upgrade from 0.8.2-beta to 0.10.2.1 -- Configuration changes

2017-05-24 Thread Fernando Vega
I need some help regarding Kafka.
I'm moving from 0.8.2-beta to 0.10.2.1, but I cannot get the MirrorMakers
to come up.

We are using the same configs from the old version on the new one; here is an
example of the config files:


*For Producer*

# Producer
# hkg1
zookeeper.connect=localhost:2181
metadata.broker.list=server1:9092,server2:9092
compression.codec=gzip
producer.type=async
queue.enqueue.timeout.ms=-1
request.required.acks=0
serializer.class=kafka.serializer.DefaultEncoder

*For Consumers*

#
# Cluster: repl
# Topic list(goes into command line): topicA,topicB
zookeeper.connect=localhost:2181
metadata.broker.list=server1:9092,server2:9092
zookeeper.connection.timeout.ms=3
zookeeper.session.timeout.ms=3
group.id=testing_fernando
auto.commit.interval.ms=6
partition.assignment.strategy=roundrobin
rebalance.backoff.ms=1
rebalance.max.retries=4
socket.receive.buffer.bytes=262144
zookeeper.sync.time.ms=6000

*Server.properties:*

###
### This file is managed by Puppet.
###

# See http://kafka.apache.org/documentation.html#brokerconfigs for
default values.

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=777

# The port the socket server listens on
port=9092

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-log-test

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

zookeeper.connect=localhost:2181/testing_replication/kafka

# Additional configuration options may follow here
auto.leader.rebalance.enable=true
delete.topic.enable=true
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
default.replication.factor=2
auto.create.topics.enable=true
num.partitions=1
num.network.threads=8
num.io.threads=40
log.retention.hours=1
log.roll.hours=1
num.replica.fetchers=8
zookeeper.connection.timeout.ms=3
zookeeper.session.timeout.ms=3
inter.broker.protocol.version=0.8.2
log.message.format.version=0.8.2

I know some of the keys and values have changed from one version to the other,
so I just want to get the equivalent keys and values to replicate this config
and make it work with our test.
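
For orientation, MirrorMaker in 0.10.x uses the new producer (and can use the
new consumer), so the old ZooKeeper-based keys above have bootstrap-based
equivalents. A rough, non-exhaustive sketch of the mapping (not a drop-in
MirrorMaker config), reusing the hostnames from the configs above:

# producer (new API)
bootstrap.servers=server1:9092,server2:9092   # replaces metadata.broker.list
compression.type=gzip                         # replaces compression.codec
acks=0                                        # replaces request.required.acks

# consumer (new API)
bootstrap.servers=server1:9092,server2:9092   # replaces zookeeper.connect
group.id=testing_fernando
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor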


Regards


*Fernando Vega*
Sr. Operations Engineer
*cell* (415) 810-0242
901 Marshall St, Suite 200, Redwood City, CA 94063


turn.com    |   @TurnPlatform


This message is Turn Confidential, except for information included that is
already available to the public. If this message was sent to you
accidentally, please delete it.


Re: Loss of Messages

2017-05-24 Thread David Garcia
What is your in-sync timeout set to?

-David

On 5/24/17, 5:57 AM, "Vinayak Sharma"  wrote:

Hi,

I am running Kafka as a 2 node cluster.
When I am scaling up and down 1 kafka broker I am experiencing loss of
messages at consumer end during reassignment of partitions.
Do you know what might be the cause or how we can fix this issue at our end?
I tried to search about this issue but couldn't find anything.

Regards,
Vinayak.





How to list all consumer-groups & their offset using confluent-kafka-python or librdkafka?

2017-05-24 Thread Yuxiang Mai
Hi, all

We have upgraded Kafka from 0.8.2 to 0.10.2. In Kafka 0.8.2, we stored the
offset for each consumer group in ZooKeeper, and we used a Python API to read
each consumer group's offset from ZK to monitor the lag for each topic.

After upgrading to Kafka 0.10.2, we plan to store consumer-group offsets in
Kafka itself, but we still need a Python API to monitor the consumer offsets.
We need something like the kafka-consumer-groups.sh tool, in Python, to list
all consumer groups and show their offsets one by one.

We plan to use confluent-kafka-python/librdkafka as our option, but we cannot
find an API to list all consumer groups. For the offset of each consumer
group, we found that the position method can be used.

Here is our code:

consumer = confluent_kafka.Consumer(conf)
consumer.subscribe(['xxx'])
p = confluent_kafka.TopicPartition("xxx", 1)
print consumer.position([p])

result:

[TopicPartition{topic=elk_app,partition=1,offset=-1001,error=None}]


It seems it cannot show the correct offset.

Did I do anything wrong?

In sum, here are my 2 questions:

1. How can I list all consumer groups for a topic using the
confluent-kafka-python/librdkafka API?
2. How can I get the current offset for a consumer group?
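
Regarding question 2, a minimal sketch with confluent-kafka-python is below
(hypothetical broker address; topic and partition taken from the snippet
above). Note that position() reflects the consumer's own local fetch position
and is -1001 (invalid) before anything has been fetched, which matches the
output shown; the group's committed offset is read with committed() instead:

from confluent_kafka import Consumer, TopicPartition

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'group-to-monitor'}
consumer = Consumer(conf)

tp = TopicPartition('xxx', 1)
committed = consumer.committed([tp], timeout=10)             # committed group offset
low, high = consumer.get_watermark_offsets(tp, timeout=10)   # log start/end offsets
for p in committed:
    print(p.topic, p.partition, 'committed:', p.offset, 'lag:', high - p.offset)

consumer.close()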


Thanks a lot.



-- 
Yuxiang Mai
Sun Yat-Sen University
State Key Lab of Optoelectronic
Materials and Technologies


Re: Streams error handling

2017-05-24 Thread Eno Thereska
Thanks Mike,

Most of the JIRAs there are internal cleanups, but for the user-facing ones 
we're planning on creating a wiki and collecting feedback like yours, and a 
KIP, so stay tuned (your current feedback already noted, thanks).

Eno
> On 24 May 2017, at 15:35, Mike Gould  wrote:
> 
> Watching it with interest thanks
> 
> Not sure where appropriate to add suggestions but I'd vote for exceptions
> being passed along the stream in something like a hidden Either wrapper.
> Most of the KStream methods would ignore this but new onException() or
> similar methods would be able to examine the error with the key/value prior
> to the error and handle it - possibly by replacing the message, sending a
> message to a new stream, or even putting it back on the original stream for
> retry.
> 
> Regards
> MikeG
> 
> On Wed, 24 May 2017 at 10:09, Eno Thereska  > wrote:
> 
>> Just a heads up that we're tracking this and other improvements in
>> exception handling at https://issues.apache.org/jira/browse/KAFKA-5156.
>> 
>> Thanks
>> Eno
>>> On 23 May 2017, at 17:31, Mike Gould  wrote:
>>> 
>>> That's great for the value but not the key
>>> 
>>> On Thu, 13 Apr 2017 at 18:27, Sachin Mittal  wrote:
>>> 
 We are also catching the exception in serde and returning null and then
 filtering out null values downstream so as they are not included.
 
 Thanks
 Sachin
 
 
 On Thu, Apr 13, 2017 at 9:13 PM, Mike Gould 
>> wrote:
 
> Great to know I've not gone off in the wrong direction
> Thanks
> 
> On Thu, 13 Apr 2017 at 16:34, Matthias J. Sax 
> wrote:
> 
>> Mike,
>> 
>> thanks for your feedback. You are absolutely right that Streams API
 does
>> not have great support for this atm. And it's very valuable that you
>> report this (you are not the first person). It helps us prioritizing
>> :)
>> 
>> For now, there is no better solution as the one you described in your
>> email, but its on our roadmap to improve the API -- and its priority
 got
>> just increase by your request.
>> 
>> I am sorry, that I can't give you a better answer right now :(
>> 
>> 
>> -Matthias
>> 
>> 
>> On 4/13/17 8:16 AM, Mike Gould wrote:
>>> Hi
>>> Are there any better error handling options for Kafka streams in
 java.
>>> 
>>> Any errors in the serdes will break the stream.  The suggested
>>> implementation is to use the byte[] serde and do the deserialisation
> in a
>>> map operation.  However this isn't ideal either as there's no great
 way
>> to
>>> handle exceptions.
>>> My current tactics are to use flatMap in place of map everywhere and
>> return
>>> emptySet on error. Unfortunately this means the error has to be
 handled
>>> directly in the function where it happened and can only be handled
 as a
>>> side effect.
>>> 
>>> It seems to me that this could be done better. Maybe the *Mapper
>> interfaces
>>> could allow specific checked exceptions. These could be handled by
>> specific
>>> downstream KStream.mapException() steps which might e.g. Put an error
>>> response on another stream branch.
>>> Alternatively could it be made easier to return something like an
> Either
>>> from the Mappers with the addition of a few extra mapError or mapLeft
>>> mapRight methods on KStream?
>>> 
>>> Unless there's a better error handling pattern which I've entirely
>> missed?
>>> 
>>> Thanks
>>> MIkeG
>>> 
>> 
>> --
> - MikeG
> http://en.wikipedia.org/wiki/Common_misconceptions
> 
> 
 
>>> --
>>> - MikeG
>>> http://en.wikipedia.org/wiki/Common_misconceptions
>>> 
>> 
>> --
> - MikeG
> http://en.wikipedia.org/wiki/Common_misconceptions 
> 
>  >
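
For context, a rough sketch of the workaround discussed in this thread: a
delegating deserializer that returns null instead of throwing, so bad records
can be dropped downstream with something like stream.filter((k, v) -> v != null).
The class name is hypothetical, not an API the library provides:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Wraps a real deserializer and swallows deserialization errors,
// returning null so the stream can filter the record out downstream.
public class TolerantDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;

    public TolerantDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return inner.deserialize(topic, data);
        } catch (Exception e) {
            // the error can only be handled here, as a side effect (e.g. log it)
            return null;
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}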



How to stop the producer if I use the incorrect bootstrap.servers

2017-05-24 Thread 苏卫泉
Hello,
I am new to Kafka and I found that if I use an incorrect bootstrap.servers
in the producer, the Kafka client seems to never give up: the console shows
the exception below, but no exception is thrown to my code:
  
  Connection with /10.201.0.119 disconnected
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
at 
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:745)
[DEBUG] 2017-05-24 14:12:50,906 
method:org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:463)
Node -1 disconnected.

Could you tell me how to stop the producer when the connection times out?
Thank you in advance:)
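
One common approach (a minimal sketch, with a hypothetical topic and the
broker address from the log above) is to bound how long send() may block
waiting for metadata via max.block.ms, treat the resulting TimeoutException
as fatal, and close the producer:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class GiveUpQuicklyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.201.0.119:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "10000"); // fail send() after 10s instead of blocking indefinitely

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // with no reachable broker this throws once max.block.ms expires
            producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
        } catch (Exception e) {
            System.err.println("Giving up: " + e);
        } finally {
            producer.close(); // stops the background sender thread
        }
    }
}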


Re: Data loss after a Kafka Broker restart scenario

2017-05-24 Thread Jun Rao
Hi, Fathima,

Yes, the most efficient way to verify if a message is sent successfully is
through the producer callback. You can take a look at PrintInfoCallback in
org.apache.kafka.tools.VerifiableProducer as an example. Our system tests
use that to verify if any data loss has occurred.

Thanks,

Jun
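
For illustration, a minimal sketch of the callback approach (assuming an
existing org.apache.kafka.clients.producer.KafkaProducer<String, String>
named producer and a hypothetical topic); the callback receives a non-null
exception exactly when the send ultimately fails:

producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // not acknowledged by the broker: count it as lost, log, or retry here
            exception.printStackTrace();
        } else {
            // acknowledged at metadata.topic(), metadata.partition(), metadata.offset()
        }
    }
});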

On Mon, May 22, 2017 at 2:48 AM, Fathima Amara  wrote:

> Hi Jun,
>
> Do you mean by using CallBack mechanism? Since I am new to kafka would you
> mind directing me how to do it if it's not to be done using CallBack?
>
> Fathima.
>


Loss of Messages

2017-05-24 Thread Vinayak Sharma
Hi,

I am running Kafka as a 2 node cluster.
When I am scaling up and down 1 kafka broker I am experiencing loss of
messages at consumer end during reassignment of partitions.
Do you know what might be the cause or how we can fix this issue at our end?
I tried to search about this issue but couldn't find anything.

Regards,
Vinayak.



Re: Kafka Authorization and ACLs Broken

2017-05-24 Thread Raghav
Hi Rajini

Thank you very much. It works perfectly.

I think in my setup I was trying to use a CA (certificate authority) to
sign the certificates from client and server, and then adding it to the trust
store and keystore. I think in that process, I may have messed something up. I
will try the above config with a CA to sign the certificates. Hopefully that
would work too.

Thanks a lot again.

Raghav




On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram 
wrote:

> Raghav/Darshan,
>
> Can you try these steps on a clean installation of Kafka? It works for me,
> so hopefully it will work for you. And then you can adapt to your scenario.
>
> *Create keystores and truststores:*
>
> keytool -genkey -alias kafka -keystore server.keystore.jks -dname
> "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
> -keypass server-key-password
>
> keytool -exportcert -file server-cert-file -keystore server.keystore.jks
> -alias kafka -storepass server-keystore-password
>
> keytool -importcert -file server-cert-file -keystore server.truststore.jks
> -alias kafka -storepass server-truststore-password -noprompt
>
> keytool -importcert -file server-cert-file -keystore client.truststore.jks
> -alias kafkaclient -storepass client-truststore-password -noprompt
>
>
> keytool -genkey -alias kafkaclient -keystore client.keystore.jks -dname
> "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
> -keypass client-key-password
>
> keytool -exportcert -file client-cert-file -keystore client.keystore.jks
> -alias kafkaclient -storepass client-keystore-password
>
> keytool -importcert -file client-cert-file -keystore server.truststore.jks
> -alias kafkaclient -storepass server-truststore-password -noprompt
>
> *Configure broker: Add these lines at the end of your server.properties*
>
> listeners=SSL://:9093
>
> advertised.listeners=SSL://127.0.0.1:9093
>
> ssl.keystore.location=/tmp/acl/server.keystore.jks
>
> ssl.keystore.password=server-keystore-password
>
> ssl.key.password=server-key-password
>
> ssl.truststore.location=/tmp/acl/server.truststore.jks
>
> ssl.truststore.password=server-truststore-password
>
> security.inter.broker.protocol=SSL
>
> security.protocol=SSL
>
> ssl.client.auth=required
>
> allow.everyone.if.no.acl.found=false
>
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>
> super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
>
> *Configure producer: producer.properties*
>
> security.protocol=SSL
>
> ssl.truststore.location=/tmp/acl/client.truststore.jks
>
> ssl.truststore.password=client-truststore-password
>
> ssl.keystore.location=/tmp/acl/client.keystore.jks
>
> ssl.keystore.password=client-keystore-password
>
> ssl.key.password=client-key-password
>
>
> *Configure consumer: consumer.properties*
>
> security.protocol=SSL
>
> ssl.truststore.location=/tmp/acl/client.truststore.jks
>
> ssl.truststore.password=client-truststore-password
>
> ssl.keystore.location=/tmp/acl/client.keystore.jks
>
> ssl.keystore.password=client-keystore-password
>
> ssl.key.password=client-key-password
>
> group.id=testgroup
>
> *Create topic:*
>
> bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
> --replication-factor 1 --partitions 1
>
>
> *Configure ACLs:*
>
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
> --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --producer
> --topic testtopic
>
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
> --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --consumer
> --topic testtopic --group testgroup
>
>
> *Run console producer and type in some messages:*
>
> bin/kafka-console-producer.sh  --producer.config
> /tmp/acl/producer.properties --topic testtopic --broker-list
> 127.0.0.1:9093
>
>
> *Run console consumer, you should see messages from above:*
>
> bin/kafka-console-consumer.sh  --consumer.config
> /tmp/acl/consumer.properties --topic testtopic --bootstrap-server
> 127.0.0.1:9093 --from-beginning
>
>
>
> On Tue, May 23, 2017 at 12:57 PM, Raghav  wrote:
>
>> Darshan,
>>
>> I have not yet successfully gotten the ACLs to work in Kafka. I am still
>> looking for help. I will update this email thread if I do find. In case
>> you
>> get it working, please let me know.
>>
>> Thanks.
>>
>> R
>>
>> On Tue, May 23, 2017 at 8:49 AM, Darshan Purandare <
>> purandare.dars...@gmail.com> wrote:
>>
>> > Raghav
>> >
>> > I saw few posts of yours around Kafka ACLs and the problems. I have seen
>> > similar issues where Writer has not been able to write to any topic. I
>> have
>> > seen "leader not available" and sometimes "unknown topic or partition",
>> and
>> > "topic_authorization_failed" error.
>> >
>> > Let me know if you find a valid config that works.
>> >
>> > Thanks.
>> >
>> >
>> >
>> > On Tue, May 23, 2017 at 8:44 AM, Raghav  wrote:
>> >
>> >> Hello Kafka Users
>> >>
>> >> I am a new Kafka user and trying to make Kafka SSL work with
>> Authorization
>> >> and ACLs. I followed posts f

Re: Streams error handling

2017-05-24 Thread Mike Gould
Watching it with interest thanks

Not sure where appropriate to add suggestions but I'd vote for exceptions
being passed along the stream in something like a hidden Either wrapper.
Most of the KStream methods would ignore this but new onException() or
similar methods would be able to examine the error with the key/value prior
to the error and handle it - possibly by replacing the message, sending a
message to a new stream, or even putting it back on the original stream for
retry.

Regards
MikeG

On Wed, 24 May 2017 at 10:09, Eno Thereska  wrote:

> Just a heads up that we're tracking this and other improvements in
> exception handling at https://issues.apache.org/jira/browse/KAFKA-5156.
>
> Thanks
> Eno
> > On 23 May 2017, at 17:31, Mike Gould  wrote:
> >
> > That's great for the value but not the key
> >
> > On Thu, 13 Apr 2017 at 18:27, Sachin Mittal  wrote:
> >
> >> We are also catching the exception in serde and returning null and then
> >> filtering out null values downstream so as they are not included.
> >>
> >> Thanks
> >> Sachin
> >>
> >>
> >> On Thu, Apr 13, 2017 at 9:13 PM, Mike Gould 
> wrote:
> >>
> >>> Great to know I've not gone off in the wrong direction
> >>> Thanks
> >>>
> >>> On Thu, 13 Apr 2017 at 16:34, Matthias J. Sax 
> >>> wrote:
> >>>
>  Mike,
> 
>  thanks for your feedback. You are absolutely right that Streams API
> >> does
>  not have great support for this atm. And it's very valuable that you
>  report this (you are not the first person). It helps us prioritizing
> :)
> 
>  For now, there is no better solution as the one you described in your
>  email, but its on our roadmap to improve the API -- and its priority
> >> got
>  just increase by your request.
> 
>  I am sorry, that I can't give you a better answer right now :(
> 
> 
>  -Matthias
> 
> 
>  On 4/13/17 8:16 AM, Mike Gould wrote:
> > Hi
> > Are there any better error handling options for Kafka streams in
> >> java.
> >
> > Any errors in the serdes will break the stream.  The suggested
> > implementation is to use the byte[] serde and do the deserialisation
> >>> in a
> > map operation.  However this isn't ideal either as there's no great
> >> way
>  to
> > handle exceptions.
> > My current tactics are to use flatMap in place of map everywhere and
>  return
> > emptySet on error. Unfortunately this means the error has to be
> >> handled
> > directly in the function where it happened and can only be handled
> >> as a
> > side effect.
> >
> > It seems to me that this could be done better. Maybe the *Mapper
>  interfaces
> > could allow specific checked exceptions. These could be handled by
>  specific
> > downstream KStream.mapException() steps which might e.g. Put an error
> > response on another stream branch.
> > Alternatively could it be made easier to return something like an
> >>> Either
> > from the Mappers with the addition of a few extra mapError or mapLeft
> > mapRight methods on KStream?
> >
> > Unless there's a better error handling pattern which I've entirely
>  missed?
> >
> > Thanks
> > MIkeG
> >
> 
>  --
> >>> - MikeG
> >>> http://en.wikipedia.org/wiki/Common_misconceptions
> >>> 
> >>>
> >>
> > --
> > - MikeG
> > http://en.wikipedia.org/wiki/Common_misconceptions
> > 
>
> --
 - MikeG
http://en.wikipedia.org/wiki/Common_misconceptions



Re: Kafka Authorization and ACLs Broken

2017-05-24 Thread Raghav
Rajini

I will try and report to you shortly. Many thanks.

Raghav

On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram 
wrote:

> Raghav/Darshan,
>
> Can you try these steps on a clean installation of Kafka? It works for me,
> so hopefully it will work for you. And then you can adapt to your scenario.
>
> *Create keystores and truststores:*
>
> keytool -genkey -alias kafka -keystore server.keystore.jks -dname
> "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
> -keypass server-key-password
>
> keytool -exportcert -file server-cert-file -keystore server.keystore.jks
> -alias kafka -storepass server-keystore-password
>
> keytool -importcert -file server-cert-file -keystore server.truststore.jks
> -alias kafka -storepass server-truststore-password -noprompt
>
> keytool -importcert -file server-cert-file -keystore client.truststore.jks
> -alias kafkaclient -storepass client-truststore-password -noprompt
>
>
> keytool -genkey -alias kafkaclient -keystore client.keystore.jks -dname
> "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
> -keypass client-key-password
>
> keytool -exportcert -file client-cert-file -keystore client.keystore.jks
> -alias kafkaclient -storepass client-keystore-password
>
> keytool -importcert -file client-cert-file -keystore server.truststore.jks
> -alias kafkaclient -storepass server-truststore-password -noprompt
>
> *Configure broker: Add these lines at the end of your server.properties*
>
> listeners=SSL://:9093
>
> advertised.listeners=SSL://127.0.0.1:9093
>
> ssl.keystore.location=/tmp/acl/server.keystore.jks
>
> ssl.keystore.password=server-keystore-password
>
> ssl.key.password=server-key-password
>
> ssl.truststore.location=/tmp/acl/server.truststore.jks
>
> ssl.truststore.password=server-truststore-password
>
> security.inter.broker.protocol=SSL
>
> security.protocol=SSL
>
> ssl.client.auth=required
>
> allow.everyone.if.no.acl.found=false
>
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>
> super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
>
> *Configure producer: producer.properties*
>
> security.protocol=SSL
>
> ssl.truststore.location=/tmp/acl/client.truststore.jks
>
> ssl.truststore.password=client-truststore-password
>
> ssl.keystore.location=/tmp/acl/client.keystore.jks
>
> ssl.keystore.password=client-keystore-password
>
> ssl.key.password=client-key-password
>
>
> *Configure consumer: consumer.properties*
>
> security.protocol=SSL
>
> ssl.truststore.location=/tmp/acl/client.truststore.jks
>
> ssl.truststore.password=client-truststore-password
>
> ssl.keystore.location=/tmp/acl/client.keystore.jks
>
> ssl.keystore.password=client-keystore-password
>
> ssl.key.password=client-key-password
>
> group.id=testgroup
>
> *Create topic:*
>
> bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
> --replication-factor 1 --partitions 1
>
>
> *Configure ACLs:*
>
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
> --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --producer
> --topic testtopic
>
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
> --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --consumer
> --topic testtopic --group testgroup
>
>
> *Run console producer and type in some messages:*
>
> bin/kafka-console-producer.sh  --producer.config
> /tmp/acl/producer.properties --topic testtopic --broker-list
> 127.0.0.1:9093
>
>
> *Run console consumer, you should see messages from above:*
>
> bin/kafka-console-consumer.sh  --consumer.config
> /tmp/acl/consumer.properties --topic testtopic --bootstrap-server
> 127.0.0.1:9093 --from-beginning
>
>
>
> On Tue, May 23, 2017 at 12:57 PM, Raghav  wrote:
>
>> Darshan,
>>
>> I have not yet successfully gotten the ACLs to work in Kafka. I am still
>> looking for help. I will update this email thread if I do find. In case
>> you
>> get it working, please let me know.
>>
>> Thanks.
>>
>> R
>>
>> On Tue, May 23, 2017 at 8:49 AM, Darshan Purandare <
>> purandare.dars...@gmail.com> wrote:
>>
>> > Raghav
>> >
>> > I saw few posts of yours around Kafka ACLs and the problems. I have seen
>> > similar issues where Writer has not been able to write to any topic. I
>> have
>> > seen "leader not available" and sometimes "unknown topic or partition",
>> and
>> > "topic_authorization_failed" error.
>> >
>> > Let me know if you find a valid config that works.
>> >
>> > Thanks.
>> >
>> >
>> >
>> > On Tue, May 23, 2017 at 8:44 AM, Raghav  wrote:
>> >
>> >> Hello Kafka Users
>> >>
>> >> I am a new Kafka user and trying to make Kafka SSL work with
>> Authorization
>> >> and ACLs. I followed posts from Kafka and Confluent docs exactly to the
>> >> point but my producer cannot write to kafka broker. I get
>> >> "LEADER_NOT_FOUND" errors. And even Consumer throws the same errors.
>> >>
>> >> Can someone please share their config which worked with ACLs.
>> >>
>> >> Here is my config. Please help.
>> >>
>> >> server.prope

Re: Kafka Authorization and ACLs Broken

2017-05-24 Thread Rajini Sivaram
Raghav/Darshan,

Can you try these steps on a clean installation of Kafka? It works for me,
so hopefully it will work for you. And then you can adapt to your scenario.

*Create keystores and truststores:*

keytool -genkey -alias kafka -keystore server.keystore.jks -dname
"CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
-keypass server-key-password

keytool -exportcert -file server-cert-file -keystore server.keystore.jks
-alias kafka -storepass server-keystore-password

keytool -importcert -file server-cert-file -keystore server.truststore.jks
-alias kafka -storepass server-truststore-password -noprompt

keytool -importcert -file server-cert-file -keystore client.truststore.jks
-alias kafkaclient -storepass client-truststore-password -noprompt


keytool -genkey -alias kafkaclient -keystore client.keystore.jks -dname
"CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
-keypass client-key-password

keytool -exportcert -file client-cert-file -keystore client.keystore.jks
-alias kafkaclient -storepass client-keystore-password

keytool -importcert -file client-cert-file -keystore server.truststore.jks
-alias kafkaclient -storepass server-truststore-password -noprompt

*Configure broker: Add these lines at the end of your server.properties*

listeners=SSL://:9093

advertised.listeners=SSL://127.0.0.1:9093

ssl.keystore.location=/tmp/acl/server.keystore.jks

ssl.keystore.password=server-keystore-password

ssl.key.password=server-key-password

ssl.truststore.location=/tmp/acl/server.truststore.jks

ssl.truststore.password=server-truststore-password

security.inter.broker.protocol=SSL

security.protocol=SSL

ssl.client.auth=required

allow.everyone.if.no.acl.found=false

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

super.users=User:CN=KafkaBroker,O=Pivotal,C=UK

*Configure producer: producer.properties*

security.protocol=SSL

ssl.truststore.location=/tmp/acl/client.truststore.jks

ssl.truststore.password=client-truststore-password

ssl.keystore.location=/tmp/acl/client.keystore.jks

ssl.keystore.password=client-keystore-password

ssl.key.password=client-key-password


*Configure consumer: consumer.properties*

security.protocol=SSL

ssl.truststore.location=/tmp/acl/client.truststore.jks

ssl.truststore.password=client-truststore-password

ssl.keystore.location=/tmp/acl/client.keystore.jks

ssl.keystore.password=client-keystore-password

ssl.key.password=client-key-password

group.id=testgroup

*Create topic:*

bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
--replication-factor 1 --partitions 1


*Configure ACLs:*

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
--add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --producer
--topic testtopic

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
--add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --consumer
--topic testtopic --group testgroup


*Run console producer and type in some messages:*

bin/kafka-console-producer.sh  --producer.config
/tmp/acl/producer.properties --topic testtopic --broker-list 127.0.0.1:9093


*Run console consumer, you should see messages from above:*

bin/kafka-console-consumer.sh  --consumer.config
/tmp/acl/consumer.properties --topic testtopic --bootstrap-server
127.0.0.1:9093 --from-beginning



On Tue, May 23, 2017 at 12:57 PM, Raghav  wrote:

> Darshan,
>
> I have not yet successfully gotten the ACLs to work in Kafka. I am still
> looking for help. I will update this email thread if I do find. In case you
> get it working, please let me know.
>
> Thanks.
>
> R
>
> On Tue, May 23, 2017 at 8:49 AM, Darshan Purandare <
> purandare.dars...@gmail.com> wrote:
>
> > Raghav
> >
> > I saw few posts of yours around Kafka ACLs and the problems. I have seen
> > similar issues where Writer has not been able to write to any topic. I
> have
> > seen "leader not available" and sometimes "unknown topic or partition",
> and
> > "topic_authorization_failed" error.
> >
> > Let me know if you find a valid config that works.
> >
> > Thanks.
> >
> >
> >
> > On Tue, May 23, 2017 at 8:44 AM, Raghav  wrote:
> >
> >> Hello Kafka Users
> >>
> >> I am a new Kafka user and trying to make Kafka SSL work with
> Authorization
> >> and ACLs. I followed posts from Kafka and Confluent docs exactly to the
> >> point but my producer cannot write to kafka broker. I get
> >> "LEADER_NOT_FOUND" errors. And even Consumer throws the same errors.
> >>
> >> Can someone please share their config which worked with ACLs.
> >>
> >> Here is my config. Please help.
> >>
> >> server.properties config
> >> 
> >> 
> >> broker.id=0
> >> auto.create.topics.enable=true
> >> delete.topic.enable=true
> >>
> >> listeners=PLAINTEXT://kafka1.example.com:9092
> >> ,SSL://kafka1.example.com:9093
> >> 

Kafka Read Data from All Partition Using Key or Timestamp

2017-05-24 Thread SenthilKumar K
Hi All, we have been using Kafka for a use case that involves delivering
real-time raw logs. I have a requirement to fetch data from Kafka by offset.

DataSet Example :
{"access_date":"2017-05-24
13:57:45.044","format":"json","start":"1490296463.031"}
{"access_date":"2017-05-24
13:57:46.044","format":"json","start":"1490296463.031"}
{"access_date":"2017-05-24
13:57:47.044","format":"json","start":"1490296463.031"}
{"access_date":"2017-05-24
13:58:02.042","format":"json","start":"1490296463.031"}

Above JSON data will be stored in Kafka..

Key --> access_date in epoch format
Value --> whole JSON.

Data Access Pattern:
  1) Get me the last 2 minutes of data?
  2) Get me records between 2017-05-24 13:57:42:00 and 2017-05-24
13:57:44:00?

How to achieve this in Kafka ?

I tried using SimpleConsumer, but it expects a partition, and I am not sure
SimpleConsumer would match our requirement...

Appreciate your help!

Cheers,
Senthil

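A rough sketch of the timestamp-based lookup asked about above, using the
consumer's offsetsForTimes API (available if brokers and clients are 0.10.1 or
newer, and assuming the topic's messages carry timestamps, which is the CreateTime
default with the 0.10+ message format). The bootstrap address, group id and topic
name below are made-up placeholders:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class TimeRangeReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder
        props.put("group.id", "time-range-reader");          // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        long endMs = System.currentTimeMillis();
        long startMs = endMs - 2 * 60 * 1000L;                // "last 2 minutes"

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign every partition of the topic.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("rawlogs")) { // placeholder topic
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(partitions);

            // Ask the broker for the first offset with timestamp >= startMs, per partition.
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition tp : partitions) {
                query.put(tp, startMs);
            }
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e
                    : consumer.offsetsForTimes(query).entrySet()) {
                if (e.getValue() != null) {
                    consumer.seek(e.getKey(), e.getValue().offset());
                }
            }

            // Read forward until the record timestamps pass endMs. This cutoff is
            // simplistic: ordering only holds within a partition, so a production
            // version should track the end position per partition.
            boolean done = false;
            while (!done) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                if (records.isEmpty()) {
                    break;
                }
                for (ConsumerRecord<String, String> r : records) {
                    if (r.timestamp() > endMs) {
                        done = true;
                        break;
                    }
                    System.out.println(r.value());
                }
            }
        }
    }
}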

Kafka Lag Monitor

2017-05-24 Thread Ian Duffy
Hi All,

In the past my team was using Burrow by LinkedIn for monitoring consumer
group lag. We discovered that it crashed a lot and we found ourselves
constantly restarting it.

As a fix, a co-worker has created his own solution that I would like to
share with you https://github.com/zalando-incubator/remora

It effectively just exposes the output of kafka-consumer-groups.sh --describe
--group  over HTTP.

Hope this project is useful for you, all feedback is welcome :)
Ian.


Re: Kafka Streams and AWS IOPS Credits

2017-05-24 Thread Damian Guy
Thanks for reporting back Ian. Very useful information.

Cheers,
Damian

On Wed, 24 May 2017 at 10:40 Ian Duffy  wrote:

> Hi All,
>
> In the past, we experienced lots of problems with running Kafka Stream
> Applications on AWS.
>
> We've seen issues with state locking, memory spiking to 100% and the
> instance dying, very
> slow startup on pulling down initial rocksdbs and so on
>
> Today we realised that the instances were experiencing such high throughput
> that the IOPS credits on the EBS volumes storing the RocksDB state were
> completely used up, resulting in very slow IO and most of our issues.
>
> Sharing this as others might have the same problems.
>
> We were running on EBSs of 100GB which give 300 "provisioned" IOPS (3
> iops per 1gb).
> Most cost effective solution for us is to just increase the EBS size to get
> more "provisioned"
> IOPS. Alternative solutions are to use i3.* instances or IO1 EBS volumes
> but they are expensive.
>
> Hope this helps somebody else,
> Ian.
>


Kafka Streams and AWS IOPS Credits

2017-05-24 Thread Ian Duffy
Hi All,

In the past, we experienced lots of problems with running Kafka Stream
Applications on AWS.

We've seen issues with state locking, memory spiking to 100% and the
instance dying, very
slow startup on pulling down initial rocksdbs and so on

Today we realised that the instances were experiencing such high throughput
that the IOPS credits on the EBS volumes storing the RocksDB state were
completely used up, resulting in very slow IO and most of our issues.

Sharing this as others might have the same problems.

We were running on EBSs of 100GB which give 300 "provisioned" IOPS (3
iops per 1gb).
Most cost effective solution for us is to just increase the EBS size to get
more "provisioned"
IOPS. Alternative solutions are to use i3.* instances or IO1 EBS volumes
but they are expensive.

Hope this helps somebody else,
Ian.


Re: Streams error handling

2017-05-24 Thread Eno Thereska
Just a heads up that we're tracking this and other improvements in exception 
handling at https://issues.apache.org/jira/browse/KAFKA-5156 
.

Thanks
Eno
> On 23 May 2017, at 17:31, Mike Gould  wrote:
> 
> That's great for the value but not the key
> 
> On Thu, 13 Apr 2017 at 18:27, Sachin Mittal  wrote:
> 
>> We are also catching the exception in the serde and returning null, and then
>> filtering out the null values downstream so that they are not included.
>> 
>> Thanks
>> Sachin
>> 
>> 
>> On Thu, Apr 13, 2017 at 9:13 PM, Mike Gould  wrote:
>> 
>>> Great to know I've not gone off in the wrong direction
>>> Thanks
>>> 
>>> On Thu, 13 Apr 2017 at 16:34, Matthias J. Sax 
>>> wrote:
>>> 
 Mike,
 
 thanks for your feedback. You are absolutely right that the Streams API does
 not have great support for this atm. And it's very valuable that you
 report this (you are not the first person). It helps us prioritize :)
 
 For now, there is no better solution than the one you described in your
 email, but it's on our roadmap to improve the API -- and its priority just
 got increased by your request.
 
 I am sorry, that I can't give you a better answer right now :(
 
 
 -Matthias
 
 
 On 4/13/17 8:16 AM, Mike Gould wrote:
> Hi
> Are there any better error handling options for Kafka streams in Java?
> 
> Any errors in the serdes will break the stream. The suggested
> implementation is to use the byte[] serde and do the deserialisation in a
> map operation. However this isn't ideal either as there's no great way to
> handle exceptions.
> My current tactics are to use flatMap in place of map everywhere and return
> emptySet on error. Unfortunately this means the error has to be handled
> directly in the function where it happened and can only be handled as a
> side effect.
> 
> It seems to me that this could be done better. Maybe the *Mapper interfaces
> could allow specific checked exceptions. These could be handled by specific
> downstream KStream.mapException() steps which might e.g. put an error
> response on another stream branch.
> Alternatively, could it be made easier to return something like an Either
> from the Mappers, with the addition of a few extra mapError or mapLeft /
> mapRight methods on KStream?
> 
> Unless there's a better error handling pattern which I've entirely
 missed?
> 
> Thanks
> MIkeG
> 
 
 --
>>> - MikeG
>>> http://en.wikipedia.org/wiki/Common_misconceptions
>>> 
>>> 
>> 
> -- 
> - MikeG
> http://en.wikipedia.org/wiki/Common_misconceptions
> 

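For readers landing on this thread: a minimal sketch of the pattern described
above (catch the failure inside the deserializer, emit null, drop nulls
downstream). The class name is made up, and as noted in the thread this only
covers values cleanly, not keys:

import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

// Wraps a real deserializer; on failure it returns null instead of throwing,
// so the bad record can be filtered out instead of killing the stream thread.
public class SafeDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> inner;

    public SafeDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return inner.deserialize(topic, data);
        } catch (RuntimeException e) {
            // Log/record the poison pill here; returning null marks it for removal.
            return null;
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}

Downstream, the topology then drops the failed records with something like
stream.filter((key, value) -> value != null) before any further processing.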


Re: Question regarding buffer.memory, max.request.size and send.buffer.bytes

2017-05-24 Thread Mohammed Manna
As the documentation suggests, you should keep it the same as your
max.request.size. The documentation also clearly says that the producer will
keep blocking until max.block.ms is reached and then throw an exception if
you are producing faster than records can be sent.



On 24 May 2017 at 00:51, Milind Vaidya  wrote:

> I am looking for Producer tuning as mentioned in the mail, all the
> properties are related to producer config.
>
> This is where the property is mentioned :
> https://kafka.apache.org/0100/documentation.html#producerconfigs
>
> Consumer in this case if KafkaSpout from Apache-Storm.
>
>
>
>
>
> On Tue, May 23, 2017 at 3:28 PM, Mohammed Manna 
> wrote:
>
> >  This could be for various reasons:
> >
> > 1) Your consumer.property settings - if you have not been acknowledging
> > automatically, you need to provide a sufficient polling time and commit in
> > sync/async.
> > 2) You are not consuming the messages how you think.
> >
> > I don't know how you got this buffer.memory property. Doesn't sound right,
> > could you kindly check this again? Also, could you please provide a snippet
> > of your Consumer and how you are reading from the stream?
> >
> > By default, the buffer is about 10% of the message.max.bytes. Perhaps you
> > are looking for a Producer tuning by using the following:
> >
> > batch.size
> > message.max.bytes
> > send.buffer.bytes
> > Cloudera and Confluent.io have some nice articles on Kafka. Have a read
> > through this:
> > https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html
> >
> >
> >
> > On 23 May 2017 at 20:09, Milind Vaidya  wrote:
> >
> > > I have set the producer properties as follows (0.10.0.0)
> > >
> > > *"linger.ms "** : **"500"** ,*
> > >
> > >  *"batch.size"** : **"1000"**,*
> > >
> > > *"buffer.memory"** :**"**1**"**,*
> > >
> > >  *"send.buffer.bytes"** : **"512000"*
> > >
> > > *and default *
> > >
> > > * max.request.size = *1048576
> > >
> > >
> > > If records are sent faster than they can be delivered, they will be
> > > buffered. Now with buffer.memory set as above, if a record is larger
> > > than this, what will happen? Say it is 11629 bytes in size. What is
> > > the minimum value of buffer.memory in terms of the other params? Should it
> > > be at least equal to send.buffer.bytes or max.request.size, or better
> > > left to the default, which is 33554432?
> > >
> > >
> > > I am trying to debug some events not reaching consumer, so wondering if
> > > this could be the reason.
> > >
> >
>

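To make the relationship between these settings concrete, a hedged producer
sketch follows (broker address, topic and class name are placeholders; the
values are only illustrative). In the Java producer, a single record larger
than buffer.memory or max.request.size is rejected up front with a
RecordTooLargeException, while a full buffer makes send() wait up to
max.block.ms and then fail with a timeout, so it usually makes sense for
buffer.memory to be at least as large as max.request.size:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Total memory for buffering unsent records, shared by all partitions.
        props.put("buffer.memory", "33554432");      // 32 MB
        // Target size of one per-partition batch, in bytes.
        props.put("batch.size", "16384");
        // How long to wait for a batch to fill before sending it anyway.
        props.put("linger.ms", "500");
        // Hard cap on the size of a single request/record.
        props.put("max.request.size", "1048576");
        // How long send() may block when the buffer is full before giving up.
        props.put("max.block.ms", "60000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "key", "value"),  // placeholder topic
                    (metadata, exception) -> {
                        if (exception != null) {
                            // Buffer-exhaustion and record-too-large errors surface here
                            // (or via the returned Future) rather than being dropped silently.
                            exception.printStackTrace();
                        }
                    });
        }
    }
}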

Re: Messages are repeating in kafka

2017-05-24 Thread Abhimanyu Nagrath
@Kant
Actually, after setting that configuration I deleted a topic that had 100
partitions. After that, I checked with the ./kafka-topics.sh --list command;
it was not showing that topic. I then started the producers again and ran the
following command:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe
--group 

and got the following output:

Note: This will only show information about consumers that use the Java
consumer API (non-ZooKeeper-based consumers).

[2017-05-24 07:17:48,444] WARN Error while fetching metadata with
correlation id 1 : {topicName=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)

The same describe groups command is working fine for other groups.

So I am not able to figure out what the reason for this error could be, and
how to fix it.


Regards,
Abhimanyu

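For context, a sketch of the broker-side settings being discussed in this thread
(the numbers are the ones quoted below, not recommendations; the check interval
is simply the broker default):

# server.properties (sketch)
log.retention.hours=168                      # topic data kept for 7 days
offsets.retention.minutes=43200              # consumer offsets kept for 30 days (1440 * 30)
offsets.retention.check.interval.ms=600000   # how often expired offsets are purged (default)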



On Wed, May 24, 2017 at 12:27 PM, kant kodali  wrote:

> @Abhimanyu
>
> 1) My guess is that topic offsets will remain for 30 days since that is the
> configuration you are explicitly setting and Kafka should respect that
> although I don't know for sure.
>
> 2) Same as #1; offsets should remain for whatever time you specify.
>
> What is the problem with setting offsets.retention.minutes ==
> log.retention.hours? Did it fix the problem you were facing before?
>
>
> On Tue, May 23, 2017 at 11:40 PM, Abhimanyu Nagrath <
> abhimanyunagr...@gmail.com> wrote:
>
> > Hi Kant,
> >
> > After setting this configuration (offsets.retention.minutes), I am in
> > doubt about two things:
> >  1. If I delete a topic, will that topic's offsets also get deleted, or
> > will they be present for 30 days?
> >  2. What will happen if for some topics my log.retention.hours = 168 and
> > offsets.retention.minutes = 1440 * 30?
> >
> > Regards,
> > Abhimanyu
> >
> > On Mon, May 22, 2017 at 4:09 PM, Abhimanyu Nagrath <
> > abhimanyunagr...@gmail.com> wrote:
> >
> > > @Kant I was going through the offset-related configurations before setting
> > > offsets.retention.minutes, so I came across this configuration and thought
> > > to ask whether this should also be tuned or not.
> > >
> > >
> > > Regards,
> > > Abhimanyu
> > >
> > >
> > >
> > >
> > > On Mon, May 22, 2017 at 2:24 PM, kant kodali 
> wrote:
> > >
> > >> @Abhimanyu Why do you think you need to set that? Did you try setting
> > >> offsets.retention.minutes
> > >> = 1440 * 30 and still seeing duplicates?
> > >>
> > >> On Mon, May 22, 2017 at 12:37 AM, Abhimanyu Nagrath <
> > >> abhimanyunagr...@gmail.com> wrote:
> > >>
> > >> > Hi Girish ,
> > >> >
> > >> > Do I need to tune this configuration offsets.retention.check.interval.ms
> > >> > also? Please let me know if I need to tune any other configuration.
> > >> >
> > >> >
> > >> > Regards,
> > >> > Abhimanyu
> > >> >
> > >> > On Sun, May 21, 2017 at 8:01 PM, Girish Aher 
> > >> wrote:
> > >> >
> > >> > > Yup, exactly as Kant said.
> > >> > > Also make sure that the retention of the offsets topic is an upper
> > >> > > bound across all topics. So in this case, don't create any other topics
> > >> > > in the future with retention of more than 30 days, or otherwise they may
> > >> > > have the same problem too.
> > >> > >
> > >> > > On May 21, 2017 03:25, "Abhimanyu Nagrath" <
> > >> abhimanyunagr...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > >> Hi Kant,
> > >> > >>
> > >> > >> Thanks for the suggestion.
> > >> > >>
> > >> > >>
> > >> > >> Regards,
> > >> > >> Abhimanyu
> > >> > >>
> > >> > >> On Sun, May 21, 2017 at 3:44 PM, kant kodali  >
> > >> > wrote:
> > >> > >>
> > >> > >>> @Abhimanyu You can try setting offset.retention = 30
> > >> > >>> (log.retention). At most, you will have a storage overhead of
> > >> > >>> 5 million msgs per day * 30 (days) * 8 bytes (for each offset) =
> > >> > >>> 1.2GB (not that much since you have a TB of hard disk)
> > >> > >>>
> > >> > >>> On Sun, May 21, 2017 at 3:05 AM, kant kodali <
> kanth...@gmail.com>
> > >> > wrote:
> > >> > >>>
> > >> > >>> > Looking at that ticket and reading the comments, it looks like
> > >> > >>> > one of the concerns is as follows.
> > >> > >>> >
> > >> > >>> > "offsets.retention.minutes is designed to handle the case
> that a
> > >> > >>> consumer
> > >> > >>> > group goes away forever. In that case, we don't want to store
> > the
> > >> > >>> offsets
> > >> > >>> > for that group forever."
> > >> > >>> >
> > >> > >>> > This can simply be addressed by setting offset.retention ==
> > >> > >>> > log.retention by default, right? In which case the offset won't be
> > >> > >>> > stored forever even when the consumer group goes away forever. When
> > >> > >>> > the consumer group goes away forever, the upper bound to clean up
> > >> > >>> > offsets would be equal to log.retention.
> > >> > >>> >
> > >> > >>> >
> > >> > >>> >
> > >> > >>> > On Sun, May 21, 2017 at 2:19 AM, kant kodali <
> > kanth...@gmail.com>
> > >>