[jira] [Created] (KAFKA-13485) Restart connectors after RetriableException raised from Task::start()
Gunnar Morling created KAFKA-13485: -- Summary: Restart connectors after RetriableException raised from Task::start() Key: KAFKA-13485 URL: https://issues.apache.org/jira/browse/KAFKA-13485 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Gunnar Morling If a {{RetriableException}} is raised from {{Task::start()}}, this doesn't trigger an attempt to start that connector again. I.e. the restart functionality currently is only implemented for exceptions raised from {{poll()}}/{{put()}}. Triggering restarts also upon failures during {{start()}} would be desirable, so as to circumvent temporary failure conditions like a network hiccup, which currently require a manual restart of the affected tasks, for instance if a connector establishes a database connection during {{start()}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
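For illustration, a minimal sketch of the failure mode described above; the task class, config key, and connection handling are hypothetical, not part of Kafka:

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLTransientException;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical task opening a database connection in start(); a transient
// failure is surfaced as RetriableException, which today leaves the task
// in FAILED state instead of triggering a restart attempt.
public abstract class JdbcSourceTaskSketch extends SourceTask {

    private Connection connection;

    @Override
    public void start(Map<String, String> props) {
        try {
            connection = DriverManager.getConnection(props.get("connection.url"));
        }
        catch (SQLTransientException e) {
            // A condition like a network hiccup that may resolve itself;
            // the improvement proposed here is to retry start() in this case
            throw new RetriableException("Could not open database connection", e);
        }
        catch (Exception e) {
            // Permanent failures should still fail the task
            throw new ConnectException(e);
        }
    }
}
{code}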
[jira] [Updated] (KAFKA-13478) KIP-802: Validation Support for Kafka Connect SMT Options
[ https://issues.apache.org/jira/browse/KAFKA-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-13478: --- Description: Implement [KIP-802|https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options], adding validation support for SMT options. (was: Implement [KIP-802|KIP-802: Validation Support for Kafka Connect SMT Options], adding validation support for SMT options.) > KIP-802: Validation Support for Kafka Connect SMT Options > - > > Key: KAFKA-13478 > URL: https://issues.apache.org/jira/browse/KAFKA-13478 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > Implement > [KIP-802|https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options], > adding validation support for SMT options. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13478) KIP-802: Validation Support for Kafka Connect SMT Options
Gunnar Morling created KAFKA-13478: -- Summary: KIP-802: Validation Support for Kafka Connect SMT Options Key: KAFKA-13478 URL: https://issues.apache.org/jira/browse/KAFKA-13478 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Gunnar Morling Implement [KIP-802|KIP-802: Validation Support for Kafka Connect SMT Options], adding validation support for SMT options. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss
[ https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348804#comment-17348804 ] Gunnar Morling commented on KAFKA-12801: Ok, so I've added the two JFR recordings to my example repository (https://github.com/gunnarmorling/debezium-examples/commit/e6ce9f2db398ee042e9cc0e611310eeddf3c9427), which also contains the set-up for reproducing this. > High CPU load after restarting brokers subsequent to quorum loss > > > Key: KAFKA-12801 > URL: https://issues.apache.org/jira/browse/KAFKA-12801 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes), and one Kafka Connect node. After > starting all components, I first stop the current controller of the Kafka > cluster, then I stop the then controller of the Kafka cluster. At this point, > only one Kafka node out of the original three and Connect is running. > When now restarting the two stopped Kafka nodes, CPU load on the Connect node > and the two broker nodes goes up to 100% and remains at that level for an > indefinite amount of time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss
[ https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348802#comment-17348802 ] Gunnar Morling commented on KAFKA-12801: Hum, so even with zipping (which brings the size down to 1.6 MB), I'm receiving this error upon upload: {quote} File "jfr.zip" was not uploaded An internal error has occurred. Please contact your administrator {quote} > High CPU load after restarting brokers subsequent to quorum loss > > > Key: KAFKA-12801 > URL: https://issues.apache.org/jira/browse/KAFKA-12801 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes), and one Kafka Connect node. After > starting all components, I first stop the current controller of the Kafka > cluster, then I stop the then controller of the Kafka cluster. At this point, > only one Kafka node out of the original three and Connect is running. > When now restarting the two stopped Kafka nodes, CPU load on the Connect node > and the two broker nodes goes up to 100% and remains at that level for an > indefinite amount of time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss
[ https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348204#comment-17348204 ] Gunnar Morling commented on KAFKA-12801: Hey [~lucasbradstreet], ok, so I got two JFR recordings (one from one of the Kafka nodes with high CPU load, one from Connect), but attaching to this issue fails (something about the file type perhaps). Do you have any other way of receiving those files? They have a size of 5.2 MB together. Alternatively, to reproduce this by yourself, you also could follow the steps I've described here: https://www.morling.dev/blog/exploring-zookeeper-less-kafka/. > High CPU load after restarting brokers subsequent to quorum loss > > > Key: KAFKA-12801 > URL: https://issues.apache.org/jira/browse/KAFKA-12801 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes), and one Kafka Connect node. After > starting all components, I first stop the current controller of the Kafka > cluster, then I stop the then controller of the Kafka cluster. At this point, > only one Kafka node out of the original three and Connect is running. > When now restarting the two stopped Kafka nodes, CPU load on the Connect node > and the two broker nodes goes up to 100% and remains at that level for an > indefinite amount of time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12806) KRaft: Confusing leadership status exposed in metrics for controller without quorum
[ https://issues.apache.org/jira/browse/KAFKA-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-12806: --- Summary: KRaft: Confusing leadership status exposed in metrics for controller without quorum (was: KRaft: Confusing leadership status exposed for controller without quorum) > KRaft: Confusing leadership status exposed in metrics for controller without > quorum > --- > > Key: KAFKA-12806 > URL: https://issues.apache.org/jira/browse/KAFKA-12806 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes). After starting all components, I > first stop the current controller of the Kafka cluster, then I stop the then > controller of the Kafka cluster. At this point, only one Kafka node out of > the original three and Connect is running. In the new KRaft-based metrics, > "leader" is exposed as the role for that node, and its id is shown as the > current leader. Also in the metadata shell, that node is shown as the quorum > leader via /metadataQuorum/leader. This is pretty confusing, as one out of > three nodes cannot have the quorum. I believe this is mostly an issue of > displaying the status, as for instance creating a topic in this state times > out. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12806) KRaft: Confusing leadership status exposed for controller without quorum
[ https://issues.apache.org/jira/browse/KAFKA-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-12806: --- Description: I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of three Kafka nodes (all combined nodes). After starting all components, I first stop the current controller of the Kafka cluster, then I stop the then controller of the Kafka cluster. At this point, only one Kafka node out of the original three and Connect is running. In the new KRaft-based metrics, "leader" is exposed as the role for that node, and its id is shown as the current leader. Also in the metadata shell, that node is shown as the quorum leader via /metadataQuorum/leader. This is pretty confusing, as one out of three nodes cannot have the quorum. I believe this is mostly an issue of displaying the status, as for instance creating a topic in this state times out. > KRaft: Confusing leadership status exposed for controller without quorum > > > Key: KAFKA-12806 > URL: https://issues.apache.org/jira/browse/KAFKA-12806 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes). After starting all components, I > first stop the current controller of the Kafka cluster, then I stop the then > controller of the Kafka cluster. At this point, only one Kafka node out of > the original three and Connect is running. In the new KRaft-based metrics, > "leader" is exposed as the role for that node, and its id is shown as the > current leader. Also in the metadata shell, that node is shown as the quorum > leader via /metadataQuorum/leader. This is pretty confusing, as one out of > three nodes cannot have the quorum. I believe this is mostly an issue of > displaying the status, as for instance creating a topic in this state times > out. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12806) KRaft: Confusing leadership status exposed for controller without quorum
Gunnar Morling created KAFKA-12806: -- Summary: KRaft: Confusing leadership status exposed for controller without quorum Key: KAFKA-12806 URL: https://issues.apache.org/jira/browse/KAFKA-12806 Project: Kafka Issue Type: Bug Affects Versions: 2.8.0 Reporter: Gunnar Morling -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss
[ https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346835#comment-17346835 ] Gunnar Morling commented on KAFKA-12801: Would a JFR profiling work, too? I could provide that one a bit more easily, as I'm more familiar with this tool. > High CPU load after restarting brokers subsequent to quorum loss > > > Key: KAFKA-12801 > URL: https://issues.apache.org/jira/browse/KAFKA-12801 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes), and one Kafka Connect node. After > starting all components, I first stop the current controller of the Kafka > cluster, then I stop the then controller of the Kafka cluster. At this point, > only one Kafka node out of the original three and Connect is running. > When now restarting the two stopped Kafka nodes, CPU load on the Connect node > and the two broker nodes goes up to 100% and remains at that level for an > indefinite amount of time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss
Gunnar Morling created KAFKA-12801: -- Summary: High CPU load after restarting brokers subsequent to quorum loss Key: KAFKA-12801 URL: https://issues.apache.org/jira/browse/KAFKA-12801 Project: Kafka Issue Type: Bug Components: core, KafkaConnect Affects Versions: 2.8.0 Reporter: Gunnar Morling I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of three Kafka nodes (all combined nodes), and one Kafka Connect node. After starting all components, I first stop the current controller of the Kafka cluster, then I stop the then controller of the Kafka cluster. At this point, only one Kafka node out of the original three and Connect is running. When now restarting the two stopped Kafka nodes, CPU load on the Connect node and the two broker nodes goes up to 100% and remains at that level for an indefinite amount of time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032552#comment-17032552 ] Gunnar Morling commented on KAFKA-7052: --- Thanks for commenting, [~rhauch]! My intention was to actually keep the current behavior by means of the default setting of the new option: * "fail" for records with a schema (it's raising an {{IllegalArgumentException}} but could be NPE of course if the exact exception type is a concern; I think it shouldn't, as the original NPE really is a weakness of the existing implementation that shouldn't be relied upon) * "return-null" for records without schema I.e. without any explicit setting, the behavior will be exactly the same as today (ignoring the changed exception type). That's why I didn't assume that'd need a KIP, but as per what you're saying, any new option mandates a KIP? > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031745#comment-17031745 ] Gunnar Morling commented on KAFKA-7052: --- Yes, that'd certainly work for me and I'd be happy to send a PR. Perhaps {{behavior.on.non.existent.field}} = {{(fail|drop|passon)}}? > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
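To make the proposal concrete, a sketch of how such an option could be declared; neither the option name nor its allowed values exist in Kafka, they just follow the suggestion above:

{code:java}
import org.apache.kafka.common.config.ConfigDef;

// Hypothetical declaration of the proposed option; purely illustrative.
public class ExtractFieldConfigSketch {

    public static final String BEHAVIOR_ON_NON_EXISTENT_FIELD = "behavior.on.non.existent.field";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(BEHAVIOR_ON_NON_EXISTENT_FIELD,
                    ConfigDef.Type.STRING,
                    "fail", // default keeps today's behavior of failing
                    ConfigDef.ValidString.in("fail", "drop", "passon"),
                    ConfigDef.Importance.MEDIUM,
                    "How to handle records whose key/value does not contain the configured field.");
}
{code}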
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031720#comment-17031720 ] Gunnar Morling commented on KAFKA-7052: --- Hey [~rmoff], what I meant to say: raising _any_ exception is rather disruptive here. We surely could improve the exception message, but I don't think it would help you really, as the connector still would be in FAILED state when encountering a schema change event. I think your case would be best addressed if the SMT just left the key/value as-is, in case the extract field operation cannot be applied. Taking a step back, it could also indicate a more general shortcoming of Connect: SMTs applied to a source connector such as Debezium are applied to _all_ topics, whereas it actually may be desirable to apply them only to a _subset_ of topics produced by the connector (in case of Debezium, actual change data topics, while schema change and heartbeat topics shouldn't be targeted). Perhaps worth a KIP? [~tombentley], WDYT? > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
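To make that idea concrete, a rough sketch of a wrapper applying a delegate SMT only to matching topics; the wrapper and its {{topic.regex}} option are hypothetical, as per-topic application is not part of Connect today:

{code:java}
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Rough sketch of per-topic SMT application: apply a delegate transformation
// only to topics matching a pattern, so e.g. Debezium's schema change and
// heartbeat topics pass through untouched. Purely illustrative.
public abstract class TopicFilteringTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private Pattern topicPattern;

    // The wrapped SMT; how it gets configured is elided here
    protected abstract Transformation<R> delegate();

    @Override
    public void configure(Map<String, ?> props) {
        topicPattern = Pattern.compile(String.valueOf(props.get("topic.regex")));
    }

    @Override
    public R apply(R record) {
        return topicPattern.matcher(record.topic()).matches()
                ? delegate().apply(record)
                : record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}
{code}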
[jira] [Commented] (KAFKA-7273) Converters should have access to headers.
[ https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948868#comment-16948868 ] Gunnar Morling commented on KAFKA-7273: --- {quote} Headers is modifiable, it has add/remove methods {quote} [~sap1ens], yes, but e.g. the passed object could be a copy so changes to it are ignored, or an implementation is passed whose mutators raise an exception etc. I've filed https://github.com/apache/kafka/pull/7489 for making this explicit in the JavaDoc of the method. > Converters should have access to headers. > - > > Key: KAFKA-7273 > URL: https://issues.apache.org/jira/browse/KAFKA-7273 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jeremy Custenborder >Assignee: Jeremy Custenborder >Priority: Major > Fix For: 2.4.0 > > > I found myself wanting to build a converter that stored additional type > information within headers. The converter interface does not allow a > developer to access to the headers in a Converter. I'm not suggesting that we > change the method for serializing them, rather that > *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* > and *toConnectData*. For example something like this. > {code:java} > import org.apache.kafka.connect.data.Schema; > import org.apache.kafka.connect.data.SchemaAndValue; > import org.apache.kafka.connect.header.Headers; > import org.apache.kafka.connect.storage.Converter; > public interface Converter { > default byte[] fromConnectData(String topic, Headers headers, Schema > schema, Object object) { > return fromConnectData(topic, schema, object); > } > default SchemaAndValue toConnectData(String topic, Headers headers, byte[] > payload) { > return toConnectData(topic, payload); > } > void configure(Map var1, boolean var2); > byte[] fromConnectData(String var1, Schema var2, Object var3); > SchemaAndValue toConnectData(String var1, byte[] var2); > } > {code} > This would be a similar approach to what was already done with > ExtendedDeserializer and ExtendedSerializer in the Kafka client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
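For illustration, a sketch of the use case behind this question, written against the header-aware {{fromConnectData()}} variant shown in the issue description below; the converter class is hypothetical, and whether mutating the passed {{Headers}} is legal is exactly what the JavaDoc clarification addresses:

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Headers;

// Hypothetical converter sketch: records the schema type in a header as a
// side-effect of serialization, before the message is sent to Kafka. This is
// only safe if the runtime passes a mutable, non-copied Headers instance.
public abstract class TypeAnnotatingConverterSketch {

    // The header-aware variant proposed in the issue description
    public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        // Side-effect on the passed Headers, assuming mutability
        headers.addString("source.schema.type", schema != null ? schema.type().name() : "null");
        return fromConnectData(topic, schema, value);
    }

    // The existing header-less variant, elided here
    protected abstract byte[] fromConnectData(String topic, Schema schema, Object value);
}
{code}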
[jira] [Comment Edited] (KAFKA-7273) Converters should have access to headers.
[ https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948354#comment-16948354 ] Gunnar Morling edited comment on KAFKA-7273 at 10/10/19 9:46 AM: - Hey [~rhauch], [~jcustenborder], just came across this change, which I'd have suggested myself otherwise :-) Very nice for a specific use case I have in mind. One question for clarification, though: is the {{Headers}} object passed to {{fromConnectData()}} modifiable, i.e. can a converter add/override/remove existing headers before the message is sent to Kafka? This is what I'd like to do, and it's not quite clear to me whether that's allowed or not. Modifying the input parameter as a side-effect might be a bit surprising, given the method returns something. If it is supported, I'll be happy to send a PR with a clarifying sentence in the method JavaDoc. Thanks! was (Author: gunnar.morling): Hey [~rhauch], [~jcustenborder], just came across this change, which I'd have suggested myself otherwise :-) Very nice for a specific use case I have in mind. One question for clarification, though: is the {{Headers}} object passed to {{fromConnectData()}} modifiable, i.e. can a converter add/override existing headers for before the message is sent to Kafka? This is what I'd like to do, and it's not quite clear to me whether that's allowed or not. Modifying the input parameter as a side-effect might be a bit surprising given the method returns something. If it is supported, I'll be happy to send a PR with a clarifying sentence in the method JavaDoc. Thanks! > Converters should have access to headers. > - > > Key: KAFKA-7273 > URL: https://issues.apache.org/jira/browse/KAFKA-7273 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jeremy Custenborder >Assignee: Jeremy Custenborder >Priority: Major > Fix For: 2.4.0 > > > I found myself wanting to build a converter that stored additional type > information within headers. The converter interface does not allow a > developer to access to the headers in a Converter. I'm not suggesting that we > change the method for serializing them, rather that > *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* > and *toConnectData*. For example something like this. > {code:java} > import org.apache.kafka.connect.data.Schema; > import org.apache.kafka.connect.data.SchemaAndValue; > import org.apache.kafka.connect.header.Headers; > import org.apache.kafka.connect.storage.Converter; > public interface Converter { > default byte[] fromConnectData(String topic, Headers headers, Schema > schema, Object object) { > return fromConnectData(topic, schema, object); > } > default SchemaAndValue toConnectData(String topic, Headers headers, byte[] > payload) { > return toConnectData(topic, payload); > } > void configure(Map var1, boolean var2); > byte[] fromConnectData(String var1, Schema var2, Object var3); > SchemaAndValue toConnectData(String var1, byte[] var2); > } > {code} > This would be a similar approach to what was already done with > ExtendedDeserializer and ExtendedSerializer in the Kafka client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7273) Converters should have access to headers.
[ https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948354#comment-16948354 ] Gunnar Morling commented on KAFKA-7273: --- Hey [~rhauch], [~jcustenborder], just came across this change, which I'd have suggested myself otherwise :-) Very nice for a specific use case I have in mind. One question for clarification, though: is the {{Headers}} object passed to {{fromConnectData()}} modifiable, i.e. can a converter add/override existing headers for before the message is sent to Kafka? This is what I'd like to do, and it's not quite clear to me whether that's allowed or not. Modifying the input parameter as a side-effect might be a bit surprising given the method returns something. If it is supported, I'll be happy to send a PR with a clarifying sentence in the method JavaDoc. Thanks! > Converters should have access to headers. > - > > Key: KAFKA-7273 > URL: https://issues.apache.org/jira/browse/KAFKA-7273 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jeremy Custenborder >Assignee: Jeremy Custenborder >Priority: Major > Fix For: 2.4.0 > > > I found myself wanting to build a converter that stored additional type > information within headers. The converter interface does not allow a > developer to access to the headers in a Converter. I'm not suggesting that we > change the method for serializing them, rather that > *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* > and *toConnectData*. For example something like this. > {code:java} > import org.apache.kafka.connect.data.Schema; > import org.apache.kafka.connect.data.SchemaAndValue; > import org.apache.kafka.connect.header.Headers; > import org.apache.kafka.connect.storage.Converter; > public interface Converter { > default byte[] fromConnectData(String topic, Headers headers, Schema > schema, Object object) { > return fromConnectData(topic, schema, object); > } > default SchemaAndValue toConnectData(String topic, Headers headers, byte[] > payload) { > return toConnectData(topic, payload); > } > void configure(Map var1, boolean var2); > byte[] fromConnectData(String var1, Schema var2, Object var3); > SchemaAndValue toConnectData(String var1, byte[] var2); > } > {code} > This would be a similar approach to what was already done with > ExtendedDeserializer and ExtendedSerializer in the Kafka client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939744#comment-16939744 ] Gunnar Morling commented on KAFKA-7052: --- Looking into this, and it's an interesting case. The problem arises when the SMT gets one of the [schema change events|https://debezium.io/documentation/reference/0.9/connectors/mysql.html#schema-change-topic] from the Debezium MySQL connector, which don't have the same primary key structure as, for instance, your customers table. So I think actually it's not your intention to apply the SMT to these messages to begin with. The question is how to deal with this case; I could see these options when encountering a message which doesn't have the given field: * raise a more meaningful exception than NPE (pretty disruptive) * return null * leave the key/value unchanged I think for your use case, the last option is the most useful one. But returning null might also make sense in others. This might require an option perhaps? > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
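A sketch of what the third option could look like inside the SMT; this is illustrative only, not the current {{ExtractField}} code:

{code:java}
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

// Illustrative sketch of the "leave the key/value unchanged" option for
// records that lack the configured field (e.g. Debezium schema change events).
final class ExtractFieldSketch {

    static <R extends ConnectRecord<R>> R applyWithSchema(R record, Struct value, String fieldName) {
        Field field = value.schema().field(fieldName);
        if (field == null) {
            // Field absent: pass the record on unmodified instead of an NPE
            return record;
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                field.schema(), value.get(field),
                record.timestamp());
    }
}
{code}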
[jira] [Updated] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-8523: -- Description: When applying the {{InsertField}} transformation to a tombstone event, an exception is raised: {code} org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [field insertion], found: null at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) at org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) at org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) {code} ~~AFAICS, the transform can still be made to work in this case by simply building up a new value map from scratch.~~ Update as per the discussion in the comments: tombstones should be left as-is by this SMT, as any insertion would defeat their purpose of enabling log compaction. was: When applying the {{InsertField}} transformation to a tombstone event, an exception is raised: {code} org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [field insertion], found: null at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) at org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) at org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) {code} AFAICS, the transform can still be made to work in this case by simply building up a new value map from scratch. > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > Attachments: image-2019-09-17-15-53-44-038.png > > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > ~~AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch.~~ > Update as per the discussion in the comments: tombstones should be left as-is > by this SMT, as any insertion would defeat their purpose of enabling log > compaction. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-8523: -- Description: When applying the {{InsertField}} transformation to a tombstone event, an exception is raised: {code} org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [field insertion], found: null at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) at org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) at org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) {code} -AFAICS, the transform can still be made to work in this case by simply building up a new value map from scratch.- Update as per the discussion in the comments: tombstones should be left as-is by this SMT, as any insertion would defeat their purpose of enabling log compaction. was: When applying the {{InsertField}} transformation to a tombstone event, an exception is raised: {code} org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [field insertion], found: null at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) at org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) at org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) {code} ~~AFAICS, the transform can still be made to work in this case by simply building up a new value map from scratch.~~ Update as per the discussion in the comments: tombstones should be left as-is by this SMT, as any insertion would defeat their purpose of enabling log compaction. > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > Attachments: image-2019-09-17-15-53-44-038.png > > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > -AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch.- > Update as per the discussion in the comments: tombstones should be left as-is > by this SMT, as any insertion would defeat their purpose of enabling log > compaction. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939730#comment-16939730 ] Gunnar Morling commented on KAFKA-8523: --- Hey [~rhauch], [~frederic.tardif], yes, we're all on the same page: tombstones shouldn't be modified at all by this SMT. I've updated and force-pushed the PR accordingly. It's good to go from my PoV now. > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > Attachments: image-2019-09-17-15-53-44-038.png > > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
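The agreed-upon behavior amounts to an early return for null values; a minimal sketch, not the literal PR code:

{code:java}
import org.apache.kafka.connect.connector.ConnectRecord;

// Minimal sketch of the agreed behavior: tombstones, i.e. records with a
// null value, pass through InsertField untouched, preserving their log
// compaction semantics.
final class InsertFieldTombstoneSketch {

    static <R extends ConnectRecord<R>> R apply(R record) {
        if (record.value() == null) {
            return record; // tombstone: leave as-is
        }
        return insertField(record);
    }

    // The normal field-insertion path, elided here
    private static <R extends ConnectRecord<R>> R insertField(R record) {
        throw new UnsupportedOperationException("elided");
    }
}
{code}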
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931671#comment-16931671 ] Gunnar Morling commented on KAFKA-7052: --- I'm wondering: is it solely a question of having a more meaningful exception, or should null rather be returned in this case? Seems like one might want either, depending on the use case. > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931662#comment-16931662 ] Gunnar Morling edited comment on KAFKA-8523 at 9/17/19 5:08 PM: Yes, I was arriving at pretty much the same conclusion; that said: is an option even needed? I've come to think that a tombstone should remain just that, a tombstone, i.e. I don't see a good reason to inject any value into a tombstone really; after all, it has specific semantics -- enabling compaction -- which shouldn't be altered. So I would suggest I simply update my PR so that it passes on tombstones unmodified and that should be good enough. WDYT? Note, if we wanted to have an option, I think it would allow passing on tombstones unmodified (as suggested above) OR to insert the new field, i.e. there wouldn't be an empty map really returned, but a map with a single entry for that new field (so what the current PR is doing). This could be done, but as argued it'd change the message's nature of being a tombstone, so it's probably not desirable? was (Author: gunnar.morling): Yes, I was arriving at pretty much the same conclusion; that said: is an option even needed? I've come to think that a tombstone should remain just that, a tombstone, i.e. I don't see a good reason to inject any value into a tombstone really; after all, it has specific semantics -- enabling compaction -- which shouldn't be altered. So I would suggest I simply update my PR so that it passes on tombstones unmodified and that should be good enough. WDYT? > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931662#comment-16931662 ] Gunnar Morling commented on KAFKA-8523: --- Yes, I was arriving at pretty much the same conclusion; that said: is an option even needed? I've come to think that a tombstone should remain just that, a tombstone, i.e. I don't see a good reason to inject any value into a tombstone really; after all, it has specific semantics -- enabling compaction -- which shouldn't be altered. So I would suggest I simply update my PR so that it passes on tombstones unmodified and that should be good enough. WDYT? > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16868422#comment-16868422 ] Gunnar Morling commented on KAFKA-8523: --- Good question, [~rhauch]; I didn't consider it, but I could see both approaches making sense. Should we make this an option (insert into tombstones vs. pass them on unmodified) for that SMT? > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860794#comment-16860794 ] Gunnar Morling commented on KAFKA-8523: --- Filed https://github.com/apache/kafka/pull/6914. > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860688#comment-16860688 ] Gunnar Morling commented on KAFKA-8523: --- I'll provide a PR shortly. > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
Gunnar Morling created KAFKA-8523: - Summary: InsertField transformation fails when encountering tombstone event Key: KAFKA-8523 URL: https://issues.apache.org/jira/browse/KAFKA-8523 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Gunnar Morling When applying the {{InsertField}} transformation to a tombstone event, an exception is raised: {code} org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [field insertion], found: null at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) at org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) at org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) {code} AFAICS, the transform can still be made to work in this case by simply building up a new value map from scratch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8476) Kafka 2.2.1 distribution contains JAX-RS API twice
[ https://issues.apache.org/jira/browse/KAFKA-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-8476: -- Description: In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and javax.ws.rs-api-2.1.1.jar. I reckon only one should be there. There are also two versions of the inject API (javax.inject-1.jar and javax.inject-2.5.0-b42.jar) which might indicate an issue with dependency convergence. (was: In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and javax.ws.rs-api-2.1.1.jar. I reckon only one should be there.) > Kafka 2.2.1 distribution contains JAX-RS API twice > -- > > Key: KAFKA-8476 > URL: https://issues.apache.org/jira/browse/KAFKA-8476 > Project: Kafka > Issue Type: Bug >Reporter: Gunnar Morling >Priority: Minor > > In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and > javax.ws.rs-api-2.1.1.jar. I reckon only one should be there. There are also > two versions of the inject API (javax.inject-1.jar and > javax.inject-2.5.0-b42.jar) which might indicate an issue with dependency > convergence. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8476) Kafka 2.2.1 distribution contains JAX-RS API twice
[ https://issues.apache.org/jira/browse/KAFKA-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-8476: -- Priority: Minor (was: Major) > Kafka 2.2.1 distribution contains JAX-RS API twice > -- > > Key: KAFKA-8476 > URL: https://issues.apache.org/jira/browse/KAFKA-8476 > Project: Kafka > Issue Type: Bug >Reporter: Gunnar Morling >Priority: Minor > > In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and > javax.ws.rs-api-2.1.1.jar. I reckon only one should be there. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8476) Kafka 2.2.1 distribution contains JAX-RS API twice
Gunnar Morling created KAFKA-8476: - Summary: Kafka 2.2.1 distribution contains JAX-RS API twice Key: KAFKA-8476 URL: https://issues.apache.org/jira/browse/KAFKA-8476 Project: Kafka Issue Type: Bug Reporter: Gunnar Morling In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and javax.ws.rs-api-2.1.1.jar. I reckon only one should be there. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value
[ https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719137#comment-16719137 ] Gunnar Morling commented on KAFKA-3832: --- What would be the right fix to this issue then, could it simply be changed to return a null value in this case? That should be simple enough, but I'm wondering whether there'll be a problem if some records have a schema (non-tombstones) and some others don't (tombstones). > Kafka Connect's JSON Converter never outputs a null value > - > > Key: KAFKA-3832 > URL: https://issues.apache.org/jira/browse/KAFKA-3832 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Assignee: Prasanna Subburaj >Priority: Major > Labels: newbie > > Kafka Connect's JSON Converter will never output a null value when > {{enableSchemas=true}}. This means that when a connector outputs a > {{SourceRecord}} with a null value, the JSON Converter will always produce a > message value with: > {code:javascript} > { > "schema": null, > "payload": null > } > {code} > And, this means that while Kafka log compaction will always be able to remove > earlier messages with the same key, log compaction will never remove _all_ of > the messages with the same key. > The JSON Connector's {{fromConnectData(...)}} should always return null when > it is supplied a null value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
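One conceivable shape of such a change, sketched as a hypothetical subclass rather than the actual {{JsonConverter}} patch:

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

// Hypothetical sketch of the fix discussed above: return a true null for a
// null value so log compaction can eventually remove all messages with the
// same key, instead of emitting a {"schema": null, "payload": null} envelope.
public class TombstoneAwareJsonConverter extends JsonConverter {

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null) {
            return null; // tombstone: no envelope at all
        }
        return super.fromConnectData(topic, schema, value);
    }
}
{code}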
[jira] [Updated] (KAFKA-7336) Kafka Connect source task hangs when producing record with invalid topic name
[ https://issues.apache.org/jira/browse/KAFKA-7336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-7336: -- Summary: Kafka Connect source task hangs when producing record with invalid topic name (was: Kafka Connect source task when producing record with invalid topic name) > Kafka Connect source task hangs when producing record with invalid topic name > - > > Key: KAFKA-7336 > URL: https://issues.apache.org/jira/browse/KAFKA-7336 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Gunnar Morling >Priority: Major > > If a Kafka Connect source task returns a {{SourceRecord}} with an invalid > topic name (e.g. "dbserver1.inventory.test@data"), that source task hangs > (presumably indefinitely?) and doesn't continue its polling loop. The log is > flooded with this message: > {code} > connect_1| 2018-08-24 08:47:29,014 WARN || [Producer > clientId=producer-4] Error while fetching metadata with correlation id 833 : > {dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION} > [org.apache.kafka.clients.NetworkClient] > {code} > The producer thread is stuck in the loop here: > {code} > KafkaProducer.waitOnMetadata(String, Integer, long) line: 938 > KafkaProducer.doSend(ProducerRecord, Callback) line: 823 > KafkaProducer.send(ProducerRecord, Callback) line: 803 > WorkerSourceTask.sendRecords() line: 318 > WorkerSourceTask.execute() line: 228 > WorkerSourceTask(WorkerTask).doRun() line: 175 > WorkerSourceTask(WorkerTask).run() line: 219 > Executors$RunnableAdapter.call() line: 511 > FutureTask.run() line: 266 > ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149 > ThreadPoolExecutor$Worker.run() line: 624 > Thread.run() line: 748 > {code} > This causes the task to remain in RUNNING state, but no further invocations > of {{poll()}} are done. > Of course we'll work around this and make sure to not produce records with > invalid topic names, but I think the source task should transition to FAILED > state in this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7336) Kafka Connect source task when producing record with invalid topic name
Gunnar Morling created KAFKA-7336: - Summary: Kafka Connect source task when producing record with invalid topic name Key: KAFKA-7336 URL: https://issues.apache.org/jira/browse/KAFKA-7336 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.0.0 Reporter: Gunnar Morling If a Kafka Connect source task returns a {{SourceRecord}} with an invalid topic name (e.g. "dbserver1.inventory.test@data"), that source task hangs (presumably indefinitely?) and doesn't continue its polling loop. The log is flooded with this message: {code} connect_1| 2018-08-24 08:47:29,014 WARN || [Producer clientId=producer-4] Error while fetching metadata with correlation id 833 : {dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION} [org.apache.kafka.clients.NetworkClient] {code} The producer thread is stuck in the loop here: {code} KafkaProducer.waitOnMetadata(String, Integer, long) line: 938 KafkaProducer.doSend(ProducerRecord, Callback) line: 823 KafkaProducer.send(ProducerRecord, Callback) line: 803 WorkerSourceTask.sendRecords() line: 318 WorkerSourceTask.execute() line: 228 WorkerSourceTask(WorkerTask).doRun() line: 175 WorkerSourceTask(WorkerTask).run() line: 219 Executors$RunnableAdapter.call() line: 511 FutureTask.run() line: 266 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149 ThreadPoolExecutor$Worker.run() line: 624 Thread.run() line: 748 {code} This causes the task to remain in RUNNING state, but no further invocations of {{poll()}} are done. Of course we'll work around this and make sure to not produce records with invalid topic names, but I think the source task should transition to FAILED state in this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
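As a connector-side workaround until this is addressed, topic names can be validated or sanitized before records are returned from {{poll()}}; a sketch (the helper class is hypothetical, the pattern mirrors Kafka's legal topic characters):

{code:java}
import java.util.regex.Pattern;

// Connector-side guard against the hang described above: validate or
// sanitize topic names before records are handed to the framework.
final class TopicNames {

    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static String sanitize(String topic) {
        if (LEGAL.matcher(topic).matches()) {
            return topic;
        }
        // e.g. "dbserver1.inventory.test@data" -> "dbserver1.inventory.test_data"
        return topic.replaceAll("[^a-zA-Z0-9._-]", "_");
    }
}
{code}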
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588621#comment-16588621 ] Gunnar Morling commented on KAFKA-3821: --- Hey [~rhauch], [~ewencp] et al., trying to move forward with this issue, I've created a quick draft PR of what I had in mind: [PR #5553|https://github.com/apache/kafka/pull/5553]. Can you let me know whether you think that's an acceptable solution for this issue? Thanks! > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 2.1.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564900#comment-16564900 ] Gunnar Morling commented on KAFKA-3821: --- One other alternative came to my mind which avoids modelling {{OffsetSourceRecord}} as a sub-class of {{SourceRecord}}. There could be this new method: {code} interface SourceTask { default OffsetPosition getOffset() { return null; } } {code} This method would always be called by Kafka Connect after calling {{poll()}}. {{OffsetPosition}} would be a container holding {{Map<String, ?> sourcePartition}} and {{Map<String, ?> sourceOffset}}. As the method has a default implementation returning null, the change would be backwards-compatible. If a connector implements the new method, it can return its current source offset, without emitting another actual source record (by returning an empty list from {{poll()}}). This would address the two use cases we have in Debezium for this: * Emit an offset indicating that an initial DB snapshot has been completed after the last snapshot record has been emitted * Regularly emit the processed offsets from the source DB (e.g. MySQL binlog position) even if we don't emit any actual source records for a longer period of time. Currently it can happen due to filter configuration (the user is only interested in capturing some of the tables from their source DB) that we process the DB logs for a long time without a way to communicate the processed offsets to Kafka Connect. This will cause large parts of the log to be reprocessed after a connector restart and also causes larger parts of the logs than needed to be retained in the source DB. Would that be an acceptable way forward? I've come to think that modelling {{OffsetSourceRecord}} just isn't right; it feels a bit like Java's {{Stack}} class which extends {{Vector}} and that way exposes lots of methods which shouldn't exist in its API. > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 2.1.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
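To make the sketch above a bit more concrete, the {{OffsetPosition}} container could look as follows; this is purely illustrative, no such type exists in the Connect API:

{code:java}
import java.util.Map;

/** Hypothetical container pairing a source partition with a source offset. */
public class OffsetPosition {

    private final Map<String, ?> sourcePartition;
    private final Map<String, ?> sourceOffset;

    public OffsetPosition(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
    }

    public Map<String, ?> sourcePartition() {
        return sourcePartition;
    }

    public Map<String, ?> sourceOffset() {
        return sourceOffset;
    }
}
{code}

The worker would invoke {{getOffset()}} after each {{poll()}} and, on a non-null result, record the offset just as if it had accompanied a regular record.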
[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512618#comment-16512618 ] Gunnar Morling commented on KAFKA-7058: --- Thanks for the quick review, [~rhauch]. It's definitely not a blocker for us; we worked around it by having our own equals() implementation for schemas for now. I'm curious about the Kafka versions maintained going forward, though. Will it be 0.10.x and 2.0, but not 1.x? > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.1.0 >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
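The underlying Java behaviour is easy to demonstrate in isolation: {{Objects#equals()}} falls back to reference equality for arrays, while {{Objects#deepEquals()}} compares contents. A minimal sketch of the problem and of one possible direction for a fix (the actual patch may differ):

{code:java}
import java.util.Objects;

public class ArrayDefaultValueDemo {

    public static void main(String[] args) {
        byte[] defaultA = { 1, 2, 3 };
        byte[] defaultB = { 1, 2, 3 };

        // What ConnectSchema#equals() effectively does today: reference equality -> false
        System.out.println(Objects.equals(defaultA, defaultB));

        // Content-based comparison that also handles nested arrays -> true
        System.out.println(Objects.deepEquals(defaultA, defaultB));
    }
}
{code}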
[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-7058: -- Component/s: KafkaConnect > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default > values, but this doesn't work correctly if the default values in fact are > arrays. In this case, always {false} will be returned, also if the default > value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-7058: -- Description: {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' default values, but this doesn't work correctly if the default values in fact are arrays. In this case, always {{false}} will be returned, also if the default value arrays actually are the same. (was: {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default values, but this doesn't work correctly if the default values in fact are arrays. In this case, always {false} will be returned, also if the default value arrays actually are the same.) > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512261#comment-16512261 ] Gunnar Morling commented on KAFKA-7058: --- I'll send a pull request for this in a bit. > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
Gunnar Morling created KAFKA-7058: - Summary: ConnectSchema#equals() broken for array-typed default values Key: KAFKA-7058 URL: https://issues.apache.org/jira/browse/KAFKA-7058 Project: Kafka Issue Type: Bug Reporter: Gunnar Morling {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default values, but this doesn't work correctly if the default values in fact are arrays. In this case, always {false} will be returned, also if the default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16483856#comment-16483856 ] Gunnar Morling commented on KAFKA-3821: --- Hey [~rhauch], thanks for getting back to this one. I don't think anything in particular speaks against the sub-type idea apart from your very first comment: "Neither is clean or elegant". IMO, such {{OffsetSourceRecord}} shouldn't extend {{SourceRecord}} from an OO modelling perspective, as it doesn't have key, value, schemas nor a topic. It kinda would have made sense to model it the other way around ({{SourceRecord extends OffsetRecord}}), but it's too late to change it that way. That's why I'd have preferred the suggested new version of {{poll()}} with a receiver parameter; it seemed more logical in terms of proper modelling. But I'm not hung up on this one; if you think the sub-class approach is the way to go and it's something you'd find acceptable in the Kafka API, why not. I can try and start writing up a KIP along these lines, but I'm afraid that'll miss the next release's deadline (this week?). > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 2.0.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410487#comment-16410487 ] Gunnar Morling commented on KAFKA-3821: --- Hey [~ewencp], thanks for your extensive reply! I agree this discussion shouldn't be centered around saving or not saving that one allocation. I just added this to point out that the receiver approach leaves Kafka Connect more freedom in terms of implementing this. I.e. that way, in theory, you could even do crazy things such as storing incoming records off-heap. But yeah, most likely you'd just use a list yourselves. But that's not why I was suggesting this. The more substantial advantage I see is that it allows adding further methods down the road without breaking existing implementors of this contract. [~rhauch] e.g. mentioned methods for TX handling. Assuming these calls must be done in the correct ordering when submitting records, such methods couldn't really be added to {{ConnectorSourceTask}} itself. It's the same for offset handling which is the original use case we're after here. On the offset handling itself, there are two aspects to this. One is that we'd like to submit an offset once a snapshot is completed. Currently, we're doing what you described with {quote} would another option be having Debezium read 1 record forward to determine before returning the record and/or constructing its offset whether this is the final record of the snapshot {quote} But this leads back to the complexity of this contract for implementors. Having a dedicated way for submitting the offset once the snapshot is done is arguably simpler to implement (and reason about when reading the code) than doing this "read forward" and delayed processing of it. In terms of uniqueness of offsets, they _are_ unique also during snapshotting (binlog position), but we need a way to say that a snapshot has been completed. I see though how that could be considered as a "style thing" mostly. There's another case though where we'd benefit - substantially - from being able to submit offsets explicitly. This is where a connector supports whitelisting of captured tables and no changes are done to the whitelisted tables for a while (a [comment above|https://issues.apache.org/jira/browse/KAFKA-3821?focusedCommentId=15973506&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15973506] touches on this). In this case the offset in the event source progresses (e.g. MySQL's binlog position), but as no record is of interest as per the whitelist config, we have no way to ever submit these offsets. Then, after a connector restart, we'd be forced to re-read much larger portions of the binlog than actually required. We currently work around this in Debezium by regularly emitting records to a heartbeat topic. This allows us to submit these offsets, even if no changes to the whitelisted tables are applied. Now the original idea above was to emit specific subclasses of {{SourceRecord}} for such "offset-only" records, but I wanted to bring up the receiver parameter idea because a) it feels less hackish to me and b) it opens the door for further API additions as described above. I hope this makes sense; what's proposed is the best approach coming to my mind right now for the issues we're trying to resolve (while keeping the API reasonably abstract so as to cater for other use cases, too). Happy about any alternative proposals of course. Thanks for this great discussion! 
> Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
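For reference, the heartbeat workaround mentioned above boils down to emitting a record whose only job is to carry the current offset to Kafka Connect. A rough sketch, assuming a dedicated heartbeat topic; the topic name and record layout are illustrative, the details in Debezium may differ:

{code:java}
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class Heartbeats {

    /**
     * Builds a record for a dedicated heartbeat topic, so that the current
     * log position gets committed even while all whitelisted tables are quiet.
     */
    public static SourceRecord heartbeat(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        return new SourceRecord(sourcePartition, sourceOffset,
                "__heartbeat", Schema.STRING_SCHEMA, "heartbeat");
    }
}
{code}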
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404435#comment-16404435 ] Gunnar Morling commented on KAFKA-3821: --- Hey [~rhauch], another alternative API crossed my mind. It's a bigger deviation from the current design but is also more flexible in terms of future extensions: {code} public interface WorkerSourceTask2 { public void poll(SourceRecordReceiver receiver); // other methods from WorkerSourceTask public interface SourceRecordReceiver { void sourceRecord(SourceRecord record); void offset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset); } } {code} I.e. instead of having {{SourceRecord}} subclasses, there'd be a new receiver-type parameter which is invoked to emit any records. This also opens up the door for other future extensions, e.g. a task might enforce commit of offsets at a given time. Adding methods like that can be done in a compatible way via {{SourceRecordReceiver}} (as it's implemented by KC, not connectors). As a small further plus, it also avoids allocation of lists if just a single record must be emitted. Any thoughts on that one? If you don't rule it out for some blocking reason I overlooked, how should this be discussed then? As an alternative in the KIP document? > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
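For illustration, a task implementing the hypothetical contract above could look like this; {{readNextBatch()}}, {{currentPartition()}} and {{currentOffset()}} are placeholders for connector-specific logic:

{code:java}
// Hypothetical implementation against the proposed receiver-based contract
public void poll(SourceRecordReceiver receiver) {
    for (SourceRecord record : readNextBatch()) {
        receiver.sourceRecord(record);
    }

    // Nothing of interest was emitted, but the log position moved on:
    // hand just the offset to the framework
    receiver.offset(currentPartition(), currentOffset());
}
{code}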
[jira] [Comment Edited] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404435#comment-16404435 ] Gunnar Morling edited comment on KAFKA-3821 at 3/19/18 7:07 AM: Hey [~rhauch], another alternative API crossed my mind. It's a bigger deviation from the current design but is also more flexible in terms of future extensions: {code} public interface WorkerSourceTask2 { public void poll(SourceRecordReceiver receiver); // other methods from WorkerSourceTask public interface SourceRecordReceiver { void sourceRecord(SourceRecord record); void offset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset); } } {code} I.e. instead of having {{SourceRecord}} subclasses, there'd be a new receiver-type parameter which is invoked by {{poll()}} implementations to emit any records. This also opens up the door for other future extensions, e.g. a task might enforce commit of offsets at a given time. Adding methods like that can be done in a compatible way via {{SourceRecordReceiver}} (as it's implemented by KC, not connectors). As a small further plus, it also avoids allocation of lists if just a single record must be emitted. Any thoughts on that one? If you don't rule it out for some blocking reason I overlooked, how should this be discussed then? As an alternative in the KIP document? was (Author: gunnar.morling): Hey [~rhauch], another alternative API crossed my mind. It's a bigger deviation from the current design but is also more flexible in terms of future extensions: {code} public interface WorkerSourceTask2 { public void poll(SourceRecordReceiver receiver); // other methods from WorkerSourceTask public interface SourceRecordReceiver { void sourceRecord(SourceRecord record); void offset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset); } } {code} I.e. instead of having {{SourceRecord}} subclasses, there'd be a new receiver-type parameter which is invoked to emit any records. This also opens up the door for other future extensions, e.g. a task might enforce commit of offsets at a given time. Adding methods like that can be done in a compatible way via {{SourceRecordReceiver}} (as it's implemented by KC, not connectors). As a small further plus, it also avoids allocation of lists if just a single record must be emitted. Any thoughts on that one? If you don't rule it out for some blocking reason I overlooked, how should this be discussed then? As an alternative in the KIP document? > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366411#comment-16366411 ] Gunnar Morling commented on KAFKA-6566: --- +1 for more exploration of whether calling stop() was the original intention. FWIW, several comments in Debezium indicate that this at least was the assumption on that side when the code was written (of course that assumption may have been right or wrong). Note that calling stop() would be the only chance for the task to clean up its resources after an exception on the KC side of the polling loop (while we could have our entire polling logic in a try/catch block, there's no chance to react to exceptions in WorkerSourceTask). > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Map<String, String> props) { > } > @Override > public Class<? extends Task> taskClass() { > return TestTask.class; > } > @Override > public List<Map<String, String>> taskConfigs(int maxTasks) { > return Collections.singletonList(Collections.singletonMap("foo", > "bar")); > } > @Override > public void stop() { > } > @Override > public ConfigDef config() { > return new ConfigDef(); > } > public static class TestTask extends SourceTask { > @Override > public String version() { > return null; > } > @Override > public void start(Map<String, String> props) { > } > @Override > public List<SourceRecord> poll() throws InterruptedException { > throw new RuntimeException(); > } > @Override > public void stop() { > System.out.println("stop() called"); > } > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6566: -- Affects Version/s: 1.0.0 > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Map<String, String> props) { > } > @Override > public Class<? extends Task> taskClass() { > return TestTask.class; > } > @Override > public List<Map<String, String>> taskConfigs(int maxTasks) { > return Collections.singletonList(Collections.singletonMap("foo", > "bar")); > } > @Override > public void stop() { > } > @Override > public ConfigDef config() { > return new ConfigDef(); > } > public static class TestTask extends SourceTask { > @Override > public String version() { > return null; > } > @Override > public void start(Map<String, String> props) { > } > @Override > public List<SourceRecord> poll() throws InterruptedException { > throw new RuntimeException(); > } > @Override > public void stop() { > System.out.println("stop() called"); > } > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
Gunnar Morling created KAFKA-6566: - Summary: SourceTask#stop() not called after exception raised in poll() Key: KAFKA-6566 URL: https://issues.apache.org/jira/browse/KAFKA-6566 Project: Kafka Issue Type: Bug Reporter: Gunnar Morling Having discussed this with [~rhauch], it has been my assumption that {{SourceTask#stop()}} will be called by the Kafka Connect framework in case an exception has been raised in {{poll()}}. That's not the case, though. As an example see the connector and task below. Calling {{stop()}} after an exception in {{poll()}} seems like a very useful action to take, as it'll allow the task to clean up any resources such as releasing any database connections, right after that failure and not only once the connector is stopped. {code} package com.example; import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; public class TestConnector extends SourceConnector { @Override public String version() { return null; } @Override public void start(Map<String, String> props) { } @Override public Class<? extends Task> taskClass() { return TestTask.class; } @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.singletonList(Collections.singletonMap("foo", "bar")); } @Override public void stop() { } @Override public ConfigDef config() { return new ConfigDef(); } public static class TestTask extends SourceTask { @Override public String version() { return null; } @Override public void start(Map<String, String> props) { } @Override public List<SourceRecord> poll() throws InterruptedException { throw new RuntimeException(); } @Override public void stop() { System.out.println("stop() called"); } } } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
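Until the framework itself calls {{stop()}} in this situation, a task can only defend itself by guarding its own polling logic. A minimal sketch of that workaround, where {{pollInternal()}} and {{cleanUpResources()}} stand in for the task's actual logic:

{code:java}
@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        return pollInternal();
    }
    catch (RuntimeException e) {
        // The framework won't call stop() once this exception propagates,
        // so release database connections etc. before failing the task
        cleanUpResources();
        throw e;
    }
}
{code}

As noted in the comment further above, this only covers failures in the task's own code; exceptions raised on the Kafka Connect side of the polling loop remain invisible to the task.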
[jira] [Created] (KAFKA-6551) Unbounded queues in WorkerSourceTask cause OutOfMemoryError
Gunnar Morling created KAFKA-6551: - Summary: Unbounded queues in WorkerSourceTask cause OutOfMemoryError Key: KAFKA-6551 URL: https://issues.apache.org/jira/browse/KAFKA-6551 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Gunnar Morling A Debezium user reported an {{OutOfMemoryError}} to us, with over 50,000 messages in the {{WorkerSourceTask#outstandingMessages}} map. This map is unbounded and I can't see any way of "rate limiting" which would control how many records are added to it. Growth can only indirectly be limited by reducing the offset flush interval, but as connectors can return large numbers of messages in a single {{poll()}} call that's not sufficient in all cases. Note the user reported this issue while snapshotting a database, i.e. a high number of records arrived in a very short period of time. To solve the problem I'd suggest making this map backpressure-aware and thus prevent its indefinite growth, so that no further records will be polled from the connector until messages have been taken out of the map again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
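One way such backpressure could work is a simple permit scheme around record dispatch; this is only a sketch of the idea, not the actual change to {{WorkerSourceTask}}, and the permit count is an arbitrary example value:

{code:java}
import java.util.concurrent.Semaphore;

public class BoundedDispatcher {

    // Upper bound on records awaiting acknowledgement; 10_000 is illustrative
    private final Semaphore inFlight = new Semaphore(10_000);

    /** Called before dispatching a record; blocks once the bound is reached. */
    public void beforeDispatch() throws InterruptedException {
        inFlight.acquire();
    }

    /** Called from the producer callback once a record has been acknowledged. */
    public void afterAck() {
        inFlight.release();
    }
}
{code}

With such a bound in place, a connector returning huge batches from {{poll()}} during a snapshot would simply be throttled instead of exhausting the heap.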
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349409#comment-16349409 ] Gunnar Morling commented on KAFKA-3821: --- Thanks for the pointers, I'll give it a try. > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6515) Add toString() method to kafka connect Field class
[ https://issues.apache.org/jira/browse/KAFKA-6515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349389#comment-16349389 ] Gunnar Morling commented on KAFKA-6515: --- Filed a PR as shown above, but can't transition this to "Patch available" due to lack of permissions I reckon. > Add toString() method to kafka connect Field class > -- > > Key: KAFKA-6515 > URL: https://issues.apache.org/jira/browse/KAFKA-6515 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Bartłomiej Tartanus >Priority: Minor > > Currently testing is really painful: > {code:java} > org.apache.kafka.connect.data.Field@1d51df1f was not equal to > org.apache.kafka.connect.data.Field@c0d62cd8{code} > > toString() method would fix this, so please add one. :) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
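The change itself is small; a {{toString()}} along these lines (the exact format in the merged patch may differ) turns the opaque {{Field@1d51df1f}} output into something readable:

{code:java}
@Override
public String toString() {
    // Render the identifying state so that assertion failures become readable
    return "Field{name=" + name() + ", index=" + index() + ", schema=" + schema() + "}";
}
{code}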
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348515#comment-16348515 ] Gunnar Morling commented on KAFKA-3821: --- This is a feature which would be very helpful for the Debezium project. If no one else is working on it yet, I'd like to give it a try. Is there a description of the process to follow? I.e. where could I find some information on how I would go about proposing a KIP for this? > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348345#comment-16348345 ] Gunnar Morling edited comment on KAFKA-6252 at 2/1/18 10:27 AM: I also see this issue and can confirm it's not a connector still stuck in its polling loop. That being said I think interrupting {{poll()}} a while after a task fails to stop seems a reasonable thing to do to me (i.e. option #1 from [~rhauch]'s comment above). In fact, I was surprised that it is not happening yet, given that {{poll()}} is declared to throw {{InterruptedException}}. Apart from the matter of metrics it may help to prevent threads of incorrectly implemented connectors from hanging around after stopping the connector. It won't work in all cases but it should help in many ({{poll()}} will usually invoke and block on a method dealing correctly with interruption). was (Author: gunnar.morling): I also see this issue and can confirm it's not a connector still stuck in its polling loop. That being said I think interrupting {{poll()}} a while after a task fails to stop seems a reasonable thing to do to me (i.e. option 1) from [~rhauch] comment above). In fact, I was surprised that it is not happening yet, given that {{poll()}} is declared to throw {{InterruptedException}}. Apart from the matter of metrics it may help to prevent threads of incorrectly implemented connectors from hanging around after stopping the connector. It won't work in all cases but it should help in many ({{poll()}} will usually invoke and block on a method dealing correctly with interruption). > A metric named 'XX' already exists, can't register another one. > --- > > Key: KAFKA-6252 > URL: https://issues.apache.org/jira/browse/KAFKA-6252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Alexis Sellier >Assignee: Arjun Satish >Priority: Critical > Fix For: 1.1.0, 1.0.1 > > > When a connector crashes (or is not implemented correctly by not > stopping/interrupting {{poll()}}), It cannot be restarted and an exception > like this is thrown > {code:java} > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=offset-commit-max-time-ms, group=connector-task-metrics, > description=The maximum time in milliseconds taken by this task to commit > offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already > exists, can't register another one. 
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241) > at > org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328) > at > org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98) > at > org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > I guess it's because the function taskMetricsGroup.close is not call in all > the cases -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348345#comment-16348345 ] Gunnar Morling edited comment on KAFKA-6252 at 2/1/18 10:27 AM: I also see this issue and can confirm it's not a connector still stuck in its polling loop. That being said I think interrupting {{poll()}} a while after a task fails to stop seems a reasonable thing to do to me (i.e. option 1) from [~rhauch] comment above). In fact, I was surprised that it is not happening yet, given that {{poll()}} is declared to throw {{InterruptedException}}. Apart from the matter of metrics it may help to prevent threads of incorrectly implemented connectors from hanging around after stopping the connector. It won't work in all cases but it should help in many ({{poll()}} will usually invoke and block on a method dealing correctly with interruption). was (Author: gunnar.morling): I also see this issue and can confirm it's not a connector still stuck in its polling loop. That being said I think interrupting {{poll()}} a while after a task not stopping seems a reasonable thing to do to me. In fact, I was surprised that it is not happening yet, given that {{poll()}} is declared to throw {{InterruptedException}}. Apart from the matter of metrics it may help to prevent threads of incorrectly implemented connectors from hanging around after stopping the connector. It won't work in all cases but it should help in many ({{poll()}} will usually invoke and block on a method dealing correctly with interruption). > A metric named 'XX' already exists, can't register another one. > --- > > Key: KAFKA-6252 > URL: https://issues.apache.org/jira/browse/KAFKA-6252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Alexis Sellier >Assignee: Arjun Satish >Priority: Critical > Fix For: 1.1.0, 1.0.1 > > > When a connector crashes (or is not implemented correctly by not > stopping/interrupting {{poll()}}), It cannot be restarted and an exception > like this is thrown > {code:java} > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=offset-commit-max-time-ms, group=connector-task-metrics, > description=The maximum time in milliseconds taken by this task to commit > offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already > exists, can't register another one. 
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241) > at > org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328) > at > org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98) > at > org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > I guess it's because the function taskMetricsGroup.close is not call in all > the cases -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348345#comment-16348345 ] Gunnar Morling commented on KAFKA-6252: --- I also see this issue and can confirm it's not a connector still stuck in its polling loop. That being said I think interrupting {{poll()}} a while after a task not stopping seems a reasonable thing to do to me. In fact, I was surprised that it is not happening yet, given that {{poll()}} is declared to throw {{InterruptedException}}. Apart from the matter of metrics it may help to prevent threads of incorrectly implemented connectors from hanging around after stopping the connector. It won't work in all cases but it should help in many ({{poll()}} will usually invoke and block on a method dealing correctly with interruption). > A metric named 'XX' already exists, can't register another one. > --- > > Key: KAFKA-6252 > URL: https://issues.apache.org/jira/browse/KAFKA-6252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Alexis Sellier >Assignee: Arjun Satish >Priority: Critical > Fix For: 1.1.0, 1.0.1 > > > When a connector crashes (or is not implemented correctly by not > stopping/interrupting {{poll()}}), It cannot be restarted and an exception > like this is thrown > {code:java} > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=offset-commit-max-time-ms, group=connector-task-metrics, > description=The maximum time in milliseconds taken by this task to commit > offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already > exists, can't register another one. > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241) > at > org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328) > at > org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98) > at > org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > I guess it's because the function taskMetricsGroup.close is not call in all > the cases -- This message was sent by Atlassian JIRA (v7.6.3#76005)
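A task that wants to play well with such an interruption (and with {{stop()}} in general) just needs to block on something interruptible. A minimal sketch, where {{fetchAvailableRecords()}} is a placeholder for the connector's actual logic:

{code:java}
private final CountDownLatch shutdown = new CountDownLatch(1);

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // await() responds to Thread#interrupt() with an InterruptedException,
    // so an interrupt from the worker ends the task instead of leaking its thread
    if (shutdown.await(1, TimeUnit.SECONDS)) {
        return null; // stop() was called
    }
    return fetchAvailableRecords();
}

@Override
public void stop() {
    shutdown.countDown(); // wake up a blocked poll()
}
{code}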
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6456: -- Priority: Minor (was: Major) > Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking > -- > > Key: KAFKA-6456 > URL: https://issues.apache.org/jira/browse/KAFKA-6456 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Minor > > The docs of {{poll()}} currently say "This method should block if no data is > currently available". This prevents the task from transitioning to PAUSED > state, if there's no data available for a longer period of time. I'd > therefore suggest rewording it like this: > {quote} > This method should block if no data is currently available but return control > to Kafka Connect periodically (by returning null). > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
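Concretely, the suggested wording amounts to blocking with a timeout rather than indefinitely. A minimal sketch, assuming the task buffers records in a {{BlockingQueue}} filled by a background thread:

{code:java}
private final BlockingQueue<SourceRecord> queue = new LinkedBlockingQueue<>();

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Wait for data, but hand control back to the worker once per second
    SourceRecord first = queue.poll(1, TimeUnit.SECONDS);
    if (first == null) {
        return null; // lets Kafka Connect pause or stop the task promptly
    }
    List<SourceRecord> records = new ArrayList<>();
    records.add(first);
    queue.drainTo(records); // grab whatever else is already buffered
    return records;
}
{code}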
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6456: -- Summary: Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking (was: Improve JavaDoc of SourceTask#poll()) > Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking > -- > > Key: KAFKA-6456 > URL: https://issues.apache.org/jira/browse/KAFKA-6456 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > The docs of {{poll()}} currently say "This method should block if no data is > currently available". This prevents the task from transitioning to PAUSED > state, if there's no data available for a longer period of time. I'd > therefore suggest rewording it like this: > {quote} > This method should block if no data is currently available but return control > to Kafka Connect periodically (by returning null). > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6456: -- Description: The docs of {{poll()}} currently say "This method should block if no data is currently available". This prevents the task from transitioning to PAUSED state, if there's no data available for a longer period of time. I'd therefore suggest rewording it like this: {quote} This method should block if no data is currently available but return control to Kafka Connect periodically (by returning null). {quote} > Improve JavaDoc of SourceTask#poll() > > > Key: KAFKA-6456 > URL: https://issues.apache.org/jira/browse/KAFKA-6456 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > The docs of {{poll()}} currently say "This method should block if no data is > currently available". This prevents the task from transitioning to PAUSED > state, if there's no data available for a longer period of time. I'd > therefore suggest rewording it like this: > {quote} > This method should block if no data is currently available but return control > to Kafka Connect periodically (by returning null). > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6456: -- Summary: Improve JavaDoc of SourceTask#poll() (was: Improve JavaDoc of SourceTask) > Improve JavaDoc of SourceTask#poll() > > > Key: KAFKA-6456 > URL: https://issues.apache.org/jira/browse/KAFKA-6456 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6456) Improve JavaDoc of SourceTask
Gunnar Morling created KAFKA-6456: - Summary: Improve JavaDoc of SourceTask Key: KAFKA-6456 URL: https://issues.apache.org/jira/browse/KAFKA-6456 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Gunnar Morling -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257642#comment-16257642 ] Gunnar Morling commented on KAFKA-3821: --- Hi [~rhauch], I see that this one is assigned to 1.1.0. Have you created a KIP for it already, or do you plan to do so soon? I like the idea of the dedicated {{OffsetRecord}}. > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch > Labels: needs-kip > Fix For: 1.1.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v6.4.14#64029)