Re: How to enable Kafka Authentication as an on-demand feature and later make it mandatory?

2018-09-19 Thread Manikumar
One option is to enable multiple listeners (PLAINTEXT, SASL_PLAINTEXT)  on
brokers and slowly migrate the clients/services.
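
A rough sketch of what that dual-listener phase could look like (host names,
ports and the SCRAM mechanism below are placeholders; adjust them to your
setup):

# broker: keep the existing PLAINTEXT listener while adding an authenticated one
# (plus the usual broker-side JAAS/ScramLoginModule configuration for the SASL listener)
listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://broker1.example.com:9092,SASL_PLAINTEXT://broker1.example.com:9093
sasl.enabled.mechanisms=SCRAM-SHA-256
inter.broker.listener.name=PLAINTEXT

# migrated clients point at port 9093 with:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="alice" password="alice-secret";

Once every client has moved to the authenticated listener, the PLAINTEXT
listener can be removed (and inter-broker traffic switched over, if desired)
in a final rolling restart.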

On Wed, Sep 19, 2018 at 10:38 PM Ashwin Sinha 
wrote:

> Hi,
>
> We have tried setting up Kafka Authentication by SASL/PLAIN and
> SASL/SCRAM (both without SSL). We found that SASL/SCRAM is more convenient
> as we do not need to restart Kafka/ZooKeeper each time new credentials are
> added.
>
> Problem: if we enable cluster-wide authentication by adding broker
> parameters
> <
> https://kafka.apache.org/10/documentation.html#security_sasl_scram_brokerconfig
> >
> then existing consumers will stop getting the data until they use
> authentication
> properties file
> <
> https://kafka.apache.org/10/documentation.html#security_sasl_scram_clientconfig
> >
> at their end. Is there any way/mode/config where we can start
> authentication as an on-demand feature and later make it required
> (mandatory)? This sudden change could affect many crucial services and jobs.
>
> --
> *Ashwin Sinha *| Data Engineer
> ashwin.si...@go-mmt.com  | 9452075361
>  
> 
>


Re: Does kafka support the delay queue

2018-09-19 Thread Subash Konar
Approach 1: Yes, it can be done. You would create a dedicated topic for this
purpose; the limitation is that all messages in that particular topic should
share the same delay, and the consumer needs some decision-making logic (a
loop that waits until each message is due) before processing.

Approach 2: In versions 0.10 and above, I believe this can be built on the new
per-message timestamp: the consumer compares a record's timestamp with the
current time and defers processing until the delay has elapsed. A minimal
consumer sketch is included below.

Approach 3: Store the messages in an RDBMS indexed by execution time and
publish each message to the topic when its execution time arrives (the
original message linked an article with more details).
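
A minimal consumer sketch combining approaches 1 and 2 (the topic name, group
id and delay are placeholders; it assumes a single fixed delay per topic and
the 2.0 Java client):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DelayedTopicConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "delayed-consumer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        long delayMs = 60_000L; // the single delay that applies to this topic

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("delayed-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    long dueAt = record.timestamp() + delayMs;   // 0.10+ per-message timestamp
                    long wait = dueAt - System.currentTimeMillis();
                    if (wait > 0) {
                        Thread.sleep(wait); // crude wait; for long delays watch max.poll.interval.ms
                    }
                    System.out.println("processing " + record.value());
                }
                consumer.commitSync(); // commit only after the delayed records were handled
            }
        }
    }
}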


-- 
Thanks and Regards,
Subash Konar

On Thu, Sep 20, 2018 at 7:31 AM lx  wrote:

> Hi kafka.apache.org,
> Does Kafka support delay queues? How can Kafka implement delay queues?
> I hope my doubts can be answered. Thanks
>
>
> Sincerely
> Lx



-- 
Thanks and Regards,
Subash Konar


Does kafka support the delay queue

2018-09-19 Thread lx
Hi kafka.apache.org,
Does Kafka support delay queues? How can Kafka implement delay queues?
I hope my doubts can be answered. Thanks


Sincerely
Lx

Re: Understanding default.deserialization.exception.handler

2018-09-19 Thread Matthias J. Sax
Thanks for reporting!

It might have been an unknown issue. The community relies on reports
like this, so we really appreciate that you reached out!


-Matthias

On 9/19/18 1:08 AM, Tim Ward wrote:
> Thanks. No, it's not that big a deal now that I understand it, but as I'd had 
> to spend a fair amount of time working out what was going on I thought I'd 
> flag it up in case it wasn't known about.
> 
> Tim Ward
> 
> -Original Message-
> From: Matthias J. Sax 
> Sent: 14 September 2018 19:57
> To: users@kafka.apache.org
> Subject: Re: Understanding default.deserialization.exception.handler
> 
> Your observation is correct. It's a known bug:
> https://issues.apache.org/jira/browse/KAFKA-6502
> 
> In practice, it should not be a big issue though.
>  - you would only hit this bug if you don't process a "good message"
> afterwards
>  - even if you hit this bug, you would just skip the message again
> 
> Thus, the only drawback I see is an additional log message. As long as
> you don't have many _consecutive_ corrupted messages it should not impact
> you much.
> 
> Hope this helps.
> 
> 
> -Matthias
> 
> On 9/13/18 6:09 AM, Tim Ward wrote:
>> With
>>
>> 
>> props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>>  LogAndContinueExceptionHandler.class);
>>
>> Scenario A:
>>
>> Run application. Feed a message into the topic that will fail 
>> deserialization.
>> Application logs exception and keeps running.
>> Shut down application. Restart application.
>> Application re-reads the broken message, logs the exception again (and keeps
>> running).
>>
>> Scenario B:
>>
>> Run application. Feed a message into the topic that will fail 
>> deserialization.
>> Application logs exception and keeps running.
>> Feed a good message into the topic.
>> Application processes it normally.
>> Shut down application. Restart application.
>> Application does *not* re-read the broken message.
>>
>> So it looks like LogAndContinueExceptionHandler does not commit() the
>> offsets of the incoming "poison pill" message(s), and these will be re-read
>> if the application is restarted without any good messages having been read
>> after the bad ones.
>>
>> Have I understood this correctly? If so, is this correct behaviour as 
>> designed? Is it documented to that level of detail?
>>
>> Tim Ward
>>
>>
> 
> 





Re: Questions about managing offsets in external storage and consumer failure detection

2018-09-19 Thread Matthias J. Sax
1. If you don't have a good reason to store offsets externally, I would
not recommend it; use the client's built-in mechanism instead. It will be
more work (i.e., code you need to write) if you store offsets externally.

For some use cases, it's beneficial to store the offsets in an external
system to get exactly-once delivery semantics. But it depends on the use
case whether you can apply this pattern.
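
A minimal sketch of that pattern, following the "Storing Offsets Outside
Kafka" section of the KafkaConsumer javadoc; OffsetStore below is a
hypothetical interface standing in for whatever transactional store holds
your results and offsets, and enable.auto.commit is assumed to be false:

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExternalOffsetsSketch {
    interface OffsetStore {                                        // hypothetical external store
        long nextOffset(TopicPartition tp);                        // position to resume from
        void saveResultAndOffset(ConsumerRecord<String, String> r); // one DB transaction
    }

    static void run(KafkaConsumer<String, String> consumer, OffsetStore store) {
        consumer.subscribe(Collections.singletonList("events"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // resume from the externally stored position, not the Kafka-committed one
                for (TopicPartition tp : partitions) {
                    consumer.seek(tp, store.nextOffset(tp));
                }
            }
        });
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                // writing the result and the offset in the same transaction is what
                // gives the exactly-once effect mentioned above
                store.saveResultAndOffset(record);
            }
        }
    }
}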



2.
> "If consumer not call another poll() after max.poll.interval.ms, it will be 
> remove from consumer group."

I think it should be "within" not "after" (can you link to the docs? --
I could not find the snippet you quote)

> And the only active approach for the consumer to detect failures is to call
> consumer.commit() and catch CommitFailedException

Not sure what you mean by this? Can you elaborate?

For the blocking poll() issue, this is addressed via
https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior
(available in version 2.0)
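
For illustration, the application-side shape of that change with the 2.0
client (the timeout value is arbitrary):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class BoundedPoll {
    // In 2.0, poll(Duration) bounds the total time the call may block, including
    // time spent waiting on the coordinator/metadata, unlike the old poll(long).
    static boolean pollOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        return !records.isEmpty(); // no progress within 5s: caller can log, alert or retry
    }
}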


-Matthias


On 9/18/18 7:58 PM,  wrote:
> Hi,
> 
> I have two questions about Kafka consumer api:
> 
> 
> 1. I read the section "Storing Offsets Outside Kafka" in the javadoc of
> KafkaConsumer. Is it safer or more efficient to manage offsets in external
> storage compared to managing them with Kafka (ZooKeeper or the
> __consumer_offsets topic)? Or is there any benefit for system safety or
> robustness when network issues occur or a consumer crashes?
> 
> 
> 2. I read the section "Detecting Consumer Failures" in the javadoc of
> KafkaConsumer, and got the points below:
> - If the consumer does not call another poll() after max.poll.interval.ms, it
> will be removed from the consumer group.
> - And the only active approach for the consumer to detect failures is to call
> consumer.commit() and catch CommitFailedException.
> Sometimes I find the consumer hanging in poll() when a network issue occurs
> or a rebalance fails, with no exception thrown from poll(). How do we detect
> that failure from the consumer side? I referred to the old mail "[Kafka-users]
> How are rebalance failures raised to consumers?" and it seems there is no approach.
> 
> 
> Thanks,
> Ruiping Li
> 





How to enable Kafka Authentication as an on-demand feature and later make it mandatory?

2018-09-19 Thread Ashwin Sinha
Hi,

We have tried setting up Kafka Authentication by SASL/PLAIN and SASL/SCRAM
(both without SSL). We found that SASL/SCRAM is more convenient as we do not
need to restart Kafka/ZooKeeper each time new credentials are added.
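
For reference, a sketch of how SCRAM credentials can be added at runtime with
the kafka-configs tool, which is what makes the no-restart workflow possible
(user name, password and ZooKeeper address below are placeholders):

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[iterations=4096,password=alice-secret]' \
  --entity-type users --entity-name alice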

Problem: if we enable cluster-wide authentication by adding broker parameters
<https://kafka.apache.org/10/documentation.html#security_sasl_scram_brokerconfig>
then existing consumers will stop getting the data until they use an
authentication properties file
<https://kafka.apache.org/10/documentation.html#security_sasl_scram_clientconfig>
at their end. Is there any way/mode/config where we can start authentication
as an on-demand feature and later make it required (mandatory)? This sudden
change could affect many crucial services and jobs.

-- 
*Ashwin Sinha *| Data Engineer
ashwin.si...@go-mmt.com  | 9452075361
 




Did not work yet.//How to config the servers when Kafka cluster is behind a NAT?

2018-09-19 Thread XinYi Long
Hello,
Thanks for your reply. My network topology is as in the attached picture.
Node1's configuration is below; the others are similar.

listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9001
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

advertised.listeners=EXTERNAL://172.16.0.13:9001,INTERNAL://172.20.1.11:9092
inter.broker.listener.name=INTERNAL

I want Kafka to advertise
"172.16.0.13:9001,172.16.0.13:9002,172.16.0.13:9003,172.16.0.13:9004,172.16.0.13:9005"
to the clients, but it doesn't.
Here is some output of rdkafka_example from librdkafka:

Metadata for all topics (from broker -1: 172.16.0.13:9001/bootstrap):
 5 brokers:
broker 2 at 172.20.1.12:9092
broker 5 at 172.20.1.15:9092
broker 4 at 172.20.1.14:9092
broker 1 at 172.20.1.11:9092
broker 3 at 172.20.1.13:9092

-----Original Message-----
From: Robin Moffatt [mailto:ro...@confluent.io]
Sent: 17 September 2018 21:51
To: users@kafka.apache.org
Subject: Re: How to config the servers when Kafka cluster is behind a NAT?

This should help: https://rmoff.net/2018/08/02/kafka-listeners-explained/
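
For illustration, the pattern from that post applied to a NAT'd cluster would
look roughly like the per-broker config below. This is only the general
pattern, not a diagnosis of the metadata output above; the 172.16.0.13
addresses and 9001-9005 ports follow the thread's example and are otherwise
placeholders.

# node1
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9001
advertised.listeners=INTERNAL://172.20.1.11:9092,EXTERNAL://172.16.0.13:9001
inter.broker.listener.name=INTERNAL

# node2 would listen on its own external port and advertise
# EXTERNAL://172.16.0.13:9002, node3 EXTERNAL://172.16.0.13:9003, and so on.
# Clients that bootstrap via the EXTERNAL listener should then receive the
# 172.16.0.13:900x addresses in their metadata.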


-- 

Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff


On Mon, 17 Sep 2018 at 12:18, XinYi Long  wrote:

> Hello, Guys,
>
> I have a Kafka cluster, which is behind a NAT.
>
> I have some devices on the internet that act as consumers and
> producers. I also have some applications on the same LAN as the Kafka
> cluster that act as consumers and producers.
>
> I have changed "advertised.listeners" to "PLAINTEXT://{NAT
> IP}:{NAT port}" and added some routes on the servers, because I found
> that Kafka brokers also use "advertised.listeners" to talk with each
> other.
> Am I right?
>
> When I start a consumer on the same LAN, I found it can receive
> metadata correctly, but it can't consume any messages from other
> producers on the same LAN.
>
> Did I miss anything, and how do I make it work?
>
>
> Thank you very much!
>
>
> lxyscls
>


Re: Kafka stream issue : Deleting obsolete state directory

2018-09-19 Thread Bhavesh Patel
Hi Bill,
Apologies, I was trying to attach an image.
Please find the error log below:

2018-09-18 09:26:09.112  INFO 1 --- [5-CleanupThread]
o.a.k.s.p.internals.StateDirectory   : stream-thread
[ApplicationName-1ae22d38-32d3-451a-b039-372c79b2e6a5-CleanupThread]
Deleting obsolete state directory 2_1 for task 2_1 as 601112ms has elapsed
(cleanup delay is 600000ms).

2018-09-18 09:26:09.116 ERROR 1 --- [5-CleanupThread]
o.a.k.s.p.internals.StateDirectory   : stream-thread
[ApplicationName-1ae22d38-32d3-451a-b039-372c79b2e6a5-CleanupThread] Failed
to delete the state directory.



java.nio.file.DirectoryNotEmptyException:
/tmp/ApplicationName/ApplicationName/2_1

at
sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242)
~[na:1.8.0_161]

at
sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
~[na:1.8.0_161]

at java.nio.file.Files.delete(Files.java:1126)
~[na:1.8.0_161]

at
org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:740)
~[kafka-clients-2.0.0.jar!/:na]

at
org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:723)
~[kafka-clients-2.0.0.jar!/:na]

at java.nio.file.Files.walkFileTree(Files.java:2688)
~[na:1.8.0_161]

at java.nio.file.Files.walkFileTree(Files.java:2742)
~[na:1.8.0_161]

at
org.apache.kafka.common.utils.Utils.delete(Utils.java:723)
~[kafka-clients-2.0.0.jar!/:na]

at
org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)
[kafka-streams-2.0.0.jar!/:na]

at
org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:250)
[kafka-streams-2.0.0.jar!/:na]

at
org.apache.kafka.streams.KafkaStreams$2.run(KafkaStreams.java:800)
[kafka-streams-2.0.0.jar!/:na]

at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[na:1.8.0_161]

at
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[na:1.8.0_161]

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[na:1.8.0_161]

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[na:1.8.0_161]

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[na:1.8.0_161]

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[na:1.8.0_161]

at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

Many Thanks,
Bhavesh Patel
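
In case it helps while investigating, a hedged sketch of the two Streams
settings involved; the path and value are illustrative, not a confirmed fix
for the NFS behaviour. Keeping the state directory on node-local disk avoids
deleting over NFS, and raising state.cleanup.delay.ms makes the cleanup
thread run less often.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class StateDirSettings {
    static Properties stateSettings() {
        Properties props = new Properties();
        // hypothetical node-local path: keeps RocksDB state (and its deletion) off NFS
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        // state.cleanup.delay.ms defaults to 600000 (10 minutes); raising it makes
        // the CleanupThread run less often while the NFS behaviour is investigated
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 30 * 60 * 1000L);
        return props;
    }
}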


On Wed, Sep 19, 2018, 02:02 Bill Bejeck  wrote:

> Hi Bhavesh,
>
> Sorry, but I'm still not seeing any log file attachment, maybe it's being
> filtered out.
>
> Can you copy the relevant section in the body of the email?
>
> Thanks,
> Bill
>
> On Tue, Sep 18, 2018 at 3:21 PM Bhavesh Patel 
> wrote:
>
> > Hi Bill,
> >
> > Apologies, Please find the error stack attached.
> >
> > We are using kafka 2.0.
> >
> > Thanks,
> > Bhavesh Patel
> >
> > On Tue, Sep 18, 2018 at 8:04 PM Bill Bejeck  wrote:
> >
> >> Hi Bhavesh,
> >>
> >> I don't see the log file attachment, can you resend it?
> >>
> >> Also, what version of Kafka Streams are you running?
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Tue, Sep 18, 2018 at 12:31 PM Bhavesh Patel 
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > We have a streams application where we are facing a slightly strange
> >> > problem with deleting the state directory.
> >> >
> >> > Our application is deployed onto an in-house container platform and
> >> > its storage is backed by NFS.
> >> >
> >> > When the app has been running for a while and hits the cleanup delay of
> >> > 600000ms (the default in the Kafka Streams settings), we are getting a
> >> > DirectoryNotEmptyException.
> >> >
> >> > I am attaching the log below; thoughts on this issue will be much
> >> > appreciated.
> >> >
> >> > Many Thanks,
> >> > Bhavesh Patel
> >> >
> >>
> >
>


RE: Understanding default.deserialization.exception.handler

2018-09-19 Thread Tim Ward
Thanks. No, it's not that big a deal now that I understand it, but as I'd had 
to spend a fair amount of time working out what was going on I thought I'd flag 
it up in case it wasn't known about.

Tim Ward

-Original Message-
From: Matthias J. Sax 
Sent: 14 September 2018 19:57
To: users@kafka.apache.org
Subject: Re: Understanding default.deserialization.exception.handler

Your observation is correct. It's a known bug:
https://issues.apache.org/jira/browse/KAFKA-6502

In practice, it should not be a big issue though.
 - you would only hit this bug if you don't process a "good message"
afterwards
 - even if you hit this bug, you would just skip the message again

Thus, the only drawback I see is an additional log message. As long as
you don't have many _consecutive_ corrupted messages it should not impact
you much.

Hope this helps.


-Matthias

On 9/13/18 6:09 AM, Tim Ward wrote:
> With
>
> 
> props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>  LogAndContinueExceptionHandler.class);
>
> Scenario A:
>
> Run application. Feed a message into the topic that will fail deserialization.
> Application logs exception and keeps running.
> Shut down application. Restart application.
> Application re-reads the broken message, logs the exception again (and keeps running).
>
> Scenario B:
>
> Run application. Feed a message into the topic that will fail deserialization.
> Application logs exception and keeps running.
> Feed a good message into the topic.
> Application processes it normally.
> Shut down application. Restart application.
> Application does *not* re-read the broken message.
>
> So it looks like LogAndContinueExceptionHandler does not commit() the offsets
> of the incoming "poison pill" message(s), and these will be re-read if the
> application is restarted without any good messages having been read after the
> bad ones.
>
> Have I understood this correctly? If so, is this correct behaviour as 
> designed? Is it documented to that level of detail?
>
> Tim Ward
>
>



Re: Kafka producer huge memory usage (leak?)

2018-09-19 Thread Manikumar
A similar issue was reported in KAFKA-7304, but on the broker side. Maybe you can
create a JIRA and upload the heap dump for analysis.
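
Before filing, it may also help to check whether the memory sits in the record
accumulator or in the network layer. A rough sketch using the producer's
built-in metrics (the metric names are the ones exposed by the Java client;
the logging is illustrative):

import org.apache.kafka.clients.producer.KafkaProducer;

class ProducerBufferCheck {
    // If buffer-available-bytes stays close to buffer-total-bytes (32 MB by
    // default), the heap growth is probably not in the record accumulator but
    // in the network/SSL buffers seen in the heap dump.
    static void dumpBufferMetrics(KafkaProducer<String, String> producer) {
        producer.metrics().forEach((name, metric) -> {
            if ("producer-metrics".equals(name.group()) && name.name().startsWith("buffer")) {
                System.out.println(name.name() + " = " + metric.value()); // value() is the 0.10.x metric API
            }
        });
    }
}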

On Wed, Sep 19, 2018 at 11:59 AM Shantanu Deshmukh 
wrote:

> Any thoughts on this matter? Someone, please help.
>
> On Tue, Sep 18, 2018 at 6:05 PM Shantanu Deshmukh 
> wrote:
>
> > Additionally, here's the producer config
> >
> > kafka.bootstrap.servers=x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092
> > kafka.acks=0
> > kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
> > kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
> > kafka.max.block.ms=1000
> > kafka.request.timeout.ms=1000
> > kafka.max.in.flight.requests.per.connection=1
> > kafka.retries=0
> > kafka.compression.type=gzip
> > kafka.security.protocol=SSL
> > kafka.ssl.truststore.location=/data/kafka/kafka-server-truststore.jks
> > kafka.ssl.truststore.password=XX
> > kafka.linger.ms=300
> > logger.level=INFO
> >
> > On Tue, Sep 18, 2018 at 5:36 PM Shantanu Deshmukh  >
> > wrote:
> >
> >> Hello,
> >>
> >> We have a 3 broker Kafka 0.10.1.0 deployment in production. There are
> >> some applications which have Kafka Producers embedded in them which send
> >> application logs to a topic. This topic has 10 partitions with
> >> replication
> >> factor of 3.
> >>
> >> We are observing that memory usage on some of these application servers
> >> keeps shooting through the roof intermittently. After taking a heap dump we
> >> found that the top suspects were:
> >> *-*
> >>
> >>
> >> *org.apache.kafka.common.network.Selector -*
> >>
> >> occupies *352,519,104 (24.96%)* bytes. The memory is accumulated in one
> >> instance of *"byte[]"* loaded by *""*.
> >>
> >> *org.apache.kafka.common.network.KafkaChannel -*
> >>
> >> occupies *352,527,424 (24.96%)* bytes. The memory is accumulated in one
> >> instance of *"byte[]"* loaded by *""*
> >>
> >> * - *
> >>
> >> Both of these were holding about 352MB of space. 3 such instances, so
> >> they were consuming about 1.2GB of memory.
> >>
> >> Now, regarding producer usage: not a huge amount of logs is being sent
> >> to the Kafka cluster, about 200 msgs/sec. Only one producer object is
> >> used throughout the application, and the async send function is used.
> >>
> >> What could be the cause of such huge memory usage? Is this some sort of
> >> memory leak in this specific Kafka version?
> >>
> >>
>