Connector configuration is invalid

2017-06-02 Thread mayank rathi
Hello All,

I am getting "Connector configuration is invalid" error with following
configuration. Can someone please help me in finding out what I am doing
wrong here

name=topic_see
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:db2://*.*.*.com:6/*:user=dsetl:password=*
mode=bulk
table.whitelist=CATALOG
topic.prefix=topic-see-
schema.registry=GIL

Thanks and Regards
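
For reference, a hedged sketch of what a corrected configuration might look like. The exact cause of the validation error isn't shown above, but two likely suspects are the DB2 JDBC URL (connection properties are normally appended after the database name and separated/terminated by semicolons) and the schema.registry entry, which is not a JdbcSourceConnector property (the schema registry URL belongs in the worker's converter settings). Host, port, database, and credentials below are placeholders:

    name=topic_see
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    # DB2 URLs usually look like jdbc:db2://<host>:<port>/<database>:user=<user>;password=<password>;
    connection.url=jdbc:db2://db2host.example.com:50000/MYDB:user=dsetl;password=secret;
    mode=bulk
    table.whitelist=CATALOG
    topic.prefix=topic-see-
    # If Avro is in use, set key/value.converter.schema.registry.url on the Connect worker
    # instead of a schema.registry property on the connector.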



Re: Data in kafka topic in Json format

2017-06-02 Thread Hans Jespersen
You have shared the Kafka connect properties but not the source connector 
config.
Which source connector are you using? Does it override the default settings you 
provided?
Are you running the connector in standalone mode or distributed mode?
Also what are you using to consume the messages and see the message format?

-hans



> On Jun 2, 2017, at 9:10 AM, Mina Aslani  wrote:
> 
> Hi Hans,
> 
> Thank you for your quick response, appreciate it.
> 
> In *kafka-connect* docker, I see below settings in
> *kafka-connect.properties* file in *kafka-connect *directory:
> 
> key.converter.schemas.enable=false
> key.converter.schema.registry.url=http://kafka-schema-registry:
> value.converter.schema.registry.url=http://kafka-schema-registry:
> 
> key.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> 
> internal.value.converter.schemas.enable=false
> rest.advertised.host.name=kafka-connect
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> And the settings in *schema-registry *directory of *kafka-connect *docker
> are as in
> 
> https://github.com/confluentinc/schema-registry/tree/master/config
> 
> Should I consider any other settings for *kafka-connect* or
> *schema-registry* to get the real json object NOT string
> formatted/stringified json which has extra "\"  and is not json any more?
> 
> Best regards,
> Mina
> 
> On Fri, Jun 2, 2017 at 11:18 AM, Hans Jespersen  wrote:
> 
>> 
>> My earlier comment still applies but in Kafka Connect the equivalent of a
>> serializer/deserializer (serdes) is called a “converter”.
>> Check which converter you have configured for your source connector and
>> whether it is overriding the default converter configured for the
>> Connect worker it is running in.
>> 
>> -hans
>> 
>> 
>> 
>> 
>>> On Jun 2, 2017, at 8:12 AM, Mina Aslani  wrote:
>>> 
>>> Hi,
>>> 
>>> I would like to add that I use kafka-connect and schema-registery
>> version `
>>> 3.2.1-6`.
>>> 
>>> Best regards,
>>> Mina
>>> 
>>> On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani 
>> wrote:
>>> 
 Hi.
 
 Is there any way that I get the data into a Kafka topic in Json format?
 The source that I ingest the data from has the data in JSON format;
 however, when I look at that data in the Kafka topic, schema and payload
 fields are added and the data is not in JSON format.
 
 I want to avoid implementing a transformer that gets the data from the
 topic and publishes JSON to another one.
 
 Your input is appreciated.
 
 Best regards,
 Mina
 
>> 
>> 



Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-02 Thread Steven Schlansker
> 
> On Jun 2, 2017, at 3:32 PM, Matthias J. Sax  wrote:
> 
> Thanks. That helps to understand the use case better.
> 
> Rephrase to make sure I understood it correctly:
> 
> 1) you are providing a custom partitioner to Streams that is based on one
> field in your value (that's fine with regard to fault-tolerance :))
> 2) you want to use interactive queries to query the store
> 3) because of your custom partitioning scheme, you need to manually
> figure out the right application instance that hosts a key
> 4) thus, you use a GlobalKTable to maintain the information from K to D
> and thus to the partition, i.e., the Streams instance, that hosts K
> 
> If this is correct, then you cannot use the "by key" metadata interface.
> It's designed to find the Streams instance based on the key only -- but
> your partitioner is based on the value. Internally, we call
> 
>> final Integer partition = partitioner.partition(key, null, 
>> sourceTopicsInfo.maxPartitions);
> 
> Note, that `value==null` -- at this point, we don't have any value
> available and can't provide it to the partitioner.
> 
> Thus, your approach to get all metadata is the only way you can go.

Thanks for confirming this.  The code is a little ugly but I've done worse :)

> 
> 
> Very interesting (and quite special) use case. :)
> 
> 
> -Matthias
> 
> On 6/2/17 2:32 PM, Steven Schlansker wrote:
>> 
>>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax  wrote:
>>> 
>>> I am not sure if I understand the use case correctly. Could you give
>>> some more context?
>> 
>> Happily, thanks for thinking about this!
>> 
>>> 
 backing store whose partitioning is value dependent
>>> 
>>> I infer that you are using a custom store and not the default RocksDB? If
>>> yes, what do you use? What does "value dependent" mean in this context?
>> 
>> We're currently using the base in memory store.  We tried to use RocksDB
>> but the tuning to get it running appropriately in a Linux container without
>> tripping the cgroups OOM killer is nontrivial.
>> 
>> 
>>> Right now, I am wondering why you don't just set a new key to get your
>>> data grouped by the field you are interested in? Also, if you don't
>>> partition your data by key, you might break your Streams application
>>> with regard to fault-tolerance -- or does your custom store not rely on
>>> changelog backup for fault-tolerance?
>>> 
>> 
>> That's an interesting point about making a transformed key.  But I don't think
>> it simplifies my problem too much.  Essentially, I have a list of messages
>> that should get delivered to destinations.  Each message has a primary key K
>> and a destination D.
>> 
>> We partition over D so that all messages to the same destination are handled 
>> by
>> the same worker, to preserve ordering and implement local rate limits etc.
>> 
>> I want to preserve the illusion to the client that they can look up a key 
>> with
>> only K.  So, as an intermediate step, we use the GlobalKTable to look up D.  
>> Once
>> we have K,D we can then compute the partition and execute a lookup.
>> 
>> Transforming the key to be a composite K,D isn't helpful because the end 
>> user still
>> only knows K -- D's relevance is an implementation detail I wish to hide -- 
>> so you still
>> need some sort of secondary lookup.
>> 
>> We do use the changelog backup for fault tolerance -- how would having the 
>> partition
>> based on the value break this?  Is the changelog implicitly partitioned by a 
>> partitioner
>> other than the one we give to the topology?
>> 
>> Hopefully that explains my situation a bit more?  Thanks!
>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> 
>>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
 I have a KTable and backing store whose partitioning is value dependent.
 I want certain groups of messages to be ordered and that grouping is 
 determined
 by one field (D) of the (possibly large) value.
 
 When I lookup by only K, obviously you don't know the partition it should 
 be on.
 So I will build a GlobalKTable of K -> D.  This gives me enough information
 to determine the partition.
 
 Unfortunately, the KafkaStreams metadata API doesn't fit this use case 
 well.
 It allows you to either get all metadata, or by key -- but if you lookup 
 by key
 it just substitutes a null value (causing a downstream NPE)
 
 I can iterate over all metadata and compute the mapping of K -> K,D -> P
 and then iterate over all metadata looking for P.  It's not difficult but 
 ends
 up being a bit of somewhat ugly code that feels like I shouldn't have to 
 write it.
 
 Am I missing something here?  Is there a better way that I've missed?  
 Thanks!
 
>>> 
>> 
> 





Re: Suppressing intermediate topics feeding (Global)KTable

2017-06-02 Thread Steven Schlansker
You're entirely right.  I'd forgotten that each instance would only read
a subset of the main topic.  Should have figured that out myself.  Thanks
for the sanity check! :)

> On Jun 2, 2017, at 3:41 PM, Matthias J. Sax  wrote:
> 
> Makes sense now (considering the explanation from the other thread).
> 
> With regard to an "opt-in" optimization. We could simplify the API and
> hide some details, but it wouldn't buy you anything from an execution
> point of view. As you need all data on each instance, you need to
> somehow "broadcast" the information -- and Kafka Streams applications
> use topics to exchange data. Thus, we need a topic anyhow.
> 
> Does this make sense?
> 
> So your overall architecture seems to be sound to me.
> 
> 
> -Matthias
> 
> 
> On 6/2/17 2:37 PM, Steven Schlansker wrote:
>> 
>>> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax  wrote:
>>> 
>>> Hi,
>>> 
>>> If you want to populate a GlobalKTable you can only do this by reading a
>>> topic. So the short answer to your headline is: no, you cannot suppress
>>> the intermediate topic.
>> 
>> Bummer!  Maybe this is an opt-in optimization to consider later.
>> 
>>> 
>>> However, I am wondering what the purpose of your secondary index is, and
>>> why you are using a GlobalKTable for it. Maybe you can elaborate a
>>> little bit?
>> 
>> Elaborated on this a bit in the other thread, I was trying to keep separate
>> problems separate, but maybe I just made things more confusing!
>> 
>> tl;dr is that the user requests values knowing K, but there is actually a
>> "hidden composite key" D that is relevant to the partitioning strategy.
>> 
>> The GlobalKTable allows you to look up K -> D, and then find the right local 
>> KTable K,D -> V
>> 
>>> 
>>> I am also wondering about this code snippet:
>>> 
> builder.stream(mainTopic)
>  .mapValues(...)
>  .to(secondaryIndex1)
>>> 
>>> Should it not be .map() that transforms (k,v) ->
>>> (v.getSecondaryKey1(),k) ? Just for my understanding what you are doing.
>>> 
>> 
>> In this case, the "externally visible" K needs additional information about
>> the destination D so that it can be partitioned correctly.  So the code looks
>> like:
>> 
>>    // TODO: sucks that this materializes an intermediate topic
>>    msgStream
>>        .mapValues(v -> v == null ? null : v.getResolvedDestination().toString())
>>        .to(Serdes.String(), Serdes.String(), DEST_INDEX);
>> 
>>    builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX);
>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 6/2/17 12:28 PM, Steven Schlansker wrote:
 Hi everyone, another question for the list :)
 
 I'm creating a cluster of KTable (and GlobalKTable) based off the same
 input stream K,V.
 
 It has a number of secondary indices (think like a RDBMS)
 K1 -> K
 K2 -> K
 etc
 
 These are all based off of trivial mappings from my main stream that also
 feeds the K -> V StateStore.  Think one liners like v -> 
 v.getSecondaryKey1()
 
 Currently, for each one it seems that I have to do
 
 builder.stream(mainTopic)
  .mapValues(...)
  .to(secondaryIndex1)
 
 builder.globalTable(secondaryIndex1, secondaryIndexStore1);
 
 Unfortunately the intermediate "secondaryIndex1" topic is relatively
 low value.  In a case where my state stores are lost, I already have to
 read through the mainTopic to recover the main state store.  While it's 
 doing
 that, I'd much rather it rebuild the GlobalKTable instance from that data
 directly.  Then I could skip having this index in Kafka at all, it's 
 entirely
 redundant.  The data is already loaded and deserialized for the benefit of
 another Processor.
 
 Any thoughts?  Happy Friday,
 Steven
 
>>> 
>> 
> 





Re: Suppressing intermediate topics feeding (Global)KTable

2017-06-02 Thread Matthias J. Sax
Makes sense now (considering the explanation from the other thread).

With regard to an "opt-in" optimization. We could simplify the API and
hide some details, but it wouldn't buy you anything from an execution
point of view. As you need all data on each instance, you need to
somehow "broadcast" the information -- and Kafka Streams applications
use topics to exchange data. Thus, we need a topic anyhow.

Does this make sense?

So your overall architecture seems to be sound to me.


-Matthias


On 6/2/17 2:37 PM, Steven Schlansker wrote:
> 
>> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax  wrote:
>>
>> Hi,
>>
>> If you want to populate a GlobalKTable you can only do this by reading a
>> topic. So the short answer to your headline is: no, you cannot suppress
>> the intermediate topic.
> 
> Bummer!  Maybe this is an opt-in optimization to consider later.
> 
>>
>> However, I am wondering what the purpose of your secondary index is, and
>> why you are using a GlobalKTable for it. Maybe you can elaborate a
>> little bit?
> 
> Elaborated on this a bit in the other thread, I was trying to keep separate
> problems separate, but maybe I just made things more confusing!
> 
> tl;dr is that the user requests values knowing K, but there is actually a
> "hidden composite key" D that is relevant to the partitioning strategy.
> 
> The GlobalKTable allows you to look up K -> D, and then find the right local 
> KTable K,D -> V
> 
>>
>> I am also wondering about this code snippet:
>>
 builder.stream(mainTopic)
   .mapValues(...)
   .to(secondaryIndex1)
>>
>> Should it not be .map() that transforms (k,v) ->
>> (v.getSecondaryKey1(),k) ? Just for my understanding what you are doing.
>>
> 
> In this case, the "externally visible" K needs additional information about
> the destination D so that it can be partitioned correctly.  So the code looks
> like:
> 
> // TODO: sucks that this materializes an intermediate topic
> msgStream
>     .mapValues(v -> v == null ? null : v.getResolvedDestination().toString())
>     .to(Serdes.String(), Serdes.String(), DEST_INDEX);
> 
> builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX);
> 
>>
>> -Matthias
>>
>>
>> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>>> Hi everyone, another question for the list :)
>>>
>>> I'm creating a cluster of KTable (and GlobalKTable) based off the same
>>> input stream K,V.
>>>
>>> It has a number of secondary indices (think like a RDBMS)
>>> K1 -> K
>>> K2 -> K
>>> etc
>>>
>>> These are all based off of trivial mappings from my main stream that also
>>> feeds the K -> V StateStore.  Think one liners like v -> 
>>> v.getSecondaryKey1()
>>>
>>> Currently, for each one it seems that I have to do
>>>
>>> builder.stream(mainTopic)
>>>   .mapValues(...)
>>>   .to(secondaryIndex1)
>>>
>>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>>>
>>> Unfortunately the intermediate "secondaryIndex1" topic is relatively
>>> low value.  In a case where my state stores are lost, I already have to
>>> read through the mainTopic to recover the main state store.  While it's 
>>> doing
>>> that, I'd much rather it rebuild the GlobalKTable instance from that data
>>> directly.  Then I could skip having this index in Kafka at all, it's 
>>> entirely
>>> redundant.  The data is already loaded and deserialized for the benefit of
>>> another Processor.
>>>
>>> Any thoughts?  Happy Friday,
>>> Steven
>>>
>>
> 





Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-02 Thread Matthias J. Sax
Thanks. That helps to understand the use case better.

Rephrase to make sure I understood it correctly:

1) you are providing a custom partitioner to Streams that is based on one
field in your value (that's fine with regard to fault-tolerance :))
2) you want to use interactive queries to query the store
3) because of your custom partitioning scheme, you need to manually
figure out the right application instance that hosts a key
4) thus, you use a GlobalKTable to maintain the information from K to D
and thus to the partition, i.e., the Streams instance, that hosts K

If this is correct, then you cannot use the "by key" metadata interface.
It's designed to find the Streams instance based on the key only -- but
your partitioner is based on the value. Internally, we call

> final Integer partition = partitioner.partition(key, null, 
> sourceTopicsInfo.maxPartitions);

Note, that `value==null` -- at this point, we don't have any value
available and can't provide it to the partitioner.

Thus, your approach to get all metadata is the only way you can go.


Very interesting (and quite special) use case. :)


-Matthias
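
A minimal sketch of the two-step lookup described above, against the 0.10.x interactive-queries API. The store name "dest-index", the stand-in partition computation, and the partition count are assumptions invented for illustration, not part of the original application:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public class InstanceLocator {

        public static StreamsMetadata findInstanceFor(KafkaStreams streams, String key,
                                                      String sourceTopic, int numPartitions) {
            // Step 1: resolve the "hidden" destination D from the GlobalKTable-backed store.
            ReadOnlyKeyValueStore<String, String> destIndex =
                    streams.store("dest-index", QueryableStoreTypes.<String, String>keyValueStore());
            String destination = destIndex.get(key);
            if (destination == null) {
                return null; // key not known (yet)
            }

            // Step 2: compute the partition the same way the topology's custom partitioner
            // would (the hash below is only a stand-in for that logic), then scan all
            // metadata for the instance hosting that partition of the source topic.
            int partition = Math.floorMod(destination.hashCode(), numPartitions);
            for (StreamsMetadata metadata : streams.allMetadata()) {
                boolean hostsPartition = metadata.topicPartitions().stream()
                        .anyMatch(tp -> tp.topic().equals(sourceTopic) && tp.partition() == partition);
                if (hostsPartition) {
                    return metadata;
                }
            }
            return null;
        }
    }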

On 6/2/17 2:32 PM, Steven Schlansker wrote:
> 
>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax  wrote:
>>
>> I am not sure if I understand the use case correctly. Could you give
>> some more context?
> 
> Happily, thanks for thinking about this!
> 
>>
>>> backing store whose partitioning is value dependent
>>
>> I infer that you are using a custom store and not the default RocksDB? If
>> yes, what do you use? What does "value dependent" mean in this context?
> 
> We're currently using the base in memory store.  We tried to use RocksDB
> but the tuning to get it running appropriately in a Linux container without
> tripping the cgroups OOM killer is nontrivial.
> 
> 
>> Right now, I am wondering why you don't just set a new key to get your
>> data grouped by the field you are interested in? Also, if you don't
>> partition your data by key, you might break your Streams application
>> with regard to fault-tolerance -- or does your custom store not rely on
>> changelog backup for fault-tolerance?
>>
> 
> That's an interesting point about making a transformed key.  But I don't think
> it simplifies my problem too much.  Essentially, I have a list of messages
> that should get delivered to destinations.  Each message has a primary key K
> and a destination D.
> 
> We partition over D so that all messages to the same destination are handled 
> by
> the same worker, to preserve ordering and implement local rate limits etc.
> 
> I want to preserve the illusion to the client that they can look up a key with
> only K.  So, as an intermediate step, we use the GlobalKTable to look up D.  
> Once
> we have K,D we can then compute the partition and execute a lookup.
> 
> Transforming the key to be a composite K,D isn't helpful because the end user 
> still
> only knows K -- D's relevance is an implementation detail I wish to hide -- 
> so you still
> need some sort of secondary lookup.
> 
> We do use the changelog backup for fault tolerance -- how would having the 
> partition
> based on the value break this?  Is the changelog implicitly partitioned by a 
> partitioner
> other than the one we give to the topology?
> 
> Hopefully that explains my situation a bit more?  Thanks!
> 
>>
>> -Matthias
>>
>>
>>
>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>>> I have a KTable and backing store whose partitioning is value dependent.
>>> I want certain groups of messages to be ordered and that grouping is 
>>> determined
>>> by one field (D) of the (possibly large) value.
>>>
>>> When I lookup by only K, obviously you don't know the partition it should 
>>> be on.
>>> So I will build a GlobalKTable of K -> D.  This gives me enough information
>>> to determine the partition.
>>>
>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
>>> It allows you to either get all metadata, or by key -- but if you lookup by 
>>> key
>>> it just substitutes a null value (causing a downstream NPE)
>>>
>>> I can iterate over all metadata and compute the mapping of K -> K,D -> P
>>> and then iterate over all metadata looking for P.  It's not difficult but 
>>> ends
>>> up being a bit of somewhat ugly code that feels like I shouldn't have to 
>>> write it.
>>>
>>> Am I missing something here?  Is there a better way that I've missed?  
>>> Thanks!
>>>
>>
> 





Re: Suppressing intermediate topics feeding (Global)KTable

2017-06-02 Thread Steven Schlansker

> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax  wrote:
> 
> Hi,
> 
> If you want to populate a GlobalKTable you can only do this by reading a
> topic. So the short answer to your headline is: no, you cannot suppress
> the intermediate topic.

Bummer!  Maybe this is an opt-in optimization to consider later.

> 
> However, I am wondering what the purpose of your secondary index is, and
> why you are using a GlobalKTable for it. Maybe you can elaborate a
> little bit?

Elaborated on this a bit in the other thread, I was trying to keep separate
problems separate, but maybe I just made things more confusing!

tl;dr is that the user requests values knowing K, but there is actually a
"hidden composite key" D that is relevant to the partitioning strategy.

The GlobalKTable allows you to look up K -> D, and then find the right local 
KTable K,D -> V

> 
> I am also wondering about this code snippet:
> 
>>> builder.stream(mainTopic)
>>>   .mapValues(...)
>>>   .to(secondaryIndex1)
> 
> Should it not be .map() that transforms (k,v) ->
> (v.getSecondaryKey1(),k) ? Just for my understanding what you are doing.
> 

In this case, the "externally visible" K needs additional information about
the destination D so that it can be partitioned correctly.  So the code looks
like:

// TODO: sucks that this materializes an intermediate topic
msgStream
    .mapValues(v -> v == null ? null : v.getResolvedDestination().toString())
    .to(Serdes.String(), Serdes.String(), DEST_INDEX);

builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX);

> 
> -Matthias
> 
> 
> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>> Hi everyone, another question for the list :)
>> 
>> I'm creating a cluster of KTable (and GlobalKTable) based off the same
>> input stream K,V.
>> 
>> It has a number of secondary indices (think like a RDBMS)
>> K1 -> K
>> K2 -> K
>> etc
>> 
>> These are all based off of trivial mappings from my main stream that also
>> feeds the K -> V StateStore.  Think one liners like v -> v.getSecondaryKey1()
>> 
>> Currently, for each one it seems that I have to do
>> 
>> builder.stream(mainTopic)
>>   .mapValues(...)
>>   .to(secondaryIndex1)
>> 
>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>> 
>> Unfortunately the intermediate "secondaryIndex1" topic is relatively
>> low value.  In a case where my state stores are lost, I already have to
>> read through the mainTopic to recover the main state store.  While it's doing
>> that, I'd much rather it rebuild the GlobalKTable instance from that data
>> directly.  Then I could skip having this index in Kafka at all, it's entirely
>> redundant.  The data is already loaded and deserialized for the benefit of
>> another Processor.
>> 
>> Any thoughts?  Happy Friday,
>> Steven
>> 
> 





Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-02 Thread Steven Schlansker

> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax  wrote:
> 
> I am not sure if I understand the use case correctly. Could you give
> some more context?

Happily, thanks for thinking about this!

> 
>> backing store whose partitioning is value dependent
> 
> I infer that you are using a custom store and not the default RocksDB? If
> yes, what do you use? What does "value dependent" mean in this context?

We're currently using the base in memory store.  We tried to use RocksDB
but the tuning to get it running appropriately in a Linux container without
tripping the cgroups OOM killer is nontrivial.


> Right now, I am wondering why you don't just set a new key to get your
> data grouped by the field you are interested in? Also, if you don't
> partition your data by key, you might break your Streams application
> with regard to fault-tolerance -- or does your custom store not rely on
> changelog backup for fault-tolerance?
> 

That's an interesting point about making a transformed key.  But I don't think
it simplifies my problem too much.  Essentially, I have a list of messages
that should get delivered to destinations.  Each message has a primary key K
and a destination D.

We partition over D so that all messages to the same destination are handled by
the same worker, to preserve ordering and implement local rate limits etc.

I want to preserve the illusion to the client that they can look up a key with
only K.  So, as an intermediate step, we use the GlobalKTable to look up D.  
Once
we have K,D we can then compute the partition and execute a lookup.

Transforming the key to be a composite K,D isn't helpful because the end user 
still
only knows K -- D's relevance is an implementation detail I wish to hide -- so 
you still
need some sort of secondary lookup.

We do use the changelog backup for fault tolerance -- how would having the 
partition
based on the value break this?  Is the changelog implicitly partitioned by a 
partitioner
other than the one we give to the topology?

Hopefully that explains my situation a bit more?  Thanks!

> 
> -Matthias
> 
> 
> 
> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>> I have a KTable and backing store whose partitioning is value dependent.
>> I want certain groups of messages to be ordered and that grouping is 
>> determined
>> by one field (D) of the (possibly large) value.
>> 
>> When I lookup by only K, obviously you don't know the partition it should be 
>> on.
>> So I will build a GlobalKTable of K -> D.  This gives me enough information
>> to determine the partition.
>> 
>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
>> It allows you to either get all metadata, or by key -- but if you lookup by 
>> key
>> it just substitutes a null value (causing a downstream NPE)
>> 
>> I can iterate over all metadata and compute the mapping of K -> K,D -> P
>> and then iterate over all metadata looking for P.  It's not difficult but 
>> ends
>> up being a bit of somewhat ugly code that feels like I shouldn't have to 
>> write it.
>> 
>> Am I missing something here?  Is there a better way that I've missed?  
>> Thanks!
>> 
> 





Re: Suppressing intermediate topics feeding (Global)KTable

2017-06-02 Thread Matthias J. Sax
Hi,

If you want to populate a GlobalKTable you can only do this by reading a
topic. So the short answer to your headline is: no, you cannot suppress
the intermediate topic.

However, I am wondering what the purpose of your secondary index is, and
why you are using a GlobalKTable for it. Maybe you can elaborate a
little bit?

I am also wondering about this code snippet:

>> builder.stream(mainTopic)
>>.mapValues(...)
>>.to(secondaryIndex1)

Should it not be .map() that transforms (k,v) ->
(v.getSecondaryKey1(),k) ? Just for my understanding what you are doing.


-Matthias


On 6/2/17 12:28 PM, Steven Schlansker wrote:
> Hi everyone, another question for the list :)
> 
> I'm creating a cluster of KTable (and GlobalKTable) based off the same
> input stream K,V.
> 
> It has a number of secondary indices (think like a RDBMS)
> K1 -> K
> K2 -> K
> etc
> 
> These are all based off of trivial mappings from my main stream that also
> feeds the K -> V StateStore.  Think one liners like v -> v.getSecondaryKey1()
> 
> Currently, for each one it seems that I have to do
> 
> builder.stream(mainTopic)
>.mapValues(...)
>.to(secondaryIndex1)
> 
> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
> 
> Unfortunately the intermediate "secondaryIndex1" topic is relatively
> low value.  In a case where my state stores are lost, I already have to
> read through the mainTopic to recover the main state store.  While it's doing
> that, I'd much rather it rebuild the GlobalKTable instance from that data
> directly.  Then I could skip having this index in Kafka at all, it's entirely
> redundant.  The data is already loaded and deserialized for the benefit of
> another Processor.
> 
> Any thoughts?  Happy Friday,
> Steven
> 





Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-02 Thread Matthias J. Sax
I am not sure if I understand the use case correctly. Could you give
some more context?

> backing store whose partitioning is value dependent

I infer that you are using a custom store and not the default RocksDB? If
yes, what do you use? What does "value dependent" mean in this context?


Right now, I am wondering why you don't just set a new key to get your
data grouped by the field you are interested in? Also, if you don't
partition your data by key, you might break your Streams application
with regard to fault-tolerance -- or does your custom store not rely on
changelog backup for fault-tolerance?


-Matthias



On 6/2/17 10:34 AM, Steven Schlansker wrote:
> I have a KTable and backing store whose partitioning is value dependent.
> I want certain groups of messages to be ordered and that grouping is 
> determined
> by one field (D) of the (possibly large) value.
> 
> When I lookup by only K, obviously you don't know the partition it should be 
> on.
> So I will build a GlobalKTable of K -> D.  This gives me enough information
> to determine the partition.
> 
> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
> It allows you to either get all metadata, or by key -- but if you lookup by 
> key
> it just substitutes a null value (causing a downstream NPE)
> 
> I can iterate over all metadata and compute the mapping of K -> K,D -> P
> and then iterate over all metadata looking for P.  It's not difficult but ends
> up being a bit of somewhat ugly code that feels like I shouldn't have to 
> write it.
> 
> Am I missing something here?  Is there a better way that I've missed?  Thanks!
> 





Re: Zookeeper on same server as Kafka

2017-06-02 Thread Mohammed Manna
Usually, the overhead comes when you have Kafka and ZooKeeper doing their
housekeeping (i.e. disk IO) on the same server. ZK even suggests that you
should keep its logs on a totally different physical machine for better
performance. Furthermore, if a mechanical failure occurs, you might not
want both the ZooKeeper node and the broker going down together.

Can anyone else chime in for some better points?


On 2 Jun 2017 7:57 pm, "Meghana Narasimhan" 
wrote:

Hi,
What are the pros and cons of setting up Zookeeper on the same server as
the Kafka broker ? Earlier offsets were being written to zookeeper which
was a major overhead but with offsets being written to Kafka now, what
other requirements should be taken into consideration for setting up
Zookeeper on the same server as Kafka vs having a separate zookeeper
cluster ?

Thanks,
Meghana


Suppressing intermediate topics feeding (Global)KTable

2017-06-02 Thread Steven Schlansker
Hi everyone, another question for the list :)

I'm creating a cluster of KTable (and GlobalKTable) based off the same
input stream K,V.

It has a number of secondary indices (think like a RDBMS)
K1 -> K
K2 -> K
etc

These are all based off of trivial mappings from my main stream that also
feeds the K -> V StateStore.  Think one liners like v -> v.getSecondaryKey1()

Currently, for each one it seems that I have to do

builder.stream(mainTopic)
    .mapValues(...)
    .to(secondaryIndex1)

builder.globalTable(secondaryIndex1, secondaryIndexStore1);

Unfortunately the intermediate "secondaryIndex1" topic is relatively
low value.  In a case where my state stores are lost, I already have to
read through the mainTopic to recover the main state store.  While it's doing
that, I'd much rather it rebuild the GlobalKTable instance from that data
directly.  Then I could skip having this index in Kafka at all, it's entirely
redundant.  The data is already loaded and deserialized for the benefit of
another Processor.

Any thoughts?  Happy Friday,
Steven
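
A hedged, self-contained sketch of the pattern described above, using the same 0.10.x-era Streams calls the snippets in this thread use; the topic names, store name, String serdes, and the secondary-key accessor are placeholders, not the poster's actual code:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    import java.util.Properties;

    public class SecondaryIndexTopology {

        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // Main stream K -> V ("main-topic" is a placeholder). The mapValues step
            // derives the secondary key and writes it to an intermediate topic, which
            // then feeds the GlobalKTable -- the topic the poster would like to suppress.
            builder.stream(Serdes.String(), Serdes.String(), "main-topic")
                    .mapValues(v -> v == null ? null : extractSecondaryKey1(v))
                    .to(Serdes.String(), Serdes.String(), "secondary-index-1");

            // GlobalKTable materialized on every instance from the intermediate topic.
            builder.globalTable(Serdes.String(), Serdes.String(),
                    "secondary-index-1", "secondary-index-1-store");

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "secondary-index-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            new KafkaStreams(builder, props).start();
        }

        // Stand-in for v.getSecondaryKey1(); here values are plain comma-separated strings.
        private static String extractSecondaryKey1(String value) {
            return value.split(",")[0];
        }
    }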





Re: Losing messages in Kafka Streams after upgrading

2017-06-02 Thread Matthias J. Sax
Hi Frank,

yes, retention policy is based on the embedded record timestamps and not
on system time. Thus, if you send messages with an old timestamp, they
can trigger log/segment rolling.

>> I see that the repartition topics have timestamp.type = CreateTime, does
>> that mean it uses the timestamp of the
>> original message? 

Yes. That's the default setting on the broker side. For Streams, we
maintain a so-called "stream time" that is computed based on the input
record timestamps. This "stream time" is used to set the timestamp for
records that are written by Stream. (so it's more or less the timestamp
of the input records).

>> Shouldn't that be LogAppendTime for repartition topics?

No. Streams needs to preserve the original timestamp to guarantee
correct semantics for downstream window operations. Thus, it should be
CreateTime -- if you switch to LogAppendTime, you might break your
application logic and get wrong results.

>> Or is there a way to configure that?

You can configure this on a per topic basis on the brokers.

>> If I hack into my Kafka Streams code to force it to use LogAppendTime, it
>> seems to solve my problem, but that appears to
>> take a huge toll on the brokers. Throughput plummets, and I don't really
>> know why.

I am not sure what you mean by this? As it's a topic config, I don't
understand how you can force this within your Streams application?


IMHO, you have multiple options though:
 - increase the retention time for you re-partitioning topics
 - you could change the retention policy to number of bytes instead of
time for the re-partitioning topics
 - you can implement a custom timestamp extractor and adjust the
timestamps accordingly ("stream time" is based on whatever timestamp
extractor returns)

However, if you have records with old timestamps, I am wondering why
they are not truncated in your input topic? Do you not face the same
issue there?


-Matthias
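
To illustrate the third option above, a minimal sketch of a custom timestamp extractor, assuming the 0.10.2-style TimestampExtractor interface; the one-day cap is an arbitrary placeholder, and (as noted above) adjusting timestamps this way can change windowed semantics. It would be registered via the Streams timestamp.extractor config.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Caps very old embedded timestamps at the current wall-clock time so that
    // repartition-topic segments are not immediately eligible for time-based deletion.
    public class WallClockFloorTimestampExtractor implements TimestampExtractor {

        private static final long MAX_AGE_MS = 24L * 60 * 60 * 1000; // placeholder: one day

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            long now = System.currentTimeMillis();
            long ts = record.timestamp();
            // Missing or very old timestamp: fall back to "now"; otherwise keep the original.
            return (ts <= 0 || now - ts > MAX_AGE_MS) ? now : ts;
        }
    }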





On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> Hi Kafka people,
> 
> I'm running an application that pushes database changes into a Kafka topic.
> I'm also running a Kafka streams application
> that listens to these topics, and groups them using the high level API, and
> inserts them to another database.
> 
> All topics are compacted, with the exception of the 'repartition topics',
> which are configured to be retained for 36 hours.
> 
> Note that the changes in the original kafka topics can be old (generally
> more than 36 hours), as they only change when
> the data changes.
> 
> When I start an instance of the Kafka Streams application, I see the
> repartition topics being deleted immediately,
> sometimes before they are processed, and it looks like the repartition
> messages use the same timestamp as the
> original message.
> 
> I see that the repartition topics have timestamp.type = CreateTime, does
> that mean it uses the timestamp of the
> original message? Shouldn't that be LogAppendTime for repartition topics?
> Or is there a way to configure that?
> 
> If I hack into my Kafka Streams code to force it to use LogAppendTime, it
> seems to solve my problem, but that appears to
> take a huge toll on the brokers. Throughput plummets, and I don't really
> know why.
> 
> Any ideas?
> 
> Frank
> 





Zookeeper on same server as Kafka

2017-06-02 Thread Meghana Narasimhan
Hi,
What are the pros and cons of setting up Zookeeper on the same server as
the Kafka broker ? Earlier offsets were being written to zookeeper which
was a major overhead but with offsets being written to Kafka now, what
other requirements should be taken into consideration for setting up
Zookeeper on the same server as Kafka vs having a separate zookeeper
cluster ?

Thanks,
Meghana


Re: LDAP integration with kafka brokers

2017-06-02 Thread Nixon Rodrigues
I don't know whether LDAP can be integrated directly with the Kafka client and
server, but Kafka can use Kerberos-type authentication,
and Kerberos can integrate with AD for the user store and for authenticating
users.

Hope this is helpful.

Nixon
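
For reference, a hedged sketch of the broker-side Kerberos (SASL/GSSAPI) settings this usually involves, with AD acting as the KDC; the listener address and other values are placeholders, and a matching JAAS login section (KafkaServer with Krb5LoginModule, keytab, and principal) has to be supplied via java.security.auth.login.config:

    # server.properties (placeholders throughout)
    listeners=SASL_PLAINTEXT://broker1.example.com:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=GSSAPI
    sasl.enabled.mechanisms=GSSAPI
    sasl.kerberos.service.name=kafka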

On Fri, Jun 2, 2017 at 1:58 AM, Arunkumar 
wrote:

>
> Hi Group,
>
> We have a requirement to integrate Kafka cluster with LDAP for
> authentication. I googled and I did not find much about it.
> Our requirement is that when a producer sends a message to a broker, it would
> send producer credentials, like the topic to which it wants to write and a
> userId with passcode. We would like to authenticate these against our
> enterprise LDAP server for authenticity. If there is no LDAP support
> available, we are planning to customize the code. Any insight on this is
> highly appreciated.
>
> Thanks in advance
> Arunkumar Pichaimuthu, PMP
>


Finding StreamsMetadata with value-dependent partitioning

2017-06-02 Thread Steven Schlansker
I have a KTable and backing store whose partitioning is value dependent.
I want certain groups of messages to be ordered and that grouping is determined
by one field (D) of the (possibly large) value.

When I lookup by only K, obviously you don't know the partition it should be on.
So I will build a GlobalKTable of K -> D.  This gives me enough information
to determine the partition.

Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
It allows you to either get all metadata, or by key -- but if you lookup by key
it just substitutes a null value (causing a downstream NPE)

I can iterate over all metadata and compute the mapping of K -> K,D -> P
and then iterate over all metadata looking for P.  It's not difficult but ends
up being a bit of somewhat ugly code that feels like I shouldn't have to write 
it.

Am I missing something here?  Is there a better way that I've missed?  Thanks!





Losing messages in Kafka Streams after upgrading

2017-06-02 Thread Frank Lyaruu
Hi Kafka people,

I'm running an application that pushes database changes into a Kafka topic.
I'm also running a Kafka streams application
that listens to these topics, and groups them using the high level API, and
inserts them to another database.

All topics are compacted, with the exception of the 'repartition topics',
which are configured to be retained for 36 hours.

Note that the changes in the original kafka topics can be old (generally
more than 36 hours), as they only change when
the data changes.

When I start an instance of the Kafka Streams application, I see the
repartition topics being deleted immediately,
sometimes before they are processed, and it looks like the repartition
messages use the same timestamp as the
original message.

I see that the repartition topics have timestamp.type = CreateTime, does
that mean it uses the timestamp of the
original message? Shouldn't that be LogAppendTime for repartition topics?
Or is there a way to configure that?

If I hack into my Kafka Streams code to force it to use LogAppendTime, it
seems to solve my problem, but that appears to
take a huge toll on the brokers. Throughput plummets, and I don't really
know why.

Any ideas?

Frank


Re: Data in kafka topic in Json format

2017-06-02 Thread Mina Aslani
Hi Hans,

Thank you for your quick response, appreciate it.

In *kafka-connect* docker, I see below settings in
*kafka-connect.properties* file in *kafka-connect *directory:

key.converter.schemas.enable=false
key.converter.schema.registry.url=http://kafka-schema-registry:
value.converter.schema.registry.url=http://kafka-schema-registry:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter.schemas.enable=false
rest.advertised.host.name=kafka-connect
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
And the settings in *schema-registry *directory of *kafka-connect *docker
are as in

https://github.com/confluentinc/schema-registry/tree/master/config

Should I consider any other settings for *kafka-connect* or
*schema-registry* to get the real json object NOT string
formatted/stringified json which has extra "\"  and is not json any more?

Best regards,
Mina
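
A hedged note on the question above: the schema and payload wrapper typically comes from the JsonConverter running with schemas enabled. The config quoted here disables that for keys but not for values, so a likely fix is to set both flags on the Connect worker (and restart it):

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false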

On Fri, Jun 2, 2017 at 11:18 AM, Hans Jespersen  wrote:

>
> My earlier comment still applies but in Kafka Connect the equivalent of a
> serializer/deserializer (serdes) is called a “converter”.
> Check which converter you have configured for your source connector and
> whether it is overriding the default converter configured for the
> Connect worker it is running in.
>
> -hans
>
>
>
>
> > On Jun 2, 2017, at 8:12 AM, Mina Aslani  wrote:
> >
> > Hi,
> >
> > I would like to add that I use kafka-connect and schema-registery
> version `
> > 3.2.1-6`.
> >
> > Best regards,
> > Mina
> >
> > On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani 
> wrote:
> >
> >> Hi.
> >>
> >> Is there any way that I get the data into a Kafka topic in Json format?
> >> The source that I ingest the data from has the data in JSON format;
> >> however, when I look at that data in the Kafka topic, schema and payload
> >> fields are added and the data is not in JSON format.
> >>
> >> I want to avoid implementing a transformer that gets the data from the
> >> topic and publishes JSON to another one.
> >>
> >> Your input is appreciated.
> >>
> >> Best regards,
> >> Mina
> >>
>
>


Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jay Kreps
Jan, I agree with you philosophically. I think one practical challenge has
to do with data formats. Many people use untyped events, so there is simply
no guarantee on the form of the input. E.g. many companies use JSON without
any kind of schema so it becomes very hard to assert anything about the
input, which makes these programs very fragile to the "one accidental
message publication that creates an unsolvable problem" scenario.

For that reason I do wonder if limiting to just serialization actually gets
you a useful solution. For JSON it will help with the problem of
non-parseable JSON, but sounds like it won't help in the case where the
JSON is well-formed but does not have any of the fields you expect and
depend on for your processing. I expect the reason for limiting the scope
is it is pretty hard to reason about correctness for anything that stops in
the middle of processing an operator DAG?

-Jay

On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak 
wrote:

> IMHO you're doing it wrong then. Plus, building too much support into the Kafka
> ecosystem is very counterproductive in fostering a happy userbase.
>
>
>
> On 02.06.2017 13:15, Damian Guy wrote:
>
>> Jan, you have a choice to Fail fast if you want. This is about giving
>> people options and there are times when you don't want to fail fast.
>>
>>
>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak 
>> wrote:
>>
>> Hi
>>>
>>> 1.
>>> That greatly complicates monitoring.  Fail fast gives you that: when you
>>> monitor only the lag of all your apps,
>>> you are completely covered. With that sort of new application, monitoring
>>> is very much more complicated, as
>>> you now need to monitor the fail % of some special apps as well. In my
>>> opinion that is a huge downside already.
>>>
>>> 2.
>>> Using a schema registry like the Avro stuff, it might not even be the record
>>> that is broken; it might just be your app
>>> unable to fetch a schema it now needs to know. Maybe you got partitioned
>>> away from that registry.
>>>
>>> 3. When you get alerted because of too high a fail percentage, what are the
>>> steps you are going to do?
>>> Shut it down to buy time. Fix the problem. Spend way too much time to
>>> find a good reprocess offset.
>>> Your time windows are in bad shape anyway, and you have pretty much lost.
>>> This routine is nonsense.
>>>
>>> Dead letter queues would be the worst possible addition to the Kafka
>>> toolkit that I can think of. It just doesn't fit an architecture
>>> where having clients fall behind is a valid option.
>>>
>>> Further, I already mentioned the only bad pill I've seen so far is CRC
>>> errors. Any plans for those?
>>>
>>> Best Jan
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>
 I agree with what Matthias has said w.r.t failing fast. There are plenty
 of times when you don't want to fail-fast and must attempt to make
 progress.
>>>
 The dead-letter queue is exactly for these circumstances. Of course if
 every record is failing, then you probably do want to give up.

 On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax 

>>> wrote:
>>>
 First a meta comment. KIP discussion should take place on the dev list
> -- if user list is cc'ed please make sure to reply to both lists.
>
 Thanks.
>>>
 Thanks for making the scope of the KIP clear. Makes a lot of sense to
> focus on deserialization exceptions for now.
>
> With regard to corrupted state stores, would it make sense to fail a
> task and wipe out the store to repair it via recreation from the
> changelog? That's of course a quite advanced pattern, but I want to
> bring
> it up to design the first step in a way such that we can get there (if
> we think it's a reasonable idea).
>
> I also want to comment about fail fast vs making progress. I think that
> fail-fast must not always be the best option. The scenario I have in
> mind is like this: you got a bunch of producers that feed the Streams
> input topic. Most producers work fine, but maybe one producer
> misbehaves and the data it writes is corrupted. You might not even be able
> to recover this lost data at any point -- thus, there is no reason to
> stop processing but you just skip over those records. Of course, you
> need to fix the root cause, and thus you need to alert (either via logs
> of the exception handler directly) and you need to start to investigate
> to find the bad producer, shut it down and fix it.
>
> Here the dead letter queue comes into place. From my understanding, the
> purpose of this feature is solely enable post debugging. I don't think
> those record would be fed back at any point in time (so I don't see any
> ordering issue -- a skipped record, with this regard, is just "fully
> processed"). Thus, the dead letter queue should actually encode the
> original records metadata (topic, partition offset etc) to enable such
> debugging. I guess, this might also be 

Re: Data in kafka topic in Json format

2017-06-02 Thread Hans Jespersen

My earlier comment still applies but in Kafka Connect the equivalent of a 
serializer/deserializer (serdes) is called a “converter”.
Check which converter you have configured for your source connector and whether it
is overriding the default converter configured for the Connect
worker it is running in.

-hans
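
As an illustration of such an override (a sketch; the connector name and class are placeholders reused from elsewhere in this digest), and assuming the Connect version supports per-connector converters, connector-level settings like these take precedence over the worker defaults:

    name=my-jdbc-source
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false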




> On Jun 2, 2017, at 8:12 AM, Mina Aslani  wrote:
> 
> Hi,
> 
> I would like to add that I use kafka-connect and schema-registery version `
> 3.2.1-6`.
> 
> Best regards,
> Mina
> 
> On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani  wrote:
> 
>> Hi.
>> 
>> Is there any way that I get the data into a Kafka topic in Json format?
>> The source that I ingest the data from has the data in JSON format;
>> however, when I look at that data in the Kafka topic, schema and payload fields
>> are added and the data is not in JSON format.
>> 
>> I want to avoid implementing a transformer that gets the data from the topic
>> and publishes JSON to another one.
>> 
>> Your input is appreciated.
>> 
>> Best regards,
>> Mina
>> 



Re: Data in kafka topic in Json format

2017-06-02 Thread Hans Jespersen

Check which serializer you have configured in your producer. You are probably
using an Avro serializer, which will add the schema and convert the payload to
Avro data. You can use a String serializer or a ByteArray serializer and the
data will either be Base64 encoded or not encoded at all.

-hans
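
A minimal producer sketch along those lines, assuming the JSON is already in hand as a string; the bootstrap server, topic, and record contents are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class JsonStringProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            // The value is sent as the raw JSON string, with no schema/payload envelope.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                String json = "{\"id\":1,\"name\":\"example\"}";
                producer.send(new ProducerRecord<>("json-topic", "key-1", json));
            }
        }
    }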



> On Jun 2, 2017, at 7:59 AM, Mina Aslani  wrote:
> 
> Hi.
> 
> Is there any way that I get the data into a Kafka topic in Json format?
> The source that I ingest the data from has the data in JSON format;
> however, when I look at that data in the Kafka topic, schema and payload fields
> are added and the data is not in JSON format.
> 
> I want to avoid implementing a transformer that gets the data from the topic
> and publishes JSON to another one.
> 
> Your input is appreciated.
> 
> Best regards,
> Mina



Re: Data in kafka topic in Json format

2017-06-02 Thread Mina Aslani
Hi,

I would like to add that I use kafka-connect and schema-registery version `
3.2.1-6`.

Best regards,
Mina

On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani  wrote:

> Hi.
>
> Is there any way that I get the data into a Kafka topic in Json format?
> The source that I ingest the data from has the data in JSON format;
> however, when I look at that data in the Kafka topic, schema and payload fields
> are added and the data is not in JSON format.
>
> I want to avoid implementing a transformer that gets the data from the topic
> and publishes JSON to another one.
>
> Your input is appreciated.
>
> Best regards,
> Mina
>


Kafka Over TLS Error - Failed to send SSL Close message - Broken Pipe

2017-06-02 Thread IT Consultant
Hi All,

I have been seeing the below error for the last three days.

Can you please help me understand more about this?


WARN Failed to send SSL Close message
(org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
 at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
 at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
 at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
 at sun.nio.ch.IOUtil.write(IOUtil.java:65)
 at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
 at
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
 at
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:148)
 at
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
 at
org.apache.kafka.common.network.Selector.close(Selector.java:442)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:310)
 at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
 at java.lang.Thread.run(Thread.java:745)


Thanks  a lot.


Data in kafka topic in Json format

2017-06-02 Thread Mina Aslani
Hi.

Is there any way that I get the data into a Kafka topic in Json format?
The source that I ingest the data from has the data in JSON format;
however, when I look at that data in the Kafka topic, schema and payload fields
are added and the data is not in JSON format.

I want to avoid implementing a transformer that gets the data from the topic
and publishes JSON to another one.

Your input is appreciated.

Best regards,
Mina


Re: Cluster in weird state: no leaders no ISR for all topics, but it works!

2017-06-02 Thread Del Barrio, Alberto
So, I fixed the problem by doing a rolling restart, and after some checks
it seems there was no data loss.

On 1 June 2017 at 17:57, Del Barrio, Alberto <
alberto.delbar...@360dialog.com> wrote:

> I might give it a try tomorrow. The reason for having so large init and
> sync limit times is because in the past our ZK cluster was storing large
> amount of data, and lower values were not enough for the server syncs when
> restarting zk processes.
>
> On 1 June 2017 at 17:52, Mohammed Manna  wrote:
>
>> Cool - I will try and take a look into this - Meanwhile, would you mind
>> awfully
>> changing the following and seeing if things improve?
>>
>> tickTime = 1000
>> initLimit=3
>> syncLimit=5
>>
>> On 1 June 2017 at 16:49, Del Barrio, Alberto <
>> alberto.delbar...@360dialog.com> wrote:
>>
>> > Here are the configs you were asking for:
>> >
>> > Zookeeper:
>> > tickTime=1000
>> > initLimit=2000
>> > syncLimit=1000
>> > dataDir=/var/lib/zookeeper
>> > clientPort=2181
>> > server.3=10.0.0.3:2888:3888
>> > server.2=10.0.0.2:2888:3888
>> > server.1=10.0.0.1:2888:3888
>> >
>> >
>> > Kafka broker (for one of them):
>> > broker.id=10
>> > listeners=PLAINTEXT://10.0.0.4:9092
>> > num.network.threads=3
>> > num.io.threads=8
>> > socket.send.buffer.bytes=102400
>> > socket.receive.buffer.bytes=102400
>> > socket.request.max.bytes=104857600
>> > log.dirs=/var/lib/kafka
>> > num.partitions=2
>> > num.recovery.threads.per.data.dir=1
>> > zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181/kafka
>> > zookeeper.connection.timeout.ms=6000
>> >
>> > In general they're pretty much the default ones.
>> > I can see in Zookeeper the kafka brokers connected to it and exchanging
>> > data...
>> >
>> > Thanks for your help and time.
>> >
>> > On 1 June 2017 at 17:32, Mohammed Manna  wrote:
>> >
>> > > Could you please share your broker/zookeeper/topic configs ?
>> > >
>> > > On 1 June 2017 at 16:18, Del Barrio, Alberto <
>> > > alberto.delbar...@360dialog.com> wrote:
>> > >
>> > > > I tried creating the topic and results are very similar to the
>> current
>> > > > situation: there are not ISR and no leader for any of the
>> partitions,
>> > but
>> > > > now kafka-topics shows *Leader: none* when for all the other
>> topics, it
>> > > > shows *Leader: -1*
>> > > >
>> > > >
>> > > > On 1 June 2017 at 17:05, Mohammed Manna  wrote:
>> > > >
>> > > > > I had a similar situation, but only 1 of my ZKs was struggling -
>> but
>> > > > since
>> > > > > the ISR synching time is configurable I was confident to bounce 1
>> ZK
>> > > at a
>> > > > > time and it worked out.
>> > > > > does it happen even when you create a new topic with a
>> > > > > replication:partition ration of 1?
>> > > > >
>> > > > > i meant, 3 replicas, 3 partitions :)
>> > > > >
>> > > > > On 1 June 2017 at 15:58, Del Barrio, Alberto <
>> > > > > alberto.delbar...@360dialog.com> wrote:
>> > > > >
>> > > > > > Hi Mohammed,
>> > > > > >
>> > > > > > thanks for your answer.
>> > > > > > The ZK cluster is not located in the servers where Kafka runs
>> but
>> > in
>> > > > > other
>> > > > > > 3 different machines. This ZK cluster is used by several other
>> > > services
>> > > > > > which are not reporting problems.
>> > > > > > As you suggested, I haven't tried restarting the kafka-server
>> > > processes
>> > > > > > because there's no leader for topic partitions, so I don't know
>> > what
>> > > > will
>> > > > > > happen. Never been in a similar situation with Kafka after some
>> > years
>> > > > > using
>> > > > > > it.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On 1 June 2017 at 16:29, Mohammed Manna 
>> > wrote:
>> > > > > >
>> > > > > > > Hi Alberto,
>> > > > > > >
>> > > > > > > Usually this means that the leader election/replica syncing
>> > > couldn't
>> > > > be
>> > > > > > > successful and the zookeeper logs should be able to show this
>> > > > > information
>> > > > > > > too. The leader -1 is what worries me. For your case (3 broker
>> > > > > cluster),
>> > > > > > I
>> > > > > > > am assuming you have done the cluster configuration to have 1
>> > > > > > > broker-zookeeper setup ?
>> > > > > > > If that's the case, you should be able to bounce 1 zookeeper
>> at a
>> > > > time
>> > > > > > and
>> > > > > > > see if that resolves the issue.
>> > > > > > >
>> > > > > > > That said, have you restarted  your servers since this issue
>> > > > surfaced?
>> > > > > > >
>> > > > > > > On 1 June 2017 at 14:11, Del Barrio, Alberto <
>> > > > > > > alberto.delbar...@360dialog.com> wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > I'm experiencing an issue which I don't know how to solve,
>> so
>> > I'm
>> > > > > > trying
>> > > > > > > to
>> > > > > > > > find some guidance on the topic.
>> > > > > > > >
>> > > > > > > > I have a cluster composed by 3 servers, one broker per
>> server
>> > > > running
>> > > > > > > Kafka
>> > > > > > > > 0.10.0.1-1 which runs in production with around 100 topics,
>> > most
>> > > of
>> > 

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jan Filipiak
IMHO you're doing it wrong then. Plus, building too much support into the Kafka
ecosystem is very counterproductive in fostering a happy userbase.



On 02.06.2017 13:15, Damian Guy wrote:

Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak  wrote:


Hi

1.
That greatly complicates monitoring.  Fail fast gives you that: when you
monitor only the lag of all your apps,
you are completely covered. With that sort of new application, monitoring
is very much more complicated, as
you now need to monitor the fail % of some special apps as well. In my
opinion that is a huge downside already.

2.
using a schema regerstry like Avrostuff it might not even be the record
that is broken, it might be just your app
unable to fetch a schema it needs now know. Maybe you got partitioned
away from that registry.

3. When you get alerted because of to high fail percentage. what are the
steps you gonna do?
shut it down to buy time. fix the problem. spend way to much time to
find a good reprocess offset.
Your timewindows are in bad shape anyways, and you pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the kafka
toolkit that I can think of. It just doesn't fit the architecture
of having clients falling behind is a valid option.

Further. I mentioned already the only bad pill ive seen so far is crc
errors. any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t. failing fast. There are plenty
of times when you don't want to fail fast and must attempt to make
progress. The dead-letter queue is exactly for these circumstances. Of
course, if every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax  wrote:

First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course quite an advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast might not always be the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer misbehaves
and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -- thus, there is no reason to
stop processing but you just skip over those records. Of course, you
need to fix the root cause, and thus you need to alert (either via logs
or the exception handler directly) and you need to start to investigate
to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into place. From my understanding, the
purpose of this feature is solely to enable after-the-fact debugging. I
don't think those records would be fed back at any point in time (so I
don't see any ordering issue -- a skipped record, in this regard, is just
"fully processed"). Thus, the dead letter queue should actually encode the
original record's metadata (topic, partition, offset, etc.) to enable such
debugging. I guess this might also be possible if you just log the bad
records, but it would be harder to access (you first must find the
Streams instance that did write the log and extract the information from
there). Reading it from the topic is much simpler.

I also want to mention the following. Assume you have such a topic with
some bad records and some good records. If we always fail fast, it's
going to be super hard to process the good data. You would need to write
an extra app that copies the data into a new topic, filtering out the bad
records (or apply the map() workaround within the stream). So I don't
think that failing fast is necessarily the best option in production.

Or do you think there are scenarios, for which you can recover the
corrupted records successfully? And even if this is possible, it might
be a case for reprocessing instead of failing the whole application?
Also, if you think you can "repair" a corrupted record, should the
handler allow to return a "fixed" record? This would solve the ordering
problem.



-Matthias




On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how the
user would write a simple application that would then be augmented with the
proposed KIP

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Damian Guy
Jan, you have the choice to fail fast if you want. This is about giving
people options, and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak  wrote:

> Hi
>
> 1.
> That greatly complicates monitoring. Fail fast gives you this: when you
> monitor only the lag of all your apps, you are completely covered. With
> that sort of new application, monitoring is much more complicated, as you
> now need to monitor the fail % of some special apps as well. In my
> opinion that is a huge downside already.
>
> 2.
> Using a schema registry (like the Avro tooling), it might not even be the
> record that is broken; it might just be your app being unable to fetch a
> schema it needs right now. Maybe you got partitioned away from that
> registry.
>
> 3. When you get alerted because of a too-high fail percentage, what are
> the steps you are going to take? Shut it down to buy time. Fix the
> problem. Spend way too much time finding a good reprocess offset.
> Your time windows are in bad shape anyway, and you have pretty much lost.
> This routine is nonsense.
>
> Dead letter queues would be the worst possible addition to the Kafka
> toolkit that I can think of. It just doesn't fit an architecture in which
> clients falling behind is a valid option.
>
> Further, as I mentioned already, the only bad pill I've seen so far is
> CRC errors. Any plans for those?
>
> Best Jan
>
>
>
>
>
>
> On 02.06.2017 11:34, Damian Guy wrote:
> > I agree with what Matthias has said w.r.t. failing fast. There are plenty
> > of times when you don't want to fail fast and must attempt to make
> > progress. The dead-letter queue is exactly for these circumstances. Of
> > course, if every record is failing, then you probably do want to give up.
> >
> > On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax  wrote:
> >
> >> First a meta comment. KIP discussion should take place on the dev list
> >> -- if user list is cc'ed please make sure to reply to both lists. Thanks.
> >>
> >> Thanks for making the scope of the KIP clear. Makes a lot of sense to
> >> focus on deserialization exceptions for now.
> >>
> >> With regard to corrupted state stores, would it make sense to fail a
> >> task and wipe out the store to repair it via recreation from the
> >> changelog? That's of course quite an advanced pattern, but I want to bring
> >> it up to design the first step in a way such that we can get there (if
> >> we think it's a reasonable idea).
> >>
> >> I also want to comment about fail fast vs making progress. I think that
> >> fail-fast might not always be the best option. The scenario I have in
> >> mind is like this: you got a bunch of producers that feed the Streams
> >> input topic. Most producers work fine, but maybe one producer misbehaves
> >> and the data it writes is corrupted. You might not even be able
> >> to recover this lost data at any point -- thus, there is no reason to
> >> stop processing but you just skip over those records. Of course, you
> >> need to fix the root cause, and thus you need to alert (either via logs
> >> or the exception handler directly) and you need to start to investigate
> >> to find the bad producer, shut it down and fix it.
> >>
> >> Here the dead letter queue comes into place. From my understanding, the
> >> purpose of this feature is solely to enable after-the-fact debugging. I
> >> don't think those records would be fed back at any point in time (so I
> >> don't see any ordering issue -- a skipped record, in this regard, is just
> >> "fully processed"). Thus, the dead letter queue should actually encode the
> >> original record's metadata (topic, partition, offset, etc.) to enable such
> >> debugging. I guess this might also be possible if you just log the bad
> >> records, but it would be harder to access (you first must find the
> >> Streams instance that did write the log and extract the information from
> >> there). Reading it from the topic is much simpler.
> >>
> >> I also want to mention the following. Assume you have such a topic with
> >> some bad records and some good records. If we always fail fast, it's
> >> going to be super hard to process the good data. You would need to write
> >> an extra app that copies the data into a new topic, filtering out the bad
> >> records (or apply the map() workaround within the stream). So I don't
> >> think that failing fast is necessarily the best option in production.
> >>
> >> Or do you think there are scenarios, for which you can recover the
> >> corrupted records successfully? And even if this is possible, it might
> >> be a case for reprocessing instead of failing the whole application?
> >> Also, if you think you can "repair" a corrupted record, should the
> >> handler allow to return a "fixed" record? This would solve the ordering
> >> problem.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >>
> >> On 5/30/17 1:47 AM, Michael Noll wrote:
> >>> Thanks for your work on this KIP, Eno -- much appreciated!
> >>>
> >>> - I think it would help to improve the KIP by 

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jan Filipiak

Hi

1.
That greatly complicates monitoring. Fail fast gives you this: when you
monitor only the lag of all your apps, you are completely covered. With
that sort of new application, monitoring is much more complicated, as you
now need to monitor the fail % of some special apps as well. In my
opinion that is a huge downside already.

2.
Using a schema registry (like the Avro tooling), it might not even be the
record that is broken; it might just be your app being unable to fetch a
schema it needs right now. Maybe you got partitioned away from that
registry.

3. When you get alerted because of a too-high fail percentage, what are
the steps you are going to take? Shut it down to buy time. Fix the
problem. Spend way too much time finding a good reprocess offset.
Your time windows are in bad shape anyway, and you have pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. It just doesn't fit an architecture in which
clients falling behind is a valid option.

Further, as I mentioned already, the only bad pill I've seen so far is
CRC errors. Any plans for those?


Best Jan






On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course, if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax  wrote:


First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course quite an advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast might not always be the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer misbehaves
and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -- thus, there is no reason to
stop processing but you just skip over those records. Of course, you
need to fix the root cause, and thus you need to alert (either via logs
or the exception handler directly) and you need to start to investigate
to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into place. From my understanding, the
purpose of this feature is solely to enable after-the-fact debugging. I
don't think those records would be fed back at any point in time (so I
don't see any ordering issue -- a skipped record, in this regard, is just
"fully processed"). Thus, the dead letter queue should actually encode the
original record's metadata (topic, partition, offset, etc.) to enable such
debugging. I guess this might also be possible if you just log the bad
records, but it would be harder to access (you first must find the
Streams instance that did write the log and extract the information from
there). Reading it from the topic is much simpler.

I also want to mention the following. Assume you have such a topic with
some bad records and some good records. If we always fail fast, it's
going to be super hard to process the good data. You would need to write
an extra app that copies the data into a new topic, filtering out the bad
records (or apply the map() workaround within the stream). So I don't
think that failing fast is necessarily the best option in production.

Or do you think there are scenarios, for which you can recover the
corrupted records successfully? And even if this is possible, it might
be a case for reprocessing instead of failing the whole application?
Also, if you think you can "repair" a corrupted record, should the
handler allow to return a "fixed" record? This would solve the ordering
problem.



-Matthias




On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how the
user would write a simple application that would then be augmented with the
proposed KIP changes to handle exceptions. It should also become much
clearer then that e.g. the KIP would lead to different code paths for the
happy case and any failure scenarios.

- Do we have sufficient information available to make informed decisions on
what to do next? For example, do we know in which part of the topology the
record failed? `ConsumerRecord` gives 

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Damian Guy
I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course, if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax  wrote:

> First a meta comment. KIP discussion should take place on the dev list
> -- if user list is cc'ed please make sure to reply to both lists. Thanks.
>
> Thanks for making the scope of the KIP clear. Makes a lot of sense to
> focus on deserialization exceptions for now.
>
> With regard to corrupted state stores, would it make sense to fail a
> task and wipe out the store to repair it via recreation from the
> changelog? That's of course quite an advanced pattern, but I want to bring
> it up to design the first step in a way such that we can get there (if
> we think it's a reasonable idea).
>
> I also want to comment about fail fast vs making progress. I think that
> fail-fast might not always be the best option. The scenario I have in
> mind is like this: you got a bunch of producers that feed the Streams
> input topic. Most producers work fine, but maybe one producer misbehaves
> and the data it writes is corrupted. You might not even be able
> to recover this lost data at any point -- thus, there is no reason to
> stop processing but you just skip over those records. Of course, you
> need to fix the root cause, and thus you need to alert (either via logs
> or the exception handler directly) and you need to start to investigate
> to find the bad producer, shut it down and fix it.
>
> Here the dead letter queue comes into place. From my understanding, the
> purpose of this feature is solely to enable after-the-fact debugging. I
> don't think those records would be fed back at any point in time (so I
> don't see any ordering issue -- a skipped record, in this regard, is just
> "fully processed"). Thus, the dead letter queue should actually encode the
> original record's metadata (topic, partition, offset, etc.) to enable such
> debugging. I guess this might also be possible if you just log the bad
> records, but it would be harder to access (you first must find the
> Streams instance that did write the log and extract the information from
> there). Reading it from the topic is much simpler.
>
> I also want to mention the following. Assume you have such a topic with
> some bad records and some good records. If we always fail fast, it's
> going to be super hard to process the good data. You would need to write
> an extra app that copies the data into a new topic, filtering out the bad
> records (or apply the map() workaround within the stream). So I don't
> think that failing fast is necessarily the best option in production.
>
> Or do you think there are scenarios, for which you can recover the
> corrupted records successfully? And even if this is possible, it might
> be a case for reprocessing instead of failing the whole application?
> Also, if you think you can "repair" a corrupted record, should the
> handler allow to return a "fixed" record? This would solve the ordering
> problem.
>
>
>
> -Matthias
>
>
>
>
> On 5/30/17 1:47 AM, Michael Noll wrote:
> > Thanks for your work on this KIP, Eno -- much appreciated!
> >
> > - I think it would help to improve the KIP by adding an end-to-end code
> > example that demonstrates, with the DSL and with the Processor API, how the
> > user would write a simple application that would then be augmented with the
> > proposed KIP changes to handle exceptions. It should also become much
> > clearer then that e.g. the KIP would lead to different code paths for the
> > happy case and any failure scenarios.
> >
> > - Do we have sufficient information available to make informed decisions on
> > what to do next? For example, do we know in which part of the topology the
> > record failed? `ConsumerRecord` gives us access to topic, partition,
> > offset, timestamp, etc., but what about topology-related information (e.g.
> > what is the associated state store, if any)?
> >
> > - Only partly on-topic for the scope of this KIP, but this is about the
> > bigger picture: this KIP would give users the option to send corrupted
> > records to a dead letter queue (quarantine topic). But what pattern would
> > we advocate to process such a dead letter queue then, e.g. how to allow for
> > retries with backoff ("If the first record in the dead letter queue fails
> > again, then try the second record for the time being and go back to the
> > first record at a later time"). Jay and Jan already alluded to ordering
> > problems that will be caused by dead letter queues. As I said, retries
> > might be out of scope but perhaps the implications should be considered if
> > possible?
> >
> > Also, I wrote the text below before reaching the point in the
> > conversation that this KIP's scope will be limited to exceptions in the
> > category of poiso