Re: KafkaStreams: ProducerFencedException causing StreamThread death

2018-09-28 Thread Aravind Dongara
Hi Guozhang

Thanks for your reply.
We are using Kafka 1.1.1

Thanks
Aravind
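For reference, a hedged sketch of the usual workaround on 1.1.x: a dead StreamThread cannot be revived, so applications typically detect thread death (for example via an uncaught-exception handler) and rebuild the whole KafkaStreams instance. The plain-Java analog below shows only the supervision pattern, with no Kafka dependency; all names are hypothetical.

```java
// Plain-Java analog of recovering from fatal thread death. In a real Streams
// app the handler would close() and recreate the KafkaStreams instance rather
// than start a bare replacement thread.
public class ThreadSupervisor {

    // Runs a worker that dies with a fatal error (standing in for
    // ProducerFencedException) and returns the name of the replacement
    // thread started by the uncaught-exception handler.
    static String runWithReplacement() throws InterruptedException {
        final String[] replacementName = new String[1];
        final Thread[] replacementRef = new Thread[1];

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("fenced"); // simulated fatal error
        }, "worker-1");

        worker.setUncaughtExceptionHandler((t, e) -> {
            // Handler runs on the dying thread before it terminates.
            Thread replacement = new Thread(() -> { }, "worker-2");
            replacement.start();
            replacementRef[0] = replacement;
            replacementName[0] = replacement.getName();
        });

        worker.start();
        worker.join(); // by the time join() returns, the handler has run
        if (replacementRef[0] != null) {
            replacementRef[0].join();
        }
        return replacementName[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("replacement thread: " + runWithReplacement());
    }
}
```

The same shape applies at the instance level: on thread death, tear down and recreate, rather than trying to resurrect the fenced thread.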


> On Sep 28, 2018, at 4:45 PM, Guozhang Wang  wrote:
> 
> Hello Aravind,
> 
> Which version of Kafka are you currently using? What you described seems to
> be fixed in the latest version already, so I want to check whether you are
> using an older version and, if so, what the best way to work around it would be.
> 
> 
> Guozhang
> 
> 
> On Thu, Sep 27, 2018 at 12:54 PM, Aravind Dongara <
> adong...@yahoo.com.invalid> wrote:
> 
>> 
>> During a rebalance triggered by the kafka-coordinator-heartbeat-thread losing
>> connection to the ‘Group coordinator’, we noticed that a stream thread shuts
>> down when it catches a ProducerFencedException while flushing the
>> state store.
>> This also causes the stream state on that node to be stuck in
>> ‘REBALANCING’, even though the partitions have been rebalanced to
>> other threads across nodes.
>> During a rebalance there seems to be a race condition between flushState on
>> one node and ProducerId creation on another node for the same partition. If
>> the flushState is slower than the other, it encounters a
>> ProducerFencedException.
>> 
>> It would be nice if Kafka Streams could handle this exception gracefully and
>> not shut down the thread, so that we don't end up with an uneven number of
>> threads across nodes.
>> Can you please suggest any workarounds for this situation?
>> 
>> Thanks
>> Aravind
>> 
>> 
>> [2018-09-26T15:39:54,662Z]  [ERROR]  [kafka-producer-network-thread | upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer]  [o.a.k.c.producer.internals.Sender] [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer, transactionalId=upsert-merger-stream-oa43-1-0_55] Aborting producer batches due to fatal error
>> [2018-09-26T15:39:54,665Z]  [ERROR]  [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.ProcessorStateManager] task [0_55] Failed to flush state store upsert-store:
>> org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort sending since an error caught with a previous record (key de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp 1537976392104) to topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:197)
>>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:105)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
>>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
>>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:243)
>>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:195)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:339)
>>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:312)
>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:422)
>>     at org.apache.kafka.streams.processor.internals.

Re: Kafka ACL issue - Operation denied despite having full access to the topic

2018-09-28 Thread Vahid Hashemian
Your producer needs to have Write access to the topic, but as you mentioned,
All should cover Write. Which version of Kafka are you using?
FYI, more authn/authz information for some of the common client operations
can be found here:
https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/

--Vahid

On Fri, Sep 28, 2018 at 9:13 AM Bala  wrote:

> A producer using the Java API. I did configure the JAAS config as per the
> docs. It looks like it is working and authentication succeeds, but
> authorization is not honoring the ACL.
> On Friday, September 28, 2018, 11:56:24 AM EDT, Vahid Hashemian <
> vahid.hashem...@gmail.com> wrote:
>
>  Hi Bala,
>
> What operation/command are you trying that gives you this error?
>
> --Vahid
>
> On Fri, Sep 28, 2018 at 7:12 AM Bala  wrote:
>
> > I have a Kafka cluster with Kerberos security and am trying to use ACLs,
> > but I am not able to make them work.
> >
> > Here is the error I am seeing in the server log:
> > [2018-09-28 14:06:54,152] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> > [2018-09-28 14:06:54,312] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> > [2018-09-28 14:06:54,472] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> > [2018-09-28 14:06:54,631] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> > [2018-09-28 14:06:54,793] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> > [2018-09-28 14:06:54,953] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> >
> >
> >
> > But the user has full access to the topic. Here is the output of the
> > `list` command:
> >
> > Current ACLs for resource `Topic:icd_alpha`:
> >  user:storm-mytestcluster has Allow permission for operations: All
> > from hosts: *
> >
> > Please help me, as I am kind of blocked and don't know how to proceed
> > further.
> > Thanks
> > Bala
> >
>


Re: KafkaStreams: ProducerFencedException causing StreamThread death

2018-09-28 Thread Guozhang Wang
Hello Aravind,

Which version of Kafka are you currently using? What you described seems to
be fixed in the latest version already, so I want to check whether you are
using an older version and, if so, what the best way to work around it would be.


Guozhang


On Thu, Sep 27, 2018 at 12:54 PM, Aravind Dongara <
adong...@yahoo.com.invalid> wrote:

>
> During a rebalance triggered by the kafka-coordinator-heartbeat-thread losing
> connection to the ‘Group coordinator’, we noticed that a stream thread shuts
> down when it catches a ProducerFencedException while flushing the
> state store.
> This also causes the stream state on that node to be stuck in
> ‘REBALANCING’, even though the partitions have been rebalanced to
> other threads across nodes.
> During a rebalance there seems to be a race condition between flushState on
> one node and ProducerId creation on another node for the same partition. If
> the flushState is slower than the other, it encounters a
> ProducerFencedException.
>
> It would be nice if Kafka Streams could handle this exception gracefully and
> not shut down the thread, so that we don't end up with an uneven number of
> threads across nodes.
> Can you please suggest any workarounds for this situation?
>
> Thanks
> Aravind
>
>
> [2018-09-26T15:39:54,662Z]  [ERROR]  [kafka-producer-network-thread | upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer]  [o.a.k.c.producer.internals.Sender] [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer, transactionalId=upsert-merger-stream-oa43-1-0_55] Aborting producer batches due to fatal error
> [2018-09-26T15:39:54,665Z]  [ERROR]  [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.ProcessorStateManager] task [0_55] Failed to flush state store upsert-store:
> org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort sending since an error caught with a previous record (key de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp 1537976392104) to topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:197)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:105)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:243)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:195)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:339)
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:312)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:422)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:182)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:147)
>     at org.apache.kafka.streams.processor.internals.TaskManager.

Re: Kafka ACL issue - Operation denied despite having full access to the topic

2018-09-28 Thread Bala
A producer using the Java API. I did configure the JAAS config as per the
docs. It looks like it is working and authentication succeeds, but
authorization is not honoring the ACL.
On Friday, September 28, 2018, 11:56:24 AM EDT, Vahid Hashemian wrote:
 Hi Bala,

What operation/command are you trying that gives you this error?

--Vahid

On Fri, Sep 28, 2018 at 7:12 AM Bala  wrote:

> I have a Kafka cluster with Kerberos security and am trying to use ACLs,
> but I am not able to make them work.
>
> Here is the error I am seeing in the server log:
> [2018-09-28 14:06:54,152] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,312] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,472] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,631] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,793] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,953] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
>
>
>
> But the user has full access to the topic. Here is the output of the
> `list` command:
>
> Current ACLs for resource `Topic:icd_alpha`:
>      user:storm-mytestcluster has Allow permission for operations: All
> from hosts: *
>
> Please help me, as I am kind of blocked and don't know how to proceed
> further.
> Thanks
> Bala
>
  

Re: Kafka ACL issue - Operation denied despite having full access to the topic

2018-09-28 Thread Vahid Hashemian
Hi Bala,

What operation/command are you trying that gives you this error?

--Vahid

On Fri, Sep 28, 2018 at 7:12 AM Bala  wrote:

> I have a Kafka cluster with Kerberos security and am trying to use ACLs,
> but I am not able to make them work.
>
> Here is the error I am seeing in the server log:
> [2018-09-28 14:06:54,152] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,312] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,472] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,631] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,793] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
> [2018-09-28 14:06:54,953] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
>
>
>
> But the user has full access to the topic. Here is the output of the
> `list` command:
>
> Current ACLs for resource `Topic:icd_alpha`:
>  user:storm-mytestcluster has Allow permission for operations: All
> from hosts: *
>
> Please help me, as I am kind of blocked and don't know how to proceed
> further.
> Thanks
> Bala
>


Kafka ACL issue - Operation denied despite having full access to the topic

2018-09-28 Thread Bala
I have a Kafka cluster with Kerberos security and am trying to use ACLs, but
I am not able to make them work.

Here is the error I am seeing in the server log:
[2018-09-28 14:06:54,152] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
[2018-09-28 14:06:54,312] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
[2018-09-28 14:06:54,472] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
[2018-09-28 14:06:54,631] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
[2018-09-28 14:06:54,793] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)
[2018-09-28 14:06:54,953] INFO Principal = User:storm-mytestcluster is Denied Operation = Describe from host =  on resource = Topic:icd_alpha (kafka.authorizer.logger)



But the user has full access to the topic. Here is the output of the `list` command:

Current ACLs for resource `Topic:icd_alpha`:
     user:storm-mytestcluster has Allow permission for operations: All from
hosts: *

Please help me, as I am kind of blocked and don't know how to proceed further.
Thanks
Bala
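One detail worth checking: the `list` output above shows the principal as `user:storm-mytestcluster` (lowercase `user:`), while the authorizer log denies `User:storm-mytestcluster`. Principal strings are compared literally, so a case mismatch like that could explain an All ACL not matching. A hedged sketch of how to inspect and re-add the ACL with the exact principal from the log (ZooKeeper address and install path below are assumptions):

```shell
# Re-list the ACLs to inspect the exact principal string stored:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --list --topic icd_alpha

# Re-add the ACL using the principal exactly as the authorizer logs it
# ("User:" with a capital U), covering the operations a producer needs:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:storm-mytestcluster \
  --operation Describe --operation Write --topic icd_alpha
```

These commands require access to the cluster's ZooKeeper ensemble; adjust the connect string to your environment.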


Re: Have connector be paused from start

2018-09-28 Thread Svante Karlsson
Sounds like a workflow/pipeline thing in Jenkins (or equivalent) to me.



Den ons 26 sep. 2018 kl 17:27 skrev Rickard Cardell
:

> Hi
> Is there a way to have a Kafka Connect connector begin in the 'PAUSED' state?
> I.e., I would like to have the connector set to paused before it can process
> any data from Kafka.
>
> Some background:
>
> I have a use case where we will push data from Kafka into S3 using Kafka
> Connect. It also involves a one-time backfill of data from Hadoop to get
> all the historic data into S3 as well, into the same dataset.
>
> To avoid too many duplicates we want the Kafka Connect pipeline and the
> HDFS-to-S3 pipeline to overlap just a few hours, i.e:
>
> 1. start kafka-connect kafka-to-s3 pipeline
> 2. wait a few hours
> 3. start pushing data from Hadoop to S3
>
> However, I have one process that deploys Kafka Connect connectors and
> another one that will handle this backfilling process, so one way of
> solving this would be if a connector could start in paused state and be
> resumed by the backfilling process.
>
> One less pretty way to make the connector pause before it can consume any
> data is to deploy it with faulty Kafka settings, set it to paused, and then
> correct the settings, but I hope there are better solutions than that.
>
> regards
> Rickard
>
> --
>
> *Rickard Cardell*
> Software developer
> Data Infrastructure
>
> Klarna Bank AB (publ)
> Sveavägen 46, 111 34 Stockholm
> Tel: +46 8 120 120 00 <+46812012000>
> Reg no: 556737-0431
> klarna.com
>
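One approximation worth sketching: the Connect REST API has no create-as-paused option in this era, but the deploy step can pause the connector immediately after creating it, which narrows (though does not eliminate) the window in which tasks may briefly run. Host, port, and connector/file names below are assumptions:

```shell
# Create the connector from its JSON config, then pause it right away via the
# standard Connect REST endpoints.
curl -s -X POST -H "Content-Type: application/json" \
  --data @s3-sink.json http://localhost:8083/connectors

curl -s -X PUT http://localhost:8083/connectors/s3-sink/pause
```

The backfill job can later issue `PUT /connectors/s3-sink/resume` when it wants the Kafka-to-S3 pipeline to start.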


Re: How to setup Kafka security ? but only for some

2018-09-28 Thread Tobias Eriksson
That is exactly what I am after, I think.
Now I need to figure out how to do the Access Control (ACLs) too.
Thanx
-Tobias

On 2018-09-28, 12:33, "Daniel Nägele"  wrote:

Hello Tobias,

you can declare multiple listeners, I use the following setup for instance:

listeners=PLAINTEXT://fqdn:9092,SASL_SSL://fqdn:9093

I plan to turn PLAINTEXT off eventually, though, because why not encrypt the
internal communication too.

Best regards,
Daniel

On 9/27/18 10:09 AM, Tobias Eriksson wrote:
> We have Kafka v1.1.0.
> Is there a really good tutorial somewhere on how to set up security with
> SSL and ACLs?
> 
> I would like to have ONE cluster, where
> * our internal services do not have to use SSL / ACLs
> * the 3rd-party applications HAVE TO use SSL / ACLs
> 
> Is this possible?
> 
> -Tobias
> 
> --
> Tobias Eriksson
> Chief Architect Research – CTO Office
> Qvantel Sweden AB
> Tel; +46 768 832453
> e-mail; tobias.eriks...@qvantel.com
> 
> 




Re: How to setup Kafka security ? but only for some

2018-09-28 Thread Daniel Nägele
Hello Tobias,

you can declare multiple listeners, I use the following setup for instance:

listeners=PLAINTEXT://fqdn:9092,SASL_SSL://fqdn:9093

I plan to turn PLAINTEXT off eventually, though, because why not encrypt the
internal communication too.

Best regards,
Daniel
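To pair the dual-listener setup with ACLs that only bite on the external listener, one rough server.properties sketch (hostnames are placeholders, and exempting anonymous PLAINTEXT clients means trusting the internal network):

```properties
# Internal listener stays plaintext; external listener requires SASL over SSL.
listeners=PLAINTEXT://internal.example.com:9092,SASL_SSL://public.example.com:9093

# Enable the ACL authorizer (class name as of Kafka 1.1.x):
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# PLAINTEXT clients are authenticated as ANONYMOUS; making that principal a
# super user exempts internal traffic from ACL checks:
super.users=User:ANONYMOUS

# Everything else is denied unless an ACL explicitly allows it:
allow.everyone.if.no.acl.found=false
```

External 3rd-party principals (from SASL) then need explicit ACLs granted via kafka-acls.sh, while internal services are untouched.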

On 9/27/18 10:09 AM, Tobias Eriksson wrote:
> We have Kafka v1.1.0.
> Is there a really good tutorial somewhere on how to set up security with
> SSL and ACLs?
> 
> I would like to have ONE cluster, where
> * our internal services do not have to use SSL / ACLs
> * the 3rd-party applications HAVE TO use SSL / ACLs
> 
> Is this possible?
> 
> -Tobias
> 
> --
> Tobias Eriksson
> Chief Architect Research – CTO Office
> Qvantel Sweden AB
> Tel; +46 768 832453
> e-mail; tobias.eriks...@qvantel.com
> 
>