Connector configuration is invalid
Hello All, I am getting a "Connector configuration is invalid" error with the following configuration. Can someone please help me find out what I am doing wrong here?

name=topic_see
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:db2://*.*.*.com:6/*:user=dsetl:password=*
mode=bulk
table.whitelist=CATALOG
topic.prefix=topic-see-
schema.registry=GIL

Thanks and Regards
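Two things stand out in the pasted config (my own reading; nothing in the thread confirms either). First, schema.registry is not a JDBC source connector property: the Schema Registry URL belongs to the Connect worker's converter settings, not to the connector. Second, DB2 JDBC URLs attach driver properties after the database name with a colon, but the properties themselves are separated and terminated by semicolons, not colons. A hedged sketch of what a working connector config might look like; host, port, database, and password are placeholders:

    name=topic_see
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    # DB2 URL form: jdbc:db2://<host>:<port>/<database>:prop=value;prop=value;
    connection.url=jdbc:db2://db2host.example.com:50000/MYDB:user=dsetl;password=changeme;
    mode=bulk
    table.whitelist=CATALOG
    topic.prefix=topic-see-

The Schema Registry URL would then live in the worker properties (for example value.converter.schema.registry.url=http://schemaregistry:8081) rather than in the connector config.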
Re: Data in kafka topic in Json format
You have shared the Kafka Connect worker properties but not the source connector config. Which source connector are you using? Does it override the default settings you provided? Are you running the connector in standalone mode or distributed mode? Also, what are you using to consume the messages and see the message format? -hans > On Jun 2, 2017, at 9:10 AM, Mina Aslani wrote: > > Hi Hans, > > Thank you for your quick response, appreciate it. > > In the *kafka-connect* docker, I see the below settings in the > *kafka-connect.properties* file in the *kafka-connect* directory: > > key.converter.schemas.enable=false > key.converter.schema.registry.url=http://kafka-schema-registry: > value.converter.schema.registry.url=http://kafka-schema-registry: > > key.converter=org.apache.kafka.connect.json.JsonConverter > value.converter=org.apache.kafka.connect.json.JsonConverter > > internal.value.converter.schemas.enable=false > rest.advertised.host.name=kafka-connect > internal.value.converter=org.apache.kafka.connect.json.JsonConverter > internal.key.converter=org.apache.kafka.connect.json.JsonConverter > And the settings in the *schema-registry* directory of the *kafka-connect* docker > are as in > > https://github.com/confluentinc/schema-registry/tree/master/config > > Should I consider any other settings for *kafka-connect* or > *schema-registry* to get the real JSON object, NOT string-formatted/stringified > JSON which has extra "\" and is not JSON any more? > > Best regards, > Mina > > On Fri, Jun 2, 2017 at 11:18 AM, Hans Jespersen wrote: > >> >> My earlier comment still applies, but in Kafka Connect the equivalent of a >> serializer/deserializer (serdes) is called a “converter”. >> Check which converter you have configured for your source connector and whether >> it is overriding whatever default converter is configured for the >> Connect worker it is running in. >> >> -hans >> >> >> >> >>> On Jun 2, 2017, at 8:12 AM, Mina Aslani wrote: >>> >>> Hi, >>> >>> I would like to add that I use kafka-connect and schema-registry >> version ` >>> 3.2.1-6`. >>> >>> Best regards, >>> Mina >>> >>> On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani >> wrote: >>> Hi. Is there any way that I can get the data into a Kafka topic in JSON format? The source that I ingest the data from has the data in JSON format; however, when I look at the data in the kafka topic, schema and payload >> fields are added and the data is not in JSON format. I want to avoid implementing a transformer that gets the data from the >> topic and publishes JSON to another one. Your input is appreciated. Best regards, Mina >> >>
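One detail worth flagging (my observation; the thread never states it): the pasted worker config sets key.converter.schemas.enable=false but not value.converter.schemas.enable. For the JsonConverter, schemas.enable defaults to true, and that is exactly the setting that wraps every message value in a {"schema": ..., "payload": ...} envelope. A minimal sketch of the worker settings that should yield plain JSON values:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

The worker has to be restarted for the change to take effect.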
Re: Finding StreamsMetadata with value-dependent partitioning
> > On Jun 2, 2017, at 3:32 PM, Matthias J. Sax wrote: > > Thanks. That helps to understand the use case better. > > Rephrase to make sure I understood it correctly: > > 1) you are providing a custom partitioner to Streams that is based on one > field in your value (that's fine with regard to fault-tolerance :)) > 2) you want to use interactive queries to query the store > 3) because of your custom partitioning schema, you need to manually > figure out the right application instance that hosts a key > 4) thus, you use a GlobalKTable to maintain the information from K to D > and thus to the partition, i.e., the streams instance that hosts K > > If this is correct, then you cannot use the "by key" metadata interface. > It's designed to find the streams instance based on the key only -- but > your partitioner is based on the value. Internally, we call > >> final Integer partition = partitioner.partition(key, null, >> sourceTopicsInfo.maxPartitions); > > Note that `value==null` -- at this point, we don't have any value > available and can't provide it to the partitioner. > > Thus, your approach to get all metadata is the only way you can go. Thanks for confirming this. The code is a little ugly but I've done worse :) > > > Very interesting (and quite special) use case. :) > > > -Matthias > > On 6/2/17 2:32 PM, Steven Schlansker wrote: >> >>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax wrote: >>> >>> I am not sure if I understand the use case correctly. Could you give >>> some more context? >> >> Happily, thanks for thinking about this! >> >>> backing store whose partitioning is value dependent >>> >>> I infer that you are using a custom store and not the default RocksDB? If >>> yes, what do you use? What does "value dependent" mean in this context? >> >> We're currently using the built-in in-memory store. We tried to use RocksDB >> but the tuning to get it running appropriately in a Linux container without >> tripping the cgroups OOM killer is nontrivial. >> >> >>> Right now, I am wondering why you don't just set a new key to get your >>> data grouped by the field you are interested in? Also, if you don't >>> partition your data by key, you might break your streams application >>> with regard to fault-tolerance -- or does your custom store not rely on >>> changelog backup for fault-tolerance? >>> >> >> That's an interesting point about making a transformed key. But I don't think >> it simplifies my problem too much. Essentially, I have a list of messages >> that should get delivered to destinations. Each message has a primary key K >> and a destination D. >> >> We partition over D so that all messages to the same destination are handled by >> the same worker, to preserve ordering and implement local rate limits etc. >> >> I want to preserve the illusion to the client that they can look up a key with >> only K. So, as an intermediate step, we use the GlobalKTable to look up D. Once >> we have K,D we can then compute the partition and execute a lookup. >> >> Transforming the key to be a composite K,D isn't helpful because the end user still >> only knows K -- D's relevance is an implementation detail I wish to hide -- so you still >> need some sort of secondary lookup. >> >> We do use the changelog backup for fault tolerance -- how would having the partition >> based on the value break this? Is the changelog implicitly partitioned by a partitioner >> other than the one we give to the topology? >> >> Hopefully that explains my situation a bit more? Thanks!
>> >>> >>> -Matthias >>> >>> >>> >>> On 6/2/17 10:34 AM, Steven Schlansker wrote: I have a KTable and backing store whose partitioning is value dependent. I want certain groups of messages to be ordered, and that grouping is determined by one field (D) of the (possibly large) value. When I look up by only K, obviously you don't know the partition it should be on. So I will build a GlobalKTable of K -> D. This gives me enough information to determine the partition. Unfortunately, the KafkaStreams metadata API doesn't fit this use case well. It allows you to either get all metadata, or by key -- but if you look up by key it just substitutes a null value (causing a downstream NPE). I can iterate over all metadata and compute the mapping of K -> K,D -> P and then iterate over all metadata looking for P. It's not difficult, but it ends up being a bit of ugly code that feels like I shouldn't have to write. Am I missing something here? Is there a better way that I've missed? Thanks!
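For readers hitting the same problem: iterating over all metadata works, but there may be a slightly shorter route. KafkaStreams.metadataForKey() has an overload that accepts a custom StreamPartitioner, so once D has been resolved from the GlobalKTable you can hand Streams a partitioner that closes over D and simply ignores the null value it is given. A rough sketch only -- the store name and both helper methods are assumptions, not from this thread:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public class DestinationAwareLookup {

        // Find the instance hosting 'key' when partitioning depends on the
        // destination D rather than on the key itself.
        StreamsMetadata hostFor(final KafkaStreams streams, final String key) {
            final String d = lookupDestination(key); // K -> D via the GlobalKTable's store
            // metadataForKey() invokes the partitioner with value == null, so we
            // ignore that argument and use the destination we already resolved.
            return streams.metadataForKey("message-store", key,
                    (k, nullValue, numPartitions) -> partitionForDestination(d, numPartitions));
        }

        private String lookupDestination(final String key) {
            return ""; // placeholder: query the global store here
        }

        private int partitionForDestination(final String d, final int numPartitions) {
            // placeholder: must match the custom partitioner used by the topology
            return Math.abs(d.hashCode() % numPartitions);
        }
    }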
Re: Suppressing intermediate topics feeding (Global)KTable
You're entirely right. I'd forgotten that each instance would only read a subset of the main topic. Should have figured that out myself. Thanks for the sanity check! :) > On Jun 2, 2017, at 3:41 PM, Matthias J. Sax wrote: > > Makes sense now (considering the explanation from the other thread). > > With regard to an "opt-in" optimization: we could simplify the API and > hide some details, but it wouldn't buy you anything from an execution > point of view. As you need all data on each instance, you need to > somehow "broadcast" the information -- and Kafka Streams applications > use topics to exchange data. Thus, we need a topic anyhow. > > Does this make sense? > > So your overall architecture seems to be sound to me. > > > -Matthias > > > On 6/2/17 2:37 PM, Steven Schlansker wrote: >> >>> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax wrote: >>> >>> Hi, >>> >>> If you want to populate a GlobalKTable you can only do this by reading a >>> topic. So the short answer for your headline is: no, you cannot suppress >>> the intermediate topic. >> >> Bummer! Maybe this is an opt-in optimization to consider later. >> >>> >>> However, I am wondering what the purpose of your secondary index is, and >>> why you are using a GlobalKTable for it. Maybe you can elaborate a >>> little bit? >> >> Elaborated on this a bit in the other thread, I was trying to keep separate >> problems separate, but maybe I just made things more confusing! >> >> tl;dr is that the user requests values knowing K, but there is actually a >> "hidden composite key" D that is relevant to the partitioning strategy. >> >> The GlobalKTable allows you to look up K -> D, and then find the right local >> KTable K,D -> V >> >>> >>> I am also wondering about this code snippet: >>> > builder.stream(mainTopic) > .mapValues(...) > .to(secondaryIndex1) >>> >>> Should it not be .map() that transforms (k,v) -> >>> (v.getSecondaryKey1(),k)? Just for my understanding what you are doing. >>> >> >> In this case, the "externally visible" K needs additional information about >> the destination D so that it can be partitioned correctly. So the code looks >> like: >> >>// TODO: sucks that this materializes an intermediate topic >>msgStream >>.mapValues(v -> v == null ? null : >> v.getResolvedDestination().toString()) >>.to(Serdes.String(), Serdes.String(), DEST_INDEX); >> >>builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, >> DEST_INDEX); >> >>> >>> -Matthias >>> >>> >>> On 6/2/17 12:28 PM, Steven Schlansker wrote: Hi everyone, another question for the list :) I'm creating a cluster of KTable (and GlobalKTable) based off of the same input stream K,V. It has a number of secondary indices (think like an RDBMS) K1 -> K K2 -> K etc. These are all based off of trivial mappings from my main stream that also feeds the K -> V StateStore. Think one-liners like v -> v.getSecondaryKey1(). Currently, for each one it seems that I have to do builder.stream(mainTopic) .mapValues(...) .to(secondaryIndex1) builder.globalTable(secondaryIndex1, secondaryIndexStore1); Unfortunately the intermediate "secondaryIndex1" topic is relatively low value. In a case where my state stores are lost, I already have to read through the mainTopic to recover the main state store. While it's doing that, I'd much rather it rebuild the GlobalKTable instance from that data directly. Then I could skip having this index in Kafka at all; it's entirely redundant. The data is already loaded and deserialized for the benefit of another Processor. Any thoughts?
Happy Friday, Steven
Re: Suppressing intermediate topics feeding (Global)KTable
Makes sense now (considering the explanation from the other thread). With regard to an "opt-in" optimization: we could simplify the API and hide some details, but it wouldn't buy you anything from an execution point of view. As you need all data on each instance, you need to somehow "broadcast" the information -- and Kafka Streams applications use topics to exchange data. Thus, we need a topic anyhow. Does this make sense? So your overall architecture seems to be sound to me. -Matthias On 6/2/17 2:37 PM, Steven Schlansker wrote: > >> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax wrote: >> >> Hi, >> >> If you want to populate a GlobalKTable you can only do this by reading a >> topic. So the short answer for your headline is: no, you cannot suppress >> the intermediate topic. > > Bummer! Maybe this is an opt-in optimization to consider later. > >> >> However, I am wondering what the purpose of your secondary index is, and >> why you are using a GlobalKTable for it. Maybe you can elaborate a >> little bit? > > Elaborated on this a bit in the other thread, I was trying to keep separate > problems separate, but maybe I just made things more confusing! > > tl;dr is that the user requests values knowing K, but there is actually a > "hidden composite key" D that is relevant to the partitioning strategy. > > The GlobalKTable allows you to look up K -> D, and then find the right local > KTable K,D -> V > >> >> I am also wondering about this code snippet: >> builder.stream(mainTopic) .mapValues(...) .to(secondaryIndex1) >> >> Should it not be .map() that transforms (k,v) -> >> (v.getSecondaryKey1(),k)? Just for my understanding what you are doing. >> > > In this case, the "externally visible" K needs additional information about > the destination D so that it can be partitioned correctly. So the code looks > like: > > // TODO: sucks that this materializes an intermediate topic > msgStream > .mapValues(v -> v == null ? null : > v.getResolvedDestination().toString()) > .to(Serdes.String(), Serdes.String(), DEST_INDEX); > > builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, > DEST_INDEX); > >> >> -Matthias >> >> >> On 6/2/17 12:28 PM, Steven Schlansker wrote: >>> Hi everyone, another question for the list :) >>> >>> I'm creating a cluster of KTable (and GlobalKTable) based off of the same >>> input stream K,V. >>> >>> It has a number of secondary indices (think like an RDBMS) >>> K1 -> K >>> K2 -> K >>> etc. >>> >>> These are all based off of trivial mappings from my main stream that also >>> feeds the K -> V StateStore. Think one-liners like v -> >>> v.getSecondaryKey1() >>> >>> Currently, for each one it seems that I have to do >>> >>> builder.stream(mainTopic) >>> .mapValues(...) >>> .to(secondaryIndex1) >>> >>> builder.globalTable(secondaryIndex1, secondaryIndexStore1); >>> >>> Unfortunately the intermediate "secondaryIndex1" topic is relatively >>> low value. In a case where my state stores are lost, I already have to >>> read through the mainTopic to recover the main state store. While it's >>> doing that, I'd much rather it rebuild the GlobalKTable instance from that data >>> directly. Then I could skip having this index in Kafka at all; it's >>> entirely redundant. The data is already loaded and deserialized for the benefit of >>> another Processor. >>> >>> Any thoughts? Happy Friday, >>> Steven >>>
Re: Finding StreamsMetadata with value-dependent partitioning
Thanks. That helps to understand the use case better. Rephrase to make sure I understood it correctly: 1) you are providing a custom partitioner to Streams that is based on one field in your value (that's fine with regard to fault-tolerance :)) 2) you want to use interactive queries to query the store 3) because of your custom partitioning schema, you need to manually figure out the right application instance that hosts a key 4) thus, you use a GlobalKTable to maintain the information from K to D and thus to the partition, i.e., the streams instance that hosts K If this is correct, then you cannot use the "by key" metadata interface. It's designed to find the streams instance based on the key only -- but your partitioner is based on the value. Internally, we call > final Integer partition = partitioner.partition(key, null, > sourceTopicsInfo.maxPartitions); Note that `value==null` -- at this point, we don't have any value available and can't provide it to the partitioner. Thus, your approach to get all metadata is the only way you can go. Very interesting (and quite special) use case. :) -Matthias On 6/2/17 2:32 PM, Steven Schlansker wrote: > >> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax wrote: >> >> I am not sure if I understand the use case correctly. Could you give >> some more context? > > Happily, thanks for thinking about this! > >> backing store whose partitioning is value dependent >> >> I infer that you are using a custom store and not the default RocksDB? If >> yes, what do you use? What does "value dependent" mean in this context? > > We're currently using the built-in in-memory store. We tried to use RocksDB > but the tuning to get it running appropriately in a Linux container without > tripping the cgroups OOM killer is nontrivial. > > >> Right now, I am wondering why you don't just set a new key to get your >> data grouped by the field you are interested in? Also, if you don't >> partition your data by key, you might break your streams application >> with regard to fault-tolerance -- or does your custom store not rely on >> changelog backup for fault-tolerance? >> > > That's an interesting point about making a transformed key. But I don't think > it simplifies my problem too much. Essentially, I have a list of messages > that should get delivered to destinations. Each message has a primary key K > and a destination D. > > We partition over D so that all messages to the same destination are handled by > the same worker, to preserve ordering and implement local rate limits etc. > > I want to preserve the illusion to the client that they can look up a key with > only K. So, as an intermediate step, we use the GlobalKTable to look up D. Once > we have K,D we can then compute the partition and execute a lookup. > > Transforming the key to be a composite K,D isn't helpful because the end user still > only knows K -- D's relevance is an implementation detail I wish to hide -- > so you still need some sort of secondary lookup. > > We do use the changelog backup for fault tolerance -- how would having the partition > based on the value break this? Is the changelog implicitly partitioned by a partitioner > other than the one we give to the topology? > > Hopefully that explains my situation a bit more? Thanks! > >> >> -Matthias >> >> >> >> On 6/2/17 10:34 AM, Steven Schlansker wrote: >>> I have a KTable and backing store whose partitioning is value dependent.
>>> I want certain groups of messages to be ordered, and that grouping is determined >>> by one field (D) of the (possibly large) value. >>> >>> When I look up by only K, obviously you don't know the partition it should >>> be on. >>> So I will build a GlobalKTable of K -> D. This gives me enough information >>> to determine the partition. >>> >>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well. >>> It allows you to either get all metadata, or by key -- but if you look up by >>> key >>> it just substitutes a null value (causing a downstream NPE) >>> >>> I can iterate over all metadata and compute the mapping of K -> K,D -> P >>> and then iterate over all metadata looking for P. It's not difficult, but it >>> ends up being a bit of ugly code that feels like I shouldn't have to >>> write. >>> >>> Am I missing something here? Is there a better way that I've missed? >>> Thanks!
Re: Suppressing intermediate topics feeding (Global)KTable
> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax wrote: > > Hi, > > If you want to populate a GlobalKTable you can only do this by reading a > topic. So the short answer for your headline is: no, you cannot suppress > the intermediate topic. Bummer! Maybe this is an opt-in optimization to consider later. > > However, I am wondering what the purpose of your secondary index is, and > why you are using a GlobalKTable for it. Maybe you can elaborate a > little bit? Elaborated on this a bit in the other thread, I was trying to keep separate problems separate, but maybe I just made things more confusing! tl;dr is that the user requests values knowing K, but there is actually a "hidden composite key" D that is relevant to the partitioning strategy. The GlobalKTable allows you to look up K -> D, and then find the right local KTable K,D -> V > > I am also wondering about this code snippet: > >>> builder.stream(mainTopic) >>> .mapValues(...) >>> .to(secondaryIndex1) > > Should it not be .map() that transforms (k,v) -> > (v.getSecondaryKey1(),k)? Just for my understanding what you are doing. > In this case, the "externally visible" K needs additional information about the destination D so that it can be partitioned correctly. So the code looks like: // TODO: sucks that this materializes an intermediate topic msgStream .mapValues(v -> v == null ? null : v.getResolvedDestination().toString()) .to(Serdes.String(), Serdes.String(), DEST_INDEX); builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX); > > -Matthias > > > On 6/2/17 12:28 PM, Steven Schlansker wrote: >> Hi everyone, another question for the list :) >> >> I'm creating a cluster of KTable (and GlobalKTable) based off of the same >> input stream K,V. >> >> It has a number of secondary indices (think like an RDBMS) >> K1 -> K >> K2 -> K >> etc. >> >> These are all based off of trivial mappings from my main stream that also >> feeds the K -> V StateStore. Think one-liners like v -> v.getSecondaryKey1() >> >> Currently, for each one it seems that I have to do >> >> builder.stream(mainTopic) >> .mapValues(...) >> .to(secondaryIndex1) >> >> builder.globalTable(secondaryIndex1, secondaryIndexStore1); >> >> Unfortunately the intermediate "secondaryIndex1" topic is relatively >> low value. In a case where my state stores are lost, I already have to >> read through the mainTopic to recover the main state store. While it's doing >> that, I'd much rather it rebuild the GlobalKTable instance from that data >> directly. Then I could skip having this index in Kafka at all; it's entirely >> redundant. The data is already loaded and deserialized for the benefit of >> another Processor. >> >> Any thoughts? Happy Friday, >> Steven >>
Re: Finding StreamsMetadata with value-dependent partitioning
> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax wrote: > > I am not sure if I understand the use case correctly. Could you give > some more context? Happily, thanks for thinking about this! > >> backing store whose partitioning is value dependent > > I infer that you are using a custom store and not the default RocksDB? If > yes, what do you use? What does "value dependent" mean in this context? We're currently using the built-in in-memory store. We tried to use RocksDB but the tuning to get it running appropriately in a Linux container without tripping the cgroups OOM killer is nontrivial. > Right now, I am wondering why you don't just set a new key to get your > data grouped by the field you are interested in? Also, if you don't > partition your data by key, you might break your streams application > with regard to fault-tolerance -- or does your custom store not rely on > changelog backup for fault-tolerance? > That's an interesting point about making a transformed key. But I don't think it simplifies my problem too much. Essentially, I have a list of messages that should get delivered to destinations. Each message has a primary key K and a destination D. We partition over D so that all messages to the same destination are handled by the same worker, to preserve ordering and implement local rate limits etc. I want to preserve the illusion to the client that they can look up a key with only K. So, as an intermediate step, we use the GlobalKTable to look up D. Once we have K,D we can then compute the partition and execute a lookup. Transforming the key to be a composite K,D isn't helpful because the end user still only knows K -- D's relevance is an implementation detail I wish to hide -- so you still need some sort of secondary lookup. We do use the changelog backup for fault tolerance -- how would having the partition based on the value break this? Is the changelog implicitly partitioned by a partitioner other than the one we give to the topology? Hopefully that explains my situation a bit more? Thanks! > > -Matthias > > > > On 6/2/17 10:34 AM, Steven Schlansker wrote: >> I have a KTable and backing store whose partitioning is value dependent. >> I want certain groups of messages to be ordered, and that grouping is determined >> by one field (D) of the (possibly large) value. >> >> When I look up by only K, obviously you don't know the partition it should be >> on. >> So I will build a GlobalKTable of K -> D. This gives me enough information >> to determine the partition. >> >> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well. >> It allows you to either get all metadata, or by key -- but if you look up by >> key >> it just substitutes a null value (causing a downstream NPE) >> >> I can iterate over all metadata and compute the mapping of K -> K,D -> P >> and then iterate over all metadata looking for P. It's not difficult, but it >> ends up being a bit of ugly code that feels like I shouldn't have to >> write. >> >> Am I missing something here? Is there a better way that I've missed? >> Thanks! >>
Re: Suppressing intermediate topics feeding (Global)KTable
Hi, If you want to populate a GlobalKTable you can only do this by reading a topic. So the short answer for your headline is: no, you cannot suppress the intermediate topic. However, I am wondering what the purpose of your secondary index is, and why you are using a GlobalKTable for it. Maybe you can elaborate a little bit? I am also wondering about this code snippet: >> builder.stream(mainTopic) >>.mapValues(...) >>.to(secondaryIndex1) Should it not be .map() that transforms (k,v) -> (v.getSecondaryKey1(),k)? Just for my understanding what you are doing. -Matthias On 6/2/17 12:28 PM, Steven Schlansker wrote: > Hi everyone, another question for the list :) > > I'm creating a cluster of KTable (and GlobalKTable) based off of the same > input stream K,V. > > It has a number of secondary indices (think like an RDBMS) > K1 -> K > K2 -> K > etc. > > These are all based off of trivial mappings from my main stream that also > feeds the K -> V StateStore. Think one-liners like v -> v.getSecondaryKey1() > > Currently, for each one it seems that I have to do > > builder.stream(mainTopic) >.mapValues(...) >.to(secondaryIndex1) > > builder.globalTable(secondaryIndex1, secondaryIndexStore1); > > Unfortunately the intermediate "secondaryIndex1" topic is relatively > low value. In a case where my state stores are lost, I already have to > read through the mainTopic to recover the main state store. While it's doing > that, I'd much rather it rebuild the GlobalKTable instance from that data > directly. Then I could skip having this index in Kafka at all; it's entirely > redundant. The data is already loaded and deserialized for the benefit of > another Processor. > > Any thoughts? Happy Friday, > Steven >
Re: Finding StreamsMetadata with value-dependent partitioning
I am not sure if I understand the use case correctly. Could you give some more context? > backing store whose partitioning is value dependent I infer that you are using a custom store and not the default RocksDB? If yes, what do you use? What does "value dependent" mean in this context? Right now, I am wondering why you don't just set a new key to get your data grouped by the field you are interested in? Also, if you don't partition your data by key, you might break your streams application with regard to fault-tolerance -- or does your custom store not rely on changelog backup for fault-tolerance? -Matthias On 6/2/17 10:34 AM, Steven Schlansker wrote: > I have a KTable and backing store whose partitioning is value dependent. > I want certain groups of messages to be ordered, and that grouping is determined > by one field (D) of the (possibly large) value. > > When I look up by only K, obviously you don't know the partition it should be > on. > So I will build a GlobalKTable of K -> D. This gives me enough information > to determine the partition. > > Unfortunately, the KafkaStreams metadata API doesn't fit this use case well. > It allows you to either get all metadata, or by key -- but if you look up by > key > it just substitutes a null value (causing a downstream NPE) > > I can iterate over all metadata and compute the mapping of K -> K,D -> P > and then iterate over all metadata looking for P. It's not difficult, but it ends > up being a bit of ugly code that feels like I shouldn't have to > write. > > Am I missing something here? Is there a better way that I've missed? Thanks! >
Re: Zookeeper on same server as Kafka
Usually, the overhead comes when you have Kafka and ZooKeeper doing their housekeeping (i.e. disk IO) on the same server. ZK even suggests that you should keep its logs on a totally different physical machine for better performance. Furthermore, if a mechanical failure occurs, you might not want both the zookeeper and the broker going down together. Can anyone else chime in with some better points? On 2 Jun 2017 7:57 pm, "Meghana Narasimhan" wrote: Hi, What are the pros and cons of setting up Zookeeper on the same server as the Kafka broker? Earlier, offsets were written to zookeeper, which was a major overhead, but with offsets now being written to Kafka, what other requirements should be taken into consideration when deciding between setting up Zookeeper on the same server as Kafka vs having a separate zookeeper cluster? Thanks, Meghana
Suppressing intermediate topics feeding (Global)KTable
Hi everyone, another question for the list :) I'm creating a cluster of KTable (and GlobalKTable) based off of the same input stream K,V. It has a number of secondary indices (think like an RDBMS) K1 -> K K2 -> K etc. These are all based off of trivial mappings from my main stream that also feeds the K -> V StateStore. Think one-liners like v -> v.getSecondaryKey1(). Currently, for each one it seems that I have to do builder.stream(mainTopic) .mapValues(...) .to(secondaryIndex1) builder.globalTable(secondaryIndex1, secondaryIndexStore1); Unfortunately the intermediate "secondaryIndex1" topic is relatively low value. In a case where my state stores are lost, I already have to read through the mainTopic to recover the main state store. While it's doing that, I'd much rather it rebuild the GlobalKTable instance from that data directly. Then I could skip having this index in Kafka at all; it's entirely redundant. The data is already loaded and deserialized for the benefit of another Processor. Any thoughts? Happy Friday, Steven
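For reference, a slightly fuller sketch of the secondary-index wiring discussed in this thread, written against the 0.10.x DSL. Since an index K1 -> K re-keys the stream, .map() (which replaces the key) is the natural operator, which is what Matthias asks about above. MyValue, valueSerde, and the topic/store names are placeholders, not from the thread:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class SecondaryIndexTopology {

        interface MyValue { String getSecondaryKey1(); } // stand-in for the real value type

        public KStreamBuilder build(final Serde<MyValue> valueSerde) {
            final KStreamBuilder builder = new KStreamBuilder();

            // Re-key each record by its secondary key, keeping the primary key
            // as the value, and materialize it through an intermediate topic...
            builder.stream(Serdes.String(), valueSerde, "main-topic")
                   .map((k, v) -> KeyValue.pair(v.getSecondaryKey1(), k)) // (K, V) -> (K1, K)
                   .to(Serdes.String(), Serdes.String(), "secondary-index-1");

            // ...which then feeds the GlobalKTable on every instance.
            builder.globalTable(Serdes.String(), Serdes.String(),
                    "secondary-index-1", "secondary-index-store-1");
            return builder;
        }
    }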
Re: Losing messages in Kafka Streams after upgrading
Hi Frank, yes, retention policy is based on the embedded record timestamps and not on system time. Thus, if you send messages with an old timestamp, they can trigger log/segment rolling. >> I see that the repartition topics have timestamp.type = CreateTime, does >> that mean it uses the timestamp of the >> original message? Yes. That's the default setting on the broker side. For Streams, we maintain a so-called "stream time" that is computed based on the input record timestamps. This "stream time" is used to set the timestamp for records that are written by Streams (so it's more or less the timestamp of the input records). >> Shouldn't that be LogAppendTime for repartition topics? No. Streams needs to preserve the original timestamp to guarantee correct semantics for downstream window operations. Thus, it should be CreateTime -- if you switch to LogAppendTime, you might break your application logic and get wrong results. >> Or is there a way to configure that? You can configure this on a per-topic basis on the brokers. >> Hacking into my Kafka Streams code to force it to use LogAppendTime seems >> to solve my problem, but that appears to >> take a huge toll on the brokers. Throughput plummets, and I don't really >> know why. I am not sure what you mean by this? As it's a topic config, I don't understand how you can force this within your Streams application. IMHO, you have multiple options though: - increase the retention time for your re-partitioning topics - you could change the retention policy to number of bytes instead of time for the re-partitioning topics - you can implement a custom timestamp extractor and adjust the timestamps accordingly ("stream time" is based on whatever the timestamp extractor returns) However, if you have records with old timestamps, I am wondering why they are not truncated in your input topic? Do you not face the same issue there? -Matthias On 6/2/17 9:33 AM, Frank Lyaruu wrote: > Hi Kafka people, > > I'm running an application that pushes database changes into a Kafka topic. > I'm also running a Kafka streams application > that listens to these topics, groups them using the high-level API, and > inserts them into another database. > > All topics are compacted, with the exception of the 'repartition topics', > which are configured to be retained for 36 hours. > > Note that the changes in the original kafka topics can be old (generally > more than 36 hours), as they only change when > the data changes. > > When I start an instance of the Kafka Streams application, I see the > repartition topics being deleted immediately, > sometimes before they are processed, and it looks like the repartition > messages use the same timestamp as the > original message. > > I see that the repartition topics have timestamp.type = CreateTime, does > that mean it uses the timestamp of the > original message? Shouldn't that be LogAppendTime for repartition topics? > Or is there a way to configure that? > > Hacking into my Kafka Streams code to force it to use LogAppendTime seems > to solve my problem, but that appears to > take a huge toll on the brokers. Throughput plummets, and I don't really > know why. > > Any ideas? > > Frank
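To make the third option concrete: below is a rough sketch of a custom extractor against the 0.10.x single-argument TimestampExtractor interface. It keeps the embedded timestamp for recent records but clamps anything older than the repartition-topic retention (36 hours in this thread) to wall-clock time, so freshly written repartition segments are not immediately eligible for deletion. As Matthias notes above, rewriting timestamps can change windowing semantics, so treat this as a workaround sketch, not a drop-in fix. It would be registered via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class ClampingTimestampExtractor implements TimestampExtractor {

        // Assumed to match the repartition topics' retention.ms.
        private static final long MAX_AGE_MS = 36L * 60 * 60 * 1000;

        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            final long now = System.currentTimeMillis();
            final long ts = record.timestamp();
            // Old change events keep triggering segment rolls/deletes; clamp them.
            return ts < now - MAX_AGE_MS ? now : ts;
        }
    }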
Zookeeper on same server as Kafka
Hi, What are the pros and cons of setting up Zookeeper on the same server as the Kafka broker ? Earlier offsets were being written to zookeeper which was a major overhead but with offsets being written to Kafka now, what other requirements should be taken into consideration for setting up Zookeeper on the same server as Kafka vs having a separate zookeeper cluster ? Thanks, Meghana
Re: LDAP integration with kafka brokers
I don't know whether LDAP can be integrated directly with the Kafka client and server, but Kafka can use Kerberos-type authentication, while Kerberos can integrate with AD for the user store and user authentication. Hope this is helpful. Nixon On Fri, Jun 2, 2017 at 1:58 AM, Arunkumar wrote: > > Hi Group, > > We have a requirement to integrate the Kafka cluster with LDAP for > authentication. I googled and did not find much about it. > Our requirement is that when a producer sends a message to a broker, it would > send producer credentials, like the topic to which it wants to write and a > userId with passcode. We would like to authenticate it against our > enterprise LDAP server for authenticity. If there is no LDAP support > available, we are planning to customize the code. Any insight on this is > highly appreciated. > > Thanks in advance > Arunkumar Pichaimuthu, PMP >
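To make the Kerberos route concrete, here is a hedged sketch of the broker-side pieces (hostnames, realm, and keytab path are placeholders; the client side needs a matching KafkaClient JAAS section). In server.properties:

    listeners=SASL_PLAINTEXT://broker1.example.com:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=GSSAPI
    sasl.enabled.mechanisms=GSSAPI
    sasl.kerberos.service.name=kafka

And a JAAS file passed to the broker JVM via -Djava.security.auth.login.config:

    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka.keytab"
        principal="kafka/broker1.example.com@EXAMPLE.COM";
    };

With AD acting as the KDC, user accounts and authentication then live in AD, which is about as close to "LDAP integration" as stock Kafka gets out of the box.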
Finding StreamsMetadata with value-dependent partitioning
I have a KTable and backing store whose partitioning is value dependent. I want certain groups of messages to be ordered, and that grouping is determined by one field (D) of the (possibly large) value. When I look up by only K, obviously you don't know the partition it should be on. So I will build a GlobalKTable of K -> D. This gives me enough information to determine the partition. Unfortunately, the KafkaStreams metadata API doesn't fit this use case well. It allows you to either get all metadata, or by key -- but if you look up by key it just substitutes a null value (causing a downstream NPE). I can iterate over all metadata and compute the mapping of K -> K,D -> P and then iterate over all metadata looking for P. It's not difficult, but it ends up being a bit of ugly code that feels like I shouldn't have to write. Am I missing something here? Is there a better way that I've missed? Thanks!
Losing messages in Kafka Streams after upgrading
Hi Kafka people, I'm running an application that pushes database changes into a Kafka topic. I'm also running a Kafka streams application that listens to these topics, groups them using the high-level API, and inserts them into another database. All topics are compacted, with the exception of the 'repartition topics', which are configured to be retained for 36 hours. Note that the changes in the original kafka topics can be old (generally more than 36 hours), as they only change when the data changes. When I start an instance of the Kafka Streams application, I see the repartition topics being deleted immediately, sometimes before they are processed, and it looks like the repartition messages use the same timestamp as the original message. I see that the repartition topics have timestamp.type = CreateTime; does that mean it uses the timestamp of the original message? Shouldn't that be LogAppendTime for repartition topics? Or is there a way to configure that? Hacking into my Kafka Streams code to force it to use LogAppendTime seems to solve my problem, but that appears to take a huge toll on the brokers. Throughput plummets, and I don't really know why. Any ideas? Frank
Re: Data in kafka topic in Json format
Hi Hans, Thank you for your quick response, appreciate it. In the *kafka-connect* docker, I see the below settings in the *kafka-connect.properties* file in the *kafka-connect* directory: key.converter.schemas.enable=false key.converter.schema.registry.url=http://kafka-schema-registry: value.converter.schema.registry.url=http://kafka-schema-registry: key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter.schemas.enable=false rest.advertised.host.name=kafka-connect internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter And the settings in the *schema-registry* directory of the *kafka-connect* docker are as in https://github.com/confluentinc/schema-registry/tree/master/config Should I consider any other settings for *kafka-connect* or *schema-registry* to get the real JSON object, NOT string-formatted/stringified JSON which has extra "\" and is not JSON any more? Best regards, Mina On Fri, Jun 2, 2017 at 11:18 AM, Hans Jespersen wrote: > > My earlier comment still applies, but in Kafka Connect the equivalent of a > serializer/deserializer (serdes) is called a “converter”. > Check which converter you have configured for your source connector and whether > it is overriding whatever default converter is configured for the > Connect worker it is running in. > > -hans > > > > > > On Jun 2, 2017, at 8:12 AM, Mina Aslani wrote: > > > > Hi, > > > > I would like to add that I use kafka-connect and schema-registry > version ` > > 3.2.1-6`. > > > > Best regards, > > Mina > > > > On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani > wrote: > > > >> Hi. > >> > >> Is there any way that I can get the data into a Kafka topic in JSON format? > >> The source that I ingest the data from has the data in JSON format; > >> however, when I look at the data in the kafka topic, schema and payload > fields > >> are added and the data is not in JSON format. > >> > >> I want to avoid implementing a transformer that gets the data from the > topic > >> and publishes JSON to another one. > >> > >> Your input is appreciated. > >> > >> Best regards, > >> Mina > >> > >
Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Jan, I agree with you philosophically. I think one practical challenge has to do with data formats. Many people use untyped events, so there is simply no guarantee on the form of the input. E.g. many companies use JSON without any kind of schema, so it becomes very hard to assert anything about the input, which makes these programs very fragile to the one accidental message publication that creates an unsolvable problem. For that reason I do wonder if limiting to just serialization actually gets you a useful solution. For JSON it will help with the problem of non-parseable JSON, but it sounds like it won't help in the case where the JSON is well-formed but does not have any of the fields you expect and depend on for your processing. I expect the reason for limiting the scope is that it is pretty hard to reason about correctness for anything that stops in the middle of processing an operator DAG? -Jay On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak wrote: > IMHO you're doing it wrong then. Plus, building too much support into the kafka > ecosystem is very counterproductive in fostering a happy userbase > > > > On 02.06.2017 13:15, Damian Guy wrote: > >> Jan, you have a choice to fail fast if you want. This is about giving >> people options, and there are times when you don't want to fail fast. >> >> >> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak >> wrote: >> >> Hi >>> >>> 1. >>> That greatly complicates monitoring. Fail fast gives you that when you >>> monitor only the lag of all your apps >>> you are completely covered. With that sort of new application, monitoring >>> is very much more complicated, as >>> you now need to monitor the fail % of some special apps as well. In my >>> opinion that is a huge downside already. >>> >>> 2. >>> Using a schema registry like the Avro stuff, it might not even be the record >>> that is broken; it might be just your app >>> unable to fetch a schema it now needs to know. Maybe you got partitioned >>> away from that registry. >>> >>> 3. When you get alerted because of too high a fail percentage, what are the >>> steps you are going to take? >>> Shut it down to buy time. Fix the problem. Spend way too much time to >>> find a good reprocess offset. >>> Your time windows are in bad shape anyway, and you have pretty much lost. >>> This routine is nonsense. >>> >>> Dead letter queues would be the worst possible addition to the kafka >>> toolkit that I can think of. It just doesn't fit the architecture, >>> in which clients falling behind is a valid option. >>> >>> Further, I mentioned already that the only bad pill I've seen so far is CRC >>> errors. Any plans for those? >>> >>> Best, Jan >>> >>> >>> >>> >>> >>> >>> On 02.06.2017 11:34, Damian Guy wrote: I agree with what Matthias has said w.r.t. failing fast. There are plenty >>> of >>> times when you don't want to fail fast and must attempt to make >>> progress. >>> The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up. On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax >>> wrote: First a meta comment: KIP discussion should take place on the dev list > -- if the user list is cc'ed please make sure to reply to both lists. > Thanks. >>> Thanks for making the scope of the KIP clear. Makes a lot of sense to > focus on deserialization exceptions for now. > > With regard to corrupted state stores, would it make sense to fail a > task and wipe out the store to repair it via recreation from the > changelog?
That's of course quite an advanced pattern, but I want to > bring > it up to design the first step in a way such that we can get there (if > we think it's a reasonable idea). > > I also want to comment about fail fast vs. making progress. I think that > fail-fast must not always be the best option. The scenario I have in > mind is like this: you got a bunch of producers that feed the Streams > input topic. Most producers work fine, but maybe one producer misbehaves > and the data it writes is corrupted. You might not even be able > to recover this lost data at any point -- thus, there is no reason to > stop processing; you just skip over those records. Of course, you > need to fix the root cause, and thus you need to alert (either via logs > or the exception handler directly) and you need to start to investigate > to find the bad producer, shut it down and fix it. > > Here the dead letter queue comes into place. From my understanding, the > purpose of this feature is solely to enable post debugging. I don't think > those records would be fed back at any point in time (so I don't see any > ordering issue -- a skipped record, with this regard, is just "fully > processed"). Thus, the dead letter queue should actually encode the > original record's metadata (topic, partition, offset etc.) to enable such > debugging. I guess, this might also be
Re: Data in kafka topic in Json format
My earlier comment still applies, but in Kafka Connect the equivalent of a serializer/deserializer (serdes) is called a “converter”. Check which converter you have configured for your source connector and whether it is overriding whatever default converter is configured for the Connect worker it is running in. -hans > On Jun 2, 2017, at 8:12 AM, Mina Aslani wrote: > > Hi, > > I would like to add that I use kafka-connect and schema-registry version ` > 3.2.1-6`. > > Best regards, > Mina > > On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani wrote: > >> Hi. >> >> Is there any way that I can get the data into a Kafka topic in JSON format? >> The source that I ingest the data from has the data in JSON format; >> however, when I look at the data in the kafka topic, schema and payload fields >> are added and the data is not in JSON format. >> >> I want to avoid implementing a transformer that gets the data from the topic >> and publishes JSON to another one. >> >> Your input is appreciated. >> >> Best regards, >> Mina >>
Re: Data in kafka topic in Json format
Check which serializer you have configured in your producer. You are probably using an Avro serializer, which will add the schema and convert the payload to Avro data. You can use a String serializer or a ByteArray serializer instead, and the data will either be Base64-encoded or not encoded at all. -hans > On Jun 2, 2017, at 7:59 AM, Mina Aslani wrote: > > Hi. > > Is there any way that I can get the data into a Kafka topic in JSON format? > The source that I ingest the data from has the data in JSON format; > however, when I look at the data in the kafka topic, schema and payload fields > are added and the data is not in JSON format. > > I want to avoid implementing a transformer that gets the data from the topic > and publishes JSON to another one. > > Your input is appreciated. > > Best regards, > Mina
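A minimal sketch of the string-serializer route Hans describes, in case it helps (broker address and topic name are placeholders): the JSON string is sent byte-for-byte, so the topic ends up holding plain JSON with no schema/payload envelope.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PlainJsonProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // The value is written as-is; no schema is attached.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("json-topic", "{\"id\":1,\"name\":\"example\"}"));
            }
        }
    }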
Re: Data in kafka topic in Json format
Hi, I would like to add that I use kafka-connect and schema-registry version `3.2.1-6`. Best regards, Mina On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani wrote: > Hi. > > Is there any way that I can get the data into a Kafka topic in JSON format? > The source that I ingest the data from has the data in JSON format; > however, when I look at the data in the kafka topic, schema and payload fields > are added and the data is not in JSON format. > > I want to avoid implementing a transformer that gets the data from the topic > and publishes JSON to another one. > > Your input is appreciated. > > Best regards, > Mina >
Kafka Over TLS Error - Failed to send SSL Close message - Broken Pipe
Hi All, I have been seeing the below error for three days. Can you please help me understand more about it?

WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
        at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:148)
        at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
        at org.apache.kafka.common.network.Selector.close(Selector.java:442)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:310)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745)

Thanks a lot.
Data in kafka topic in Json format
Hi. Is there any way that I can get the data into a Kafka topic in JSON format? The source that I ingest the data from has the data in JSON format; however, when I look at the data in the kafka topic, schema and payload fields are added and the data is not in JSON format. I want to avoid implementing a transformer that gets the data from the topic and publishes JSON to another one. Your input is appreciated. Best regards, Mina
Re: Cluster in weird state: no leaders no ISR for all topics, but it works!
So, I fixed the problem by doing a rolling restart, and after some checks it seems there was no data loss. On 1 June 2017 at 17:57, Del Barrio, Alberto < alberto.delbar...@360dialog.com> wrote: > I might give it a try tomorrow. The reason for having such large init and > sync limit times is that in the past our ZK cluster was storing a large > amount of data, and lower values were not enough for the server syncs when > restarting zk processes. > > On 1 June 2017 at 17:52, Mohammed Manna wrote: > >> Cool - I will try and take a look into this - meanwhile, do you mind >> awfully >> changing the following and seeing if things improve? >> >> tickTime = 1000 >> initLimit=3 >> syncLimit=5 >> >> On 1 June 2017 at 16:49, Del Barrio, Alberto < >> alberto.delbar...@360dialog.com> wrote: >> >> > Here are the configs you were asking for: >> > >> > Zookeeper: >> > tickTime=1000 >> > initLimit=2000 >> > syncLimit=1000 >> > dataDir=/var/lib/zookeeper >> > clientPort=2181 >> > server.3=10.0.0.3:2888:3888 >> > server.2=10.0.0.2:2888:3888 >> > server.1=10.0.0.1:2888:3888 >> > >> > >> > Kafka broker (for one of them): >> > broker.id=10 >> > listeners=PLAINTEXT://10.0.0.4:9092 >> > num.network.threads=3 >> > num.io.threads=8 >> > socket.send.buffer.bytes=102400 >> > socket.receive.buffer.bytes=102400 >> > socket.request.max.bytes=104857600 >> > log.dirs=/var/lib/kafka >> > num.partitions=2 >> > num.recovery.threads.per.data.dir=1 >> > zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181/kafka >> > zookeeper.connection.timeout.ms=6000 >> > >> > In general they're pretty much the default ones. >> > I can see in Zookeeper the kafka brokers connected to it and exchanging >> > data... >> > >> > Thanks for your help and time. >> > >> > On 1 June 2017 at 17:32, Mohammed Manna wrote: >> > >> > > Could you please share your broker/zookeeper/topic configs? >> > > >> > > On 1 June 2017 at 16:18, Del Barrio, Alberto < >> > > alberto.delbar...@360dialog.com> wrote: >> > > >> > > > I tried creating the topic and the results are very similar to the >> current >> > > > situation: there are no ISRs and no leader for any of the >> partitions, >> but >> > > > now kafka-topics shows *Leader: none* when for all the other >> topics it >> > > > shows *Leader: -1* >> > > > >> > > > >> > > > On 1 June 2017 at 17:05, Mohammed Manna wrote: >> > > > >> > > > > I had a similar situation, but only 1 of my ZKs was struggling - >> but >> > > > since >> > > > > the ISR synching time is configurable I was confident to bounce 1 >> ZK >> > > at a >> > > > > time and it worked out. >> > > > > Does it happen even when you create a new topic with a >> > > > > replication:partition ratio of 1? >> > > > > >> > > > > I meant, 3 replicas, 3 partitions :) >> > > > > >> > > > > On 1 June 2017 at 15:58, Del Barrio, Alberto < >> > > > > alberto.delbar...@360dialog.com> wrote: >> > > > > >> > > > > > Hi Mohammed, >> > > > > > >> > > > > > thanks for your answer. >> > > > > > The ZK cluster is not located on the servers where Kafka runs >> but on >> > > other >> > > > > > 3 different machines. This ZK cluster is used by several other >> > > services >> > > > > > which are not reporting problems. >> > > > > > Regarding your suggestion: I haven't tried restarting the >> kafka-server >> > > processes >> > > > > > because there's no leader for the topic partitions, so I don't >> know >> > what >> > > > will >> > > > > > happen. Never been in a similar situation with Kafka after some >> years >> > > > > using >> > > > > > it.
>> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > On 1 June 2017 at 16:29, Mohammed Manna >> > wrote: >> > > > > > >> > > > > > > Hi Alberto, >> > > > > > > >> > > > > > > Usually this means that the leader election/replica syncing >> > > couldn't >> > > > be >> > > > > > > successful and the zookeeper logs should be able to show this >> > > > > information >> > > > > > > too. The leader -1 is what worries me. For your case (3 broker >> > > > > cluster), >> > > > > > I >> > > > > > > am assuming you have done the cluster configuration to have 1 >> > > > > > > broker-zookeeper setup ? >> > > > > > > If that's the case, you should be able to bounce 1 zookeeper >> at a >> > > > time >> > > > > > and >> > > > > > > see if that resolves the issue. >> > > > > > > >> > > > > > > That said, have you restarted your servers since this issue >> > > > surfaced? >> > > > > > > >> > > > > > > On 1 June 2017 at 14:11, Del Barrio, Alberto < >> > > > > > > alberto.delbar...@360dialog.com> wrote: >> > > > > > > >> > > > > > > > Hi all, >> > > > > > > > >> > > > > > > > I'm experiencing an issue which I don't know how to solve, >> so >> > I'm >> > > > > > trying >> > > > > > > to >> > > > > > > > find some guidance on the topic. >> > > > > > > > >> > > > > > > > I have a cluster composed by 3 servers, one broker per >> server >> > > > running >> > > > > > > Kafka >> > > > > > > > 0.10.0.1-1 which runs in production with around 100 topics, >> > most >> > > of >> >
Re: [DISCUSS]: KIP-161: streams record processing exception handlers
IMHO you're doing it wrong then. Plus, building too much support into the kafka ecosystem is very counterproductive in fostering a happy userbase. On 02.06.2017 13:15, Damian Guy wrote: Jan, you have a choice to fail fast if you want. This is about giving people options, and there are times when you don't want to fail fast. On Fri, 2 Jun 2017 at 11:00 Jan Filipiak wrote: Hi 1. That greatly complicates monitoring. Fail fast gives you that when you monitor only the lag of all your apps you are completely covered. With that sort of new application, monitoring is very much more complicated, as you now need to monitor the fail % of some special apps as well. In my opinion that is a huge downside already. 2. Using a schema registry like the Avro stuff, it might not even be the record that is broken; it might be just your app unable to fetch a schema it now needs to know. Maybe you got partitioned away from that registry. 3. When you get alerted because of too high a fail percentage, what are the steps you are going to take? Shut it down to buy time. Fix the problem. Spend way too much time to find a good reprocess offset. Your time windows are in bad shape anyway, and you have pretty much lost. This routine is nonsense. Dead letter queues would be the worst possible addition to the kafka toolkit that I can think of. It just doesn't fit the architecture, in which clients falling behind is a valid option. Further, I mentioned already that the only bad pill I've seen so far is CRC errors. Any plans for those? Best, Jan On 02.06.2017 11:34, Damian Guy wrote: I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress. The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up. On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote: First a meta comment: KIP discussion should take place on the dev list -- if the user list is cc'ed please make sure to reply to both lists. Thanks. Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now. With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course quite an advanced pattern, but I want to bring it up to design the first step in a way such that we can get there (if we think it's a reasonable idea). I also want to comment about fail fast vs. making progress. I think that fail-fast must not always be the best option. The scenario I have in mind is like this: you got a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records. Of course, you need to fix the root cause, and thus you need to alert (either via logs or the exception handler directly) and you need to start to investigate to find the bad producer, shut it down and fix it. Here the dead letter queue comes into place. From my understanding, the purpose of this feature is solely to enable post debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, with this regard, is just "fully processed"). Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset etc.) to enable such debugging.
I guess, this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that did write the log and extract the information from there). Reading it from topic is much simpler. I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail-fast, it's going to be super hard to process the good data. You would need to write an extra app that copied the data into a new topic filtering out the bad records (or apply the map() workaround withing stream). So I don't think that failing fast is most likely the best option in production is necessarily, true. Or do you think there are scenarios, for which you can recover the corrupted records successfully? And even if this is possible, it might be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow to return a "fixed" record? This would solve the ordering problem. -Matthias On 5/30/17 1:47 AM, Michael Noll wrote: Thanks for your work on this KIP, Eno -- much appreciated! - I think it would help to improve the KIP by adding an end-to-end code example that demonstrates, with the DSL and with the Processor API, how the user would write a simple application that would then be augmented with the proposed KIP
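To make the metadata point above concrete, here is a minimal sketch of what forwarding a skipped record to a dead letter topic might look like. The topic name, the class, and the key encoding are illustrative assumptions only; the KIP does not prescribe any of them:

    // Sketch only: forward a record that failed deserialization to a dead
    // letter topic, preserving the metadata needed for later debugging.
    // The topic name "my-app-dead-letters" and the key encoding are
    // assumptions for illustration, not part of the KIP.
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DeadLetterPublisher {
        private final KafkaProducer<byte[], byte[]> producer;

        public DeadLetterPublisher(KafkaProducer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        public void publish(ConsumerRecord<byte[], byte[]> failed) {
            // Encode source topic/partition/offset in the key so the bad
            // record can be traced back without hunting through logs.
            byte[] key = (failed.topic() + "/" + failed.partition() + "/" + failed.offset())
                    .getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>("my-app-dead-letters", key, failed.value()));
        }
    }

Encoding the origin in the key keeps the payload bytes untouched, so a debugging tool can later inspect or replay the record exactly as it arrived.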
Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Jan, you have a choice to fail fast if you want. This is about giving people options, and there are times when you don't want to fail fast.

On Fri, 2 Jun 2017 at 11:00 Jan Filipiak wrote:

> Hi
>
> 1. That greatly complicates monitoring. Fail fast gives you that when you monitor only the lag of all your apps, you are completely covered. With that sort of new application, monitoring is much more complicated, as you now need to monitor the fail % of some special apps as well. In my opinion that is a huge downside already.
>
> 2. Using a schema registry and Avro, it might not even be the record that is broken; it might just be your app being unable to fetch a schema it needs right now. Maybe you got partitioned away from that registry.
>
> 3. When you get alerted because of a too-high fail percentage, what are the steps you are going to take? Shut it down to buy time. Fix the problem. Spend way too much time finding a good reprocess offset. Your time windows are in bad shape anyway, and you have pretty much lost. This routine is nonsense.
>
> Dead letter queues would be the worst possible addition to the Kafka toolkit that I can think of. It just doesn't fit an architecture in which clients falling behind is a valid option.
>
> Further, as I mentioned already, the only bad pill I've seen so far is CRC errors. Any plans for those?
>
> Best Jan
>
> On 02.06.2017 11:34, Damian Guy wrote:
> > I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress. The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up.
> >
> > On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:
> >
> >> First a meta comment: KIP discussion should take place on the dev list -- if the user list is cc'ed, please make sure to reply to both lists. Thanks.
> >>
> >> Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now.
> >>
> >> With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course a quite advanced pattern, but I want to bring it up to design the first step in a way such that we can get there (if we think it's a reasonable idea).
> >>
> >> I also want to comment about fail fast vs. making progress. I think that fail fast is not always the best option. The scenario I have in mind is like this: you've got a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records. Of course, you need to fix the root cause, and thus you need to alert (either via logs or the exception handler directly) and start investigating to find the bad producer, shut it down, and fix it.
> >>
> >> Here the dead letter queue comes into play. From my understanding, the purpose of this feature is solely to enable post-mortem debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, in this regard, is just "fully processed").
> >>
> >> Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset, etc.) to enable such debugging. I guess this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that wrote the log and extract the information from there). Reading it from a topic is much simpler.
> >>
> >> I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail fast, it's going to be super hard to process the good data. You would need to write an extra app that copies the data into a new topic, filtering out the bad records (or apply the map() workaround within Streams). So I don't think that "failing fast is most likely the best option in production" is necessarily true.
> >>
> >> Or do you think there are scenarios for which you can recover the corrupted records successfully? And even if this is possible, it might be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow returning a "fixed" record? This would solve the ordering problem.
> >>
> >> -Matthias
> >>
> >> On 5/30/17 1:47 AM, Michael Noll wrote:
> >>> Thanks for your work on this KIP, Eno -- much appreciated!
> >>>
> >>> - I think it would help to improve the KIP by
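For readers trying to picture the handler under discussion, a minimal sketch of such a callback might look like the following. The names here are hypothetical, since the KIP was still being discussed at this point, though the handler that later shipped in Kafka Streams follows a similar continue-or-fail shape:

    // Hypothetical sketch of a per-record exception handler as discussed in
    // this thread; names do not reflect a final API.
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public interface RecordExceptionHandler {

        enum Response {
            CONTINUE,  // skip the record (optionally after publishing it to
                       // a dead letter queue) and keep processing
            FAIL       // fail fast: stop the task and surface the exception
        }

        // Called for every record that could not be deserialized; the
        // return value decides whether processing continues.
        Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
    }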
Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Hi

1. That greatly complicates monitoring. Fail fast gives you that when you monitor only the lag of all your apps, you are completely covered. With that sort of new application, monitoring is much more complicated, as you now need to monitor the fail % of some special apps as well. In my opinion that is a huge downside already.

2. Using a schema registry and Avro, it might not even be the record that is broken; it might just be your app being unable to fetch a schema it needs right now. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too-high fail percentage, what are the steps you are going to take? Shut it down to buy time. Fix the problem. Spend way too much time finding a good reprocess offset. Your time windows are in bad shape anyway, and you have pretty much lost. This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka toolkit that I can think of. It just doesn't fit an architecture in which clients falling behind is a valid option.

Further, as I mentioned already, the only bad pill I've seen so far is CRC errors. Any plans for those?

Best Jan

On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress. The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:

First a meta comment: KIP discussion should take place on the dev list -- if the user list is cc'ed, please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course a quite advanced pattern, but I want to bring it up to design the first step in a way such that we can get there (if we think it's a reasonable idea).

I also want to comment about fail fast vs. making progress. I think that fail fast is not always the best option. The scenario I have in mind is like this: you've got a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records. Of course, you need to fix the root cause, and thus you need to alert (either via logs or the exception handler directly) and start investigating to find the bad producer, shut it down, and fix it.

Here the dead letter queue comes into play. From my understanding, the purpose of this feature is solely to enable post-mortem debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, in this regard, is just "fully processed"). Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset, etc.) to enable such debugging. I guess this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that wrote the log and extract the information from there). Reading it from a topic is much simpler.

I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail fast, it's going to be super hard to process the good data. You would need to write an extra app that copies the data into a new topic, filtering out the bad records (or apply the map() workaround within Streams). So I don't think that "failing fast is most likely the best option in production" is necessarily true.

Or do you think there are scenarios for which you can recover the corrupted records successfully? And even if this is possible, it might be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow returning a "fixed" record? This would solve the ordering problem.

-Matthias

On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code example that demonstrates, with the DSL and with the Processor API, how the user would write a simple application that would then be augmented with the proposed KIP changes to handle exceptions. It should also become much clearer then that, e.g., the KIP would lead to different code paths for the happy case and any failure scenarios.

- Do we have sufficient information available to make informed decisions on what to do next? For example, do we know in which part of the topology the record failed? `ConsumerRecord` gives
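The "map() workaround" Matthias mentions is worth spelling out: consume the topic with the pass-through byte-array serde and do the real deserialization inside a flatMapValues() call, so a bad record yields an empty result instead of killing the task. A sketch against the 0.10.2-era API, where MyEvent and MyEvent.parse() are hypothetical placeholders for your own domain type:

    // Sketch of the flatMapValues() workaround: read raw bytes, deserialize
    // manually, and drop records that fail instead of letting the source
    // serde throw. MyEvent / MyEvent.parse() are hypothetical placeholders.
    import java.util.Collections;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class SkipBadRecords {
        public static KStream<byte[], MyEvent> goodRecords(KStreamBuilder builder) {
            KStream<byte[], byte[]> raw =
                    builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "input-topic");
            return raw.flatMapValues(value -> {
                try {
                    return Collections.singletonList(MyEvent.parse(value));
                } catch (Exception e) {
                    // Poison pill: log or dead-letter it here, then skip it.
                    return Collections.<MyEvent>emptyList();
                }
            });
        }
    }

The downside, as discussed in this thread, is that the error handling is buried in user code on a per-topology basis, which is exactly what the KIP's configurable handler aims to centralize.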
Re: [DISCUSS]: KIP-161: streams record processing exception handlers
I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress. The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:

> First a meta comment: KIP discussion should take place on the dev list -- if the user list is cc'ed, please make sure to reply to both lists. Thanks.
>
> Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now.
>
> With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course a quite advanced pattern, but I want to bring it up to design the first step in a way such that we can get there (if we think it's a reasonable idea).
>
> I also want to comment about fail fast vs. making progress. I think that fail fast is not always the best option. The scenario I have in mind is like this: you've got a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records. Of course, you need to fix the root cause, and thus you need to alert (either via logs or the exception handler directly) and start investigating to find the bad producer, shut it down, and fix it.
>
> Here the dead letter queue comes into play. From my understanding, the purpose of this feature is solely to enable post-mortem debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, in this regard, is just "fully processed"). Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset, etc.) to enable such debugging. I guess this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that wrote the log and extract the information from there). Reading it from a topic is much simpler.
>
> I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail fast, it's going to be super hard to process the good data. You would need to write an extra app that copies the data into a new topic, filtering out the bad records (or apply the map() workaround within Streams). So I don't think that "failing fast is most likely the best option in production" is necessarily true.
>
> Or do you think there are scenarios for which you can recover the corrupted records successfully? And even if this is possible, it might be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow returning a "fixed" record? This would solve the ordering problem.
>
> -Matthias
>
> On 5/30/17 1:47 AM, Michael Noll wrote:
> > Thanks for your work on this KIP, Eno -- much appreciated!
> >
> > - I think it would help to improve the KIP by adding an end-to-end code example that demonstrates, with the DSL and with the Processor API, how the user would write a simple application that would then be augmented with the proposed KIP changes to handle exceptions. It should also become much clearer then that, e.g., the KIP would lead to different code paths for the happy case and any failure scenarios.
> >
> > - Do we have sufficient information available to make informed decisions on what to do next? For example, do we know in which part of the topology the record failed? `ConsumerRecord` gives us access to topic, partition, offset, timestamp, etc., but what about topology-related information (e.g. what is the associated state store, if any)?
> >
> > - Only partly on-topic for the scope of this KIP, but this is about the bigger picture: this KIP would give users the option to send corrupted records to a dead letter queue (quarantine topic). But what pattern would we advocate to process such a dead letter queue then, e.g. how to allow for retries with backoff ("if the first record in the dead letter queue fails again, then try the second record for the time being and go back to the first record at a later time")? Jay and Jan already alluded to ordering problems that will be caused by dead letter queues. As I said, retries might be out of scope, but perhaps the implications should be considered if possible?
> >
> > Also, I wrote the text below before reaching the point in the conversation that this KIP's scope will be limited to exceptions in the category of poison
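One possible shape for the retry pattern Michael asks about is a small consumer loop over the quarantine topic that re-enqueues still-failing records at the tail, trading ordering for forward progress, which is exactly the trade-off alluded to above. This is a sketch only, with a hypothetical topic name and reprocess hook; a real implementation would also track an attempt counter and delay re-enqueueing to get actual backoff:

    // Sketch of a dead letter retry loop: records that still fail are
    // pushed back to the tail of the quarantine topic so the consumer
    // keeps making progress ("try the second record for the time being").
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DeadLetterRetrier {
        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "dead-letter-retrier");
            consumerProps.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");
            producerProps.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(Collections.singletonList("my-app-dead-letters"));
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        if (!tryReprocess(record)) {
                            // Still failing: re-enqueue at the tail and move on.
                            producer.send(new ProducerRecord<>(
                                    "my-app-dead-letters", record.key(), record.value()));
                        }
                    }
                }
            }
        }

        // Hypothetical hook: attempt the original processing again.
        private static boolean tryReprocess(ConsumerRecord<byte[], byte[]> record) {
            return false; // placeholder
        }
    }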