[jira] [Created] (KAFKA-13485) Restart connectors after RetriableException raised from Task::start()

2021-11-26 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-13485:
--

 Summary: Restart connectors after RetriableException raised from 
Task::start()
 Key: KAFKA-13485
 URL: https://issues.apache.org/jira/browse/KAFKA-13485
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Gunnar Morling


If a {{RetriableException}} is raised from {{Task::start()}}, this doesn't 
trigger an attempt to start that connector again; i.e. the restart 
functionality currently is only implemented for exceptions raised from 
{{poll()}}/{{put()}}. Triggering restarts also upon failures during {{start()}} 
would be desirable, so as to circumvent temporary failure conditions like a 
network hiccup, which currently require a manual restart of the affected tasks, 
for instance if a connector establishes a database connection during 
{{start()}}.
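
For illustration, a minimal sketch of a source task that opens a database 
connection in {{start()}} and signals a transient failure via 
{{RetriableException}} (the class name, property name and connection handling 
below are made up, not taken from any real connector):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class HypotheticalDatabaseTask extends SourceTask {

    private Connection connection;

    @Override
    public void start(Map<String, String> props) {
        try {
            // A network hiccup here currently fails the task for good; the idea is
            // that a RetriableException would instead trigger another start attempt.
            connection = DriverManager.getConnection(props.get("connection.url"));
        }
        catch (SQLException e) {
            throw new RetriableException("Could not open database connection", e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return null; // no records in this sketch
    }

    @Override
    public void stop() {
        // closing the connection is omitted in this sketch
    }

    @Override
    public String version() {
        return "0.1";
    }
}
{code}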



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13478) KIP-802: Validation Support for Kafka Connect SMT Options

2021-11-24 Thread Gunnar Morling (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-13478:
---
Description: Implement 
[KIP-802|https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options],
 adding validation support for SMT options.  (was: Implement [KIP-802|KIP-802: 
Validation Support for Kafka Connect SMT Options], adding validation support 
for SMT options.)

> KIP-802: Validation Support for Kafka Connect SMT Options
> -
>
> Key: KAFKA-13478
> URL: https://issues.apache.org/jira/browse/KAFKA-13478
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> Implement 
> [KIP-802|https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options],
>  adding validation support for SMT options.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13478) KIP-802: Validation Support for Kafka Connect SMT Options

2021-11-24 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-13478:
--

 Summary: KIP-802: Validation Support for Kafka Connect SMT Options
 Key: KAFKA-13478
 URL: https://issues.apache.org/jira/browse/KAFKA-13478
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


Implement [KIP-802|KIP-802: Validation Support for Kafka Connect SMT Options], 
adding validation support for SMT options.
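
As a rough sketch of what such validation would operate on, assuming a 
hypothetical SMT: the options defined in {{Transformation#config()}} (here a 
made-up "behavior" option) are what connector configs could be checked against.

{code:java}
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class HypotheticalTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("behavior", ConfigDef.Type.STRING, "fail",
                    ConfigDef.ValidString.in("fail", "ignore"),
                    ConfigDef.Importance.MEDIUM,
                    "What to do when the transformation cannot be applied");

    @Override
    public void configure(Map<String, ?> configs) {
        // options arrive here after being parsed against CONFIG_DEF
    }

    @Override
    public R apply(R record) {
        return record; // no-op in this sketch
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}
{code}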



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-20 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348804#comment-17348804
 ] 

Gunnar Morling commented on KAFKA-12801:


Ok, so I've added the two JFR recordings to my example repository 
(https://github.com/gunnarmorling/debezium-examples/commit/e6ce9f2db398ee042e9cc0e611310eeddf3c9427),
 which also contains the set-up for reproducing this.

> High CPU load after restarting brokers subsequent to quorum loss
> 
>
> Key: KAFKA-12801
> URL: https://issues.apache.org/jira/browse/KAFKA-12801
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes), and one Kafka Connect node. After 
> starting all components, I first stop the current controller of the Kafka 
> cluster, then I stop the then controller of the Kafka cluster. At this point, 
> only one Kafka node out of the original three and Connect is running.
> When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
> and the two broker nodes goes up to 100% and remains at that level for an 
> indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-20 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348802#comment-17348802
 ] 

Gunnar Morling commented on KAFKA-12801:


Hum, so even with zipping (which brings the size down to 1.6 MB), I'm receiving 
this error upon upload:

{quote}
File "jfr.zip" was not uploaded
An internal error has occurred. Please contact your administrator
{quote}

> High CPU load after restarting brokers subsequent to quorum loss
> 
>
> Key: KAFKA-12801
> URL: https://issues.apache.org/jira/browse/KAFKA-12801
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes), and one Kafka Connect node. After 
> starting all components, I first stop the current controller of the Kafka 
> cluster, then I stop the then controller of the Kafka cluster. At this point, 
> only one Kafka node out of the original three and Connect is running.
> When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
> and the two broker nodes goes up to 100% and remains at that level for an 
> indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-20 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348204#comment-17348204
 ] 

Gunnar Morling commented on KAFKA-12801:


Hey [~lucasbradstreet], ok, so I got two JFR recordings (one from one of the 
Kafka nodes with high CPU load, one from Connect), but attaching them to this 
issue fails (something about the file type, perhaps). Do you have any other way 
of receiving those files? Together they are 5.2 MB in size. Alternatively, you 
could also reproduce this yourself by following the steps I've described here: 
https://www.morling.dev/blog/exploring-zookeeper-less-kafka/.

> High CPU load after restarting brokers subsequent to quorum loss
> 
>
> Key: KAFKA-12801
> URL: https://issues.apache.org/jira/browse/KAFKA-12801
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes), and one Kafka Connect node. After 
> starting all components, I first stop the current controller of the Kafka 
> cluster, then I stop the then controller of the Kafka cluster. At this point, 
> only one Kafka node out of the original three and Connect is running.
> When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
> and the two broker nodes goes up to 100% and remains at that level for an 
> indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12806) KRaft: Confusing leadership status exposed in metrics for controller without quorum

2021-05-18 Thread Gunnar Morling (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-12806:
---
Summary: KRaft: Confusing leadership status exposed in metrics for 
controller without quorum  (was: KRaft: Confusing leadership status exposed for 
controller without quorum)

> KRaft: Confusing leadership status exposed in metrics for controller without 
> quorum
> ---
>
> Key: KAFKA-12806
> URL: https://issues.apache.org/jira/browse/KAFKA-12806
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes). After starting all components, I 
> first stop the current controller of the Kafka cluster, then I stop the then 
> controller of the Kafka cluster. At this point, only one Kafka node out of 
> the original three and Connect is running. In the new KRaft-based metrics, 
> "leader" is exposed as the role for that node, and its id is shown as the 
> current leader. Also in the metadata shell, that node is shown as the quorum 
> leader via /metadataQuorum/leader. This is pretty confusing, as one out of 
> three nodes cannot have the quorum. I believe this is mostly an issue of 
> displaying the status, as for instance creating a topic in this state times 
> out.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12806) KRaft: Confusing leadership status exposed for controller without quorum

2021-05-18 Thread Gunnar Morling (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-12806:
---
Description: 
I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of three 
Kafka nodes (all combined nodes). After starting all components, I first stop 
the current controller of the Kafka cluster, then I stop the then-current 
controller of the Kafka cluster. At this point, only one Kafka node out of the 
original three (plus Connect) is running. In the new KRaft-based metrics, 
"leader" is exposed as the role for that node, and its id is shown as the 
current leader. Also in the metadata shell, that node is shown as the quorum 
leader via /metadataQuorum/leader. This is pretty confusing, as one node out of 
three cannot have a quorum. I believe this is mostly an issue of displaying the 
status, as for instance creating a topic in this state times out.
 

> KRaft: Confusing leadership status exposed for controller without quorum
> 
>
> Key: KAFKA-12806
> URL: https://issues.apache.org/jira/browse/KAFKA-12806
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes). After starting all components, I 
> first stop the current controller of the Kafka cluster, then I stop the then 
> controller of the Kafka cluster. At this point, only one Kafka node out of 
> the original three and Connect is running. In the new KRaft-based metrics, 
> "leader" is exposed as the role for that node, and its id is shown as the 
> current leader. Also in the metadata shell, that node is shown as the quorum 
> leader via /metadataQuorum/leader. This is pretty confusing, as one out of 
> three nodes cannot have the quorum. I believe this is mostly an issue of 
> displaying the status, as for instance creating a topic in this state times 
> out.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12806) KRaft: Confusing leadership status exposed for controller without quorum

2021-05-18 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-12806:
--

 Summary: KRaft: Confusing leadership status exposed for controller 
without quorum
 Key: KAFKA-12806
 URL: https://issues.apache.org/jira/browse/KAFKA-12806
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Gunnar Morling






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-18 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346835#comment-17346835
 ] 

Gunnar Morling commented on KAFKA-12801:


Would a JFR recording work, too? I could provide that a bit more easily, as I'm 
more familiar with this tool.

> High CPU load after restarting brokers subsequent to quorum loss
> 
>
> Key: KAFKA-12801
> URL: https://issues.apache.org/jira/browse/KAFKA-12801
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes), and one Kafka Connect node. After 
> starting all components, I first stop the current controller of the Kafka 
> cluster, then I stop the then controller of the Kafka cluster. At this point, 
> only one Kafka node out of the original three and Connect is running.
> When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
> and the two broker nodes goes up to 100% and remains at that level for an 
> indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-17 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-12801:
--

 Summary: High CPU load after restarting brokers subsequent to 
quorum loss
 Key: KAFKA-12801
 URL: https://issues.apache.org/jira/browse/KAFKA-12801
 Project: Kafka
  Issue Type: Bug
  Components: core, KafkaConnect
Affects Versions: 2.8.0
Reporter: Gunnar Morling


I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of three 
Kafka nodes (all combined nodes), and one Kafka Connect node. After starting 
all components, I first stop the current controller of the Kafka cluster, then 
I stop the then-current controller of the Kafka cluster. At this point, only 
one Kafka node out of the original three (plus Connect) is running.

When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
and the two broker nodes goes up to 100% and remains at that level for an 
indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-07 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032552#comment-17032552
 ] 

Gunnar Morling commented on KAFKA-7052:
---

Thanks for commenting, [~rhauch]! My intention was to actually keep the current 
behavior by means of the default setting of the new option:

* "fail" for records with a schema (it's raising an 
{{IllegalArgumentException}}, but it could be an NPE of course if the exact 
exception type is a concern; I think it shouldn't be, as the original NPE 
really is a weakness of the existing implementation that shouldn't be relied 
upon)
* "return-null" for records without schema

I.e. without any explicit setting, the behavior will be exactly the same as 
today (ignoring the changed exception type). That's why I didn't assume this 
would need a KIP, but as per what you're saying, any new option mandates a KIP?

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-06 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031745#comment-17031745
 ] 

Gunnar Morling commented on KAFKA-7052:
---

Yes, that'd certainly work for me and I'd be happy to send a PR. Perhaps 
{{behavior.on.non.existent.field}} = {{(fail|drop|passon)}}?

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-06 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031720#comment-17031720
 ] 

Gunnar Morling commented on KAFKA-7052:
---

Hey [~rmoff], what I meant to say: raising _any_ exception is rather disruptive 
here. We surely could improve the exception message, but I don't think it would 
really help you, as the connector would still be in the FAILED state when 
encountering a schema change event. I think your case would be best addressed 
if the SMT just left the key/value as-is in case the extract field operation 
cannot be applied.

Taking a step back, it could also indicate a more general shortcoming of 
Connect: SMTs applied to a source connector such as Debezium are applied to 
_all_ topics, whereas it actually may be desirable to apply them only to a 
_subset_ of topics produced by the connector (in case of Debezium, the actual 
change data topics, while schema change and heartbeat topics shouldn't be 
targeted). Perhaps worth a KIP? [~tombentley], WDYT?
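
To illustrate the idea (purely a sketch; the "topics.regex" option below is 
hypothetical and not an existing Connect feature), an SMT could pass on records 
from non-matching topics unchanged:

{code:java}
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class TopicScopedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {

    private Pattern topicPattern;

    @Override
    public void configure(Map<String, ?> configs) {
        // hypothetical option; defaults to matching every topic
        Object regex = configs.get("topics.regex");
        topicPattern = Pattern.compile(regex != null ? regex.toString() : ".*");
    }

    @Override
    public R apply(R record) {
        if (!topicPattern.matcher(record.topic()).matches()) {
            // e.g. schema change or heartbeat topics: pass on unchanged
            return record;
        }
        return applyToMatchingTopic(record);
    }

    /** The actual transformation logic, omitted in this sketch. */
    protected abstract R applyToMatchingTopic(R record);

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}
{code}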

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7273) Converters should have access to headers.

2019-10-10 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948868#comment-16948868
 ] 

Gunnar Morling commented on KAFKA-7273:
---

{quote}
Headers is modifiable, it has add/remove methods
{quote}

[~sap1ens], yes, but e.g. the passed object could be a copy, so changes to it 
are ignored, or an implementation could be passed whose mutators raise an 
exception, etc.

I've filed https://github.com/apache/kafka/pull/7489 for making this explicit 
in the JavaDoc of the method.
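
For reference, a minimal sketch of a converter that tries to record type 
information in a header (the class name and header key are made up; as per the 
discussion above, whether such a mutation actually takes effect is not 
guaranteed by the {{Converter}} contract):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.storage.Converter;

public class TypeHintConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        // attempt to add a header before serialization; may or may not be propagated
        if (schema != null && schema.name() != null) {
            headers.addString("value.schema.name", schema.name());
        }
        return fromConnectData(topic, schema, value);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        return value == null ? null : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA,
                value == null ? null : new String(value, StandardCharsets.UTF_8));
    }
}
{code}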

> Converters should have access to headers.
> -
>
> Key: KAFKA-7273
> URL: https://issues.apache.org/jira/browse/KAFKA-7273
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Major
> Fix For: 2.4.0
>
>
> I found myself wanting to build a converter that stored additional type 
> information within headers. The converter interface does not allow a 
> developer to access to the headers in a Converter. I'm not suggesting that we 
> change the method for serializing them, rather that 
> *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* 
> and *toConnectData*. For example something like this.
> {code:java}
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.data.SchemaAndValue;
> import org.apache.kafka.connect.header.Headers;
> import org.apache.kafka.connect.storage.Converter;
> public interface Converter {
>   default byte[] fromConnectData(String topic, Headers headers, Schema 
> schema, Object object) {
> return fromConnectData(topic, schema, object);
>   }
>   default SchemaAndValue toConnectData(String topic, Headers headers, byte[] 
> payload) {
> return toConnectData(topic, payload);
>   }
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
> }
> {code}
> This would be a similar approach to what was already done with 
> ExtendedDeserializer and ExtendedSerializer in the Kafka client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7273) Converters should have access to headers.

2019-10-10 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948354#comment-16948354
 ] 

Gunnar Morling edited comment on KAFKA-7273 at 10/10/19 9:46 AM:
-

Hey [~rhauch], [~jcustenborder], just came across this change, which I'd have 
suggested myself otherwise :-) Very nice for a specific use case I have in mind.

One question for clarification, though: is the {{Headers}} object passed to 
{{fromConnectData()}} modifiable, i.e. can a converter add/override/remove 
existing headers before the message is sent to Kafka? This is what I'd like to 
do, and it's not quite clear to me whether that's allowed or not. Modifying the 
input parameter as a side-effect might be a bit surprising, given the method 
returns something.

If it is supported, I'll be happy to send a PR with a clarifying sentence in 
the method JavaDoc. Thanks!


was (Author: gunnar.morling):
Hey [~rhauch], [~jcustenborder], just came across this change, which I'd have 
suggested myself otherwise :-) Very nice for a specific use case I have in mind.

One question for clarification, though: is the {{Headers}} object passed to 
{{fromConnectData()}} modifiable, i.e. can a converter add/override existing 
headers for before the message is sent to Kafka? This is what I'd like to do, 
and it's not quite clear to me whether that's allowed or not. Modifying the 
input parameter as a side-effect might be a bit surprising given the method 
returns something.

If it is supported, I'll be happy to send a PR with a clarifying sentence in 
the method JavaDoc. Thanks!

> Converters should have access to headers.
> -
>
> Key: KAFKA-7273
> URL: https://issues.apache.org/jira/browse/KAFKA-7273
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Major
> Fix For: 2.4.0
>
>
> I found myself wanting to build a converter that stored additional type 
> information within headers. The converter interface does not allow a 
> developer to access to the headers in a Converter. I'm not suggesting that we 
> change the method for serializing them, rather that 
> *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* 
> and *toConnectData*. For example something like this.
> {code:java}
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.data.SchemaAndValue;
> import org.apache.kafka.connect.header.Headers;
> import org.apache.kafka.connect.storage.Converter;
> public interface Converter {
>   default byte[] fromConnectData(String topic, Headers headers, Schema 
> schema, Object object) {
> return fromConnectData(topic, schema, object);
>   }
>   default SchemaAndValue toConnectData(String topic, Headers headers, byte[] 
> payload) {
> return toConnectData(topic, payload);
>   }
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
> }
> {code}
> This would be a similar approach to what was already done with 
> ExtendedDeserializer and ExtendedSerializer in the Kafka client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7273) Converters should have access to headers.

2019-10-10 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948354#comment-16948354
 ] 

Gunnar Morling commented on KAFKA-7273:
---

Hey [~rhauch], [~jcustenborder], just came across this change, which I'd have 
suggested myself otherwise :-) Very nice for a specific use case I have in mind.

One question for clarification, though: is the {{Headers}} object passed to 
{{fromConnectData()}} modifiable, i.e. can a converter add/override existing 
headers for before the message is sent to Kafka? This is what I'd like to do, 
and it's not quite clear to me whether that's allowed or not. Modifying the 
input parameter as a side-effect might be a bit surprising given the method 
returns something.

If it is supported, I'll be happy to send a PR with a clarifying sentence in 
the method JavaDoc. Thanks!

> Converters should have access to headers.
> -
>
> Key: KAFKA-7273
> URL: https://issues.apache.org/jira/browse/KAFKA-7273
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Major
> Fix For: 2.4.0
>
>
> I found myself wanting to build a converter that stored additional type 
> information within headers. The converter interface does not allow a 
> developer to access to the headers in a Converter. I'm not suggesting that we 
> change the method for serializing them, rather that 
> *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* 
> and *toConnectData*. For example something like this.
> {code:java}
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.data.SchemaAndValue;
> import org.apache.kafka.connect.header.Headers;
> import org.apache.kafka.connect.storage.Converter;
> public interface Converter {
>   default byte[] fromConnectData(String topic, Headers headers, Schema 
> schema, Object object) {
> return fromConnectData(topic, schema, object);
>   }
>   default SchemaAndValue toConnectData(String topic, Headers headers, byte[] 
> payload) {
> return toConnectData(topic, payload);
>   }
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
> }
> {code}
> This would be a similar approach to what was already done with 
> ExtendedDeserializer and ExtendedSerializer in the Kafka client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2019-09-27 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939744#comment-16939744
 ] 

Gunnar Morling commented on KAFKA-7052:
---

Looking into this, and it's an interesting case. The problem arises when the 
SMT gets one of the [schema change 
events|https://debezium.io/documentation/reference/0.9/connectors/mysql.html#schema-change-topic]
 from the Debezium MySQL connector, which don't have the primary key structure 
of, for instance, your customers table. So I think it's actually not your 
intention to apply the SMT to these messages to begin with.

Question is how to deal with this case; I could see these options when 
encountering a message which doesn't have the given field:

* raise a more meaningful exception than NPE (pretty disruptive)
* return null
* leave the key/value unchanged

I think for your use case the last option is the most useful one, but returning 
null might also make sense in other cases. This might require an option, 
perhaps?
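
A rough sketch of the three behaviors for a schemaless record that does not 
contain the configured field (the "behavior" parameter is a hypothetical 
option, not part of the actual ExtractField SMT):

{code:java}
import java.util.Map;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;

public class MissingFieldHandlingSketch {

    public static <R extends ConnectRecord<R>> R handleMissingField(
            R record, Map<String, Object> value, String field, String behavior) {

        switch (behavior) {
            case "fail":
                // option 1: a meaningful exception instead of an NPE
                throw new DataException("Field '" + field + "' not found in " + value.keySet());
            case "return-null":
                // option 2: propagate a null value instead of the extracted field
                return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
                        record.key(), null, null, record.timestamp());
            default:
                // option 3: leave the key/value unchanged
                return record;
        }
    }
}
{code}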

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-27 Thread Gunnar Morling (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-8523:
--
Description: 
When applying the {{InsertField}} transformation to a tombstone event, an 
exception is raised:

{code}
org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
absence of schema for [field insertion], found: null
at 
org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
at 
org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
at 
org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
at 
org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
{code}

~~AFAICS, the transform can still be made working in this case by simply 
building up a new value map from scratch.~~

Update as per the discussion in the comments: tombstones should be left as-is 
by this SMT, as any insertion would defeat their purpose of enabling log 
compaction.

  was:
When applying the {{InsertField}} transformation to a tombstone event, an 
exception is raised:

{code}
org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
absence of schema for [field insertion], found: null
at 
org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
at 
org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
at 
org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
at 
org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
{code}

AFAICS, the transform can still be made working in in this case by simply 
building up a new value map from scratch.


> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
> Attachments: image-2019-09-17-15-53-44-038.png
>
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> ~~AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.~~
> Update as per the discussion in the comments: tombstones should be left as-is 
> by this SMT, as any insertion would defeat their purpose of enabling log 
> compaction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-27 Thread Gunnar Morling (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-8523:
--
Description: 
When applying the {{InsertField}} transformation to a tombstone event, an 
exception is raised:

{code}
org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
absence of schema for [field insertion], found: null
at 
org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
at 
org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
at 
org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
at 
org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
{code}

-AFAICS, the transform can still be made working in this case by simply 
building up a new value map from scratch.-

Update as per the discussion in the comments: tombstones should be left as-is 
by this SMT, as any insertion would defeat their purpose of enabling log 
compaction.

  was:
When applying the {{InsertField}} transformation to a tombstone event, an 
exception is raised:

{code}
org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
absence of schema for [field insertion], found: null
at 
org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
at 
org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
at 
org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
at 
org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
{code}

~~AFAICS, the transform can still be made working in in this case by simply 
building up a new value map from scratch.~~

Update as per the discussion in the comments: tombstones should be left as-is 
by this SMT, as any insertion would defeat their purpose of enabling log 
compaction.


> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
> Attachments: image-2019-09-17-15-53-44-038.png
>
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> -AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.-
> Update as per the discussion in the comments: tombstones should be left as-is 
> by this SMT, as any insertion would defeat their purpose of enabling log 
> compaction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-27 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939730#comment-16939730
 ] 

Gunnar Morling commented on KAFKA-8523:
---

Hey [~rhauch], [~frederic.tardif], yes, we're all on the same page: tombstones 
shouldn't be modified at all by this SMT. I've updated and force-pushed the PR 
accordingly. It's good to go from my PoV now.
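
A minimal sketch of the pass-through behavior described above (not the actual 
InsertField code; just illustrating the guard on tombstones):

{code:java}
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class TombstoneAwareTransformation<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            // tombstone: pass it on unmodified so log compaction semantics are preserved
            return record;
        }
        return insertField(record);
    }

    /** The actual field insertion, omitted in this sketch. */
    protected abstract R insertField(R record);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}
{code}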

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
> Attachments: image-2019-09-17-15-53-44-038.png
>
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2019-09-17 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931671#comment-16931671
 ] 

Gunnar Morling commented on KAFKA-7052:
---

I'm wondering: is it solely a question of having a more meaningful exception, 
or should null rather be returned in this case? It seems like one might want 
either, depending on the use case.

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931662#comment-16931662
 ] 

Gunnar Morling edited comment on KAFKA-8523 at 9/17/19 5:08 PM:


Yes, I was arriving at pretty much the same conclusion; that said, is an option 
even needed? I've come to think that a tombstone should remain just that, a 
tombstone, i.e. I don't see a good reason to inject any value into a tombstone 
really; after all, it has specific semantics -- enabling compaction -- which 
shouldn't be altered. So I would suggest I simply update my PR so that it 
passes on tombstones unmodified, and that should be good enough. WDYT?

Note, if we wanted to have an option, I think it would allow passing on 
tombstones unmodified (as suggested above) OR inserting the new field, i.e. 
there wouldn't really be an empty map returned, but a map with a single entry 
for that new field (which is what the current PR is doing). This could be done, 
but as argued it'd change the message's nature of being a tombstone, so it's 
probably not desirable?


was (Author: gunnar.morling):
Yes, I was arriving at pretty much the same conclusion; that said: is an option 
even needed: I've come to think that a tombstone should remain just that, a 
tombstone, i.e. I don't see a good reason to inject any value into a tombstone 
really; after all, it has specific semantics -- enabling compaction -- which 
shouldn't be altered. So I would suggest I simply update my PR so that it 
passes on tombstones unmodified and that should be good enought. WDYT?

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931662#comment-16931662
 ] 

Gunnar Morling commented on KAFKA-8523:
---

Yes, I was arriving at pretty much the same conclusion; that said, is an option 
even needed? I've come to think that a tombstone should remain just that, a 
tombstone, i.e. I don't see a good reason to inject any value into a tombstone 
really; after all, it has specific semantics -- enabling compaction -- which 
shouldn't be altered. So I would suggest I simply update my PR so that it 
passes on tombstones unmodified, and that should be good enough. WDYT?

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-06-20 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16868422#comment-16868422
 ] 

Gunnar Morling commented on KAFKA-8523:
---

Good question, [~rhauch]; I didn't consider it, but I could see both approaches 
making sense. Should we make this an option (insert into tombstones vs. pass 
them on unmodified) for that SMT?

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-06-11 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860794#comment-16860794
 ] 

Gunnar Morling commented on KAFKA-8523:
---

Filed https://github.com/apache/kafka/pull/6914.

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-06-11 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860688#comment-16860688
 ] 

Gunnar Morling commented on KAFKA-8523:
---

I'll provide a PR shortly.

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-06-11 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-8523:
-

 Summary: InsertField transformation fails when encountering 
tombstone event
 Key: KAFKA-8523
 URL: https://issues.apache.org/jira/browse/KAFKA-8523
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


When applying the {{InsertField}} transformation to a tombstone event, an 
exception is raised:

{code}
org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
absence of schema for [field insertion], found: null
at 
org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
at 
org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
at 
org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
at 
org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
{code}

AFAICS, the transform can still be made working in this case by simply 
building up a new value map from scratch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8476) Kafka 2.2.1 distribution contains JAX-RS API twice

2019-06-03 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-8476:
--
Description: In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar 
and javax.ws.rs-api-2.1.1.jar. I reckon only one should be there. There are 
also two versions of the inject API (javax.inject-1.jar and 
javax.inject-2.5.0-b42.jar) which might indicate an issue with dependency 
convergence.  (was: In kafka_2.12-2.2.1.tgz there is both 
javax.ws.rs-api-2.1.jar and javax.ws.rs-api-2.1.1.jar. I reckon only one should 
be there.)

> Kafka 2.2.1 distribution contains JAX-RS API twice
> --
>
> Key: KAFKA-8476
> URL: https://issues.apache.org/jira/browse/KAFKA-8476
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gunnar Morling
>Priority: Minor
>
> In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and 
> javax.ws.rs-api-2.1.1.jar. I reckon only one should be there. There are also 
> two versions of the inject API (javax.inject-1.jar and 
> javax.inject-2.5.0-b42.jar) which might indicate an issue with dependency 
> convergence.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8476) Kafka 2.2.1 distribution contains JAX-RS API twice

2019-06-03 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-8476:
--
Priority: Minor  (was: Major)

> Kafka 2.2.1 distribution contains JAX-RS API twice
> --
>
> Key: KAFKA-8476
> URL: https://issues.apache.org/jira/browse/KAFKA-8476
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gunnar Morling
>Priority: Minor
>
> In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and 
> javax.ws.rs-api-2.1.1.jar. I reckon only one should be there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8476) Kafka 2.2.1 distribution contains JAX-RS API twice

2019-06-03 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-8476:
-

 Summary: Kafka 2.2.1 distribution contains JAX-RS API twice
 Key: KAFKA-8476
 URL: https://issues.apache.org/jira/browse/KAFKA-8476
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


In kafka_2.12-2.2.1.tgz there is both javax.ws.rs-api-2.1.jar and 
javax.ws.rs-api-2.1.1.jar. I reckon only one should be there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2018-12-12 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719137#comment-16719137
 ] 

Gunnar Morling commented on KAFKA-3832:
---

What would be the right fix to this issue, then? Could it simply be changed to 
return a null value in this case? That should be simple enough, but I'm 
wondering whether there'll be a problem if some records have a schema 
(non-tombstones) and some others don't (tombstones).
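
A sketch of the guard in question (not the actual JsonConverter code; the class 
name is made up and the envelope serialization is omitted):

{code:java}
import org.apache.kafka.connect.data.Schema;

public class NullAwareEnvelopeSketch {

    public static byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null) {
            // A null Connect value becomes a null message value (a tombstone),
            // so log compaction can eventually remove all messages with that key.
            return null;
        }
        // Serializing the {"schema": ..., "payload": ...} envelope is omitted here.
        throw new UnsupportedOperationException("envelope serialization omitted in this sketch");
    }
}
{code}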

> Kafka Connect's JSON Converter never outputs a null value
> -
>
> Key: KAFKA-3832
> URL: https://issues.apache.org/jira/browse/KAFKA-3832
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Prasanna Subburaj
>Priority: Major
>  Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when 
> {{enableSchemas=true}}. This means that when a connector outputs a 
> {{SourceRecord}} with a null value, the JSON Converter will always produce a 
> message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> And, this means that while Kafka log compaction will always be able to remove 
> earlier messages with the same key, log compaction will never remove _all_ of 
> the messages with the same key. 
> The JSON Connector's {{fromConnectData(...)}} should always return null when 
> it is supplied a null value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7336) Kafka Connect source task hangs when producing record with invalid topic name

2018-08-24 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-7336:
--
Summary: Kafka Connect source task hangs when producing record with invalid 
topic name  (was: Kafka Connect source task when producing record with invalid 
topic name)

> Kafka Connect source task hangs when producing record with invalid topic name
> -
>
> Key: KAFKA-7336
> URL: https://issues.apache.org/jira/browse/KAFKA-7336
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> If a Kafka Connect source task returns a {{SourceRecord}} with an invalid 
> topic name (e.g. "dbserver1.inventory.test@data"), that source task hangs 
> (presumably indefinitely?) and doesn't continue it's polling loop. The log is 
> flooded with this message:
> {code}
> connect_1| 2018-08-24 08:47:29,014 WARN   ||  [Producer 
> clientId=producer-4] Error while fetching metadata with correlation id 833 : 
> {dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION}   
> [org.apache.kafka.clients.NetworkClient]
> {code}
> The producer thread is stuck in the loop here:
> {code}
> KafkaProducer.waitOnMetadata(String, Integer, long) line: 938
> KafkaProducer.doSend(ProducerRecord, Callback) line: 823
> KafkaProducer.send(ProducerRecord, Callback) line: 803  
> WorkerSourceTask.sendRecords() line: 318  
> WorkerSourceTask.execute() line: 228  
> WorkerSourceTask(WorkerTask).doRun() line: 175
> WorkerSourceTask(WorkerTask).run() line: 219  
> Executors$RunnableAdapter.call() line: 511 
> FutureTask.run() line: 266 
> ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
> ThreadPoolExecutor$Worker.run() line: 624 
> Thread.run() line: 748
> {code}
> This causes the task to remain in RUNNING state, but no further invocations 
> of {{poll()}} are done.
> Of course we'll work around this and make sure to not produce records with 
> invalid topic names, but I think the source task should transition to FAILED 
> state in this case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7336) Kafka Connect source task when producing record with invalid topic name

2018-08-24 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-7336:
-

 Summary: Kafka Connect source task when producing record with 
invalid topic name
 Key: KAFKA-7336
 URL: https://issues.apache.org/jira/browse/KAFKA-7336
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Gunnar Morling


If a Kafka Connect source task returns a {{SourceRecord}} with an invalid topic 
name (e.g. "dbserver1.inventory.test@data"), that source task hangs (presumably 
indefinitely?) and doesn't continue its polling loop. The log is flooded with 
this message:

{code}
connect_1| 2018-08-24 08:47:29,014 WARN   ||  [Producer 
clientId=producer-4] Error while fetching metadata with correlation id 833 : 
{dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION}   
[org.apache.kafka.clients.NetworkClient]
{code}

The producer thread is stuck in the loop here:

{code}
KafkaProducer.waitOnMetadata(String, Integer, long) line: 938  
KafkaProducer.doSend(ProducerRecord, Callback) line: 823  
KafkaProducer.send(ProducerRecord, Callback) line: 803
WorkerSourceTask.sendRecords() line: 318
WorkerSourceTask.execute() line: 228
WorkerSourceTask(WorkerTask).doRun() line: 175  
WorkerSourceTask(WorkerTask).run() line: 219
Executors$RunnableAdapter.call() line: 511   
FutureTask.run() line: 266   
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149  
ThreadPoolExecutor$Worker.run() line: 624   
Thread.run() line: 748  
{code}

This causes the task to remain in RUNNING state, but no further invocations of 
{{poll()}} are done.

Of course we'll work around this and make sure to not produce records with 
invalid topic names, but I think the source task should transition to FAILED 
state in this case.
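
As an illustration of the connector-side workaround, a hypothetical helper that replaces illegal characters before building a {{SourceRecord}}; the pattern below mirrors Kafka's legal topic characters (alphanumerics, '.', '_' and '-', up to 249 characters):

{code:java}
import java.util.regex.Pattern;

// Hypothetical guard applied on the connector side, until the framework itself
// fails the task for invalid topic names.
public final class TopicNames {

    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    private TopicNames() {
    }

    public static String sanitize(String topic) {
        if (LEGAL.matcher(topic).matches()) {
            return topic;
        }
        // e.g. "dbserver1.inventory.test@data" -> "dbserver1.inventory.test_data"
        return topic.replaceAll("[^a-zA-Z0-9._-]", "_");
    }
}
{code}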



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-08-22 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588621#comment-16588621
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Hey [~rhauch], [~ewencp] et al., trying to move forward with this issue, I've 
created a quick draft PR of what I had in mind: [PR 
#5553|https://github.com/apache/kafka/pull/5553]. Can you let me know whether 
you think that's an acceptable solution for this issue? Thanks!

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-08-01 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564900#comment-16564900
 ] 

Gunnar Morling commented on KAFKA-3821:
---

One other alternative came to my mind which avoids modelling 
{{OffsetSourceRecord}} as a sub-class of {{SourceRecord}}. There could be this 
new method:

{code}
interface SourceTask {
    default OffsetPosition getOffset() {
        return null;
    }
}
{code}

This method would always be called by Kafka Connect after calling {{poll()}}. 
{{OffsetPosition}} would be a container holding {{Map<String, ?> sourcePartition}} 
and {{Map<String, ?> sourceOffset}}. As shown above, the method would have a 
default implementation returning null, i.e. the change would be 
backwards-compatible.

If a connector implements the new method, it can return its current source 
offset, without emitting another actual source record (by returning an empty 
list from {{poll()}}). This would address the two use cases we have in Debezium 
for this:

* Emit an offset indicating that an initial DB snapshot has been completed 
after the last snapshot record has been emitted
* Regularly emit the processed offsets from the source DB (e.g. MySQL binlog 
position) even if we don't emit any actual source records for a longer period 
of time. Currently it can happen due to filter configuration (the user is only 
interested in capturing some of the tables from their source DB) that we 
process the DB logs for a long time without a way to communicate the processed 
offsets to Kafka Connect. This will cause large parts of the log to be 
reprocessed after a connector restart and also causes larger parts of the logs 
than needed to be retained in the source DB.

Would that be an acceptable way forward? I've come to think that modelling 
{{OffsetSourceRecord}} just isn't right; it feels a bit like Java's {{Stack}} 
class which extends {{List}} and that way exposes lots of methods which 
shouldn't exist in its API.
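
To make the idea more concrete, a sketch of how a task could implement the proposed hook; {{OffsetPosition}} and {{getOffset()}} exist only in this proposal, and the partition/offset keys below are made up:

{code:java}
import java.util.Collections;
import java.util.Map;

// Proposal sketch only; not part of the current Kafka Connect API.
public class SnapshotAwareTask /* extends SourceTask */ {

    private volatile boolean snapshotCompleted;

    public OffsetPosition getOffset() {
        if (!snapshotCompleted) {
            return null; // nothing to report beyond the records returned from poll()
        }
        return new OffsetPosition(
                Collections.singletonMap("server", "dbserver1"),
                Collections.singletonMap("snapshot_completed", true));
    }

    // Proposed container type, shown here only so the sketch is self-contained.
    public static class OffsetPosition {
        public final Map<String, ?> sourcePartition;
        public final Map<String, ?> sourceOffset;

        public OffsetPosition(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
        }
    }
}
{code}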

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512618#comment-16512618
 ] 

Gunnar Morling commented on KAFKA-7058:
---

Thanks for the quick review, [~rhauch]. It's definitely not a blocker for us, 
we worked around it by having our own equals() implementation for schemas for 
now.

I'm curious about the Kafka versions maintained going forward, though. Will it 
be 0.10.x and 2.0, but not 1.x?

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-7058:
--
Component/s: KafkaConnect

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default 
> values, but this doesn't work correctly if the default values in fact are 
> arrays. In this case, always {false} will be returned, also if the default 
> value arrays actually are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-7058:
--
Description: {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the 
schemas' default values, but this doesn't work correctly if the default values 
in fact are arrays. In this case, always {{false}} will be returned, also if 
the default value arrays actually are the same.  (was: {ConnectSchema#equals()} 
calls {{Objects#equals()}} for the schemas' default values, but this doesn't 
work correctly if the default values in fact are arrays. In this case, always 
{false} will be returned, also if the default value arrays actually are the 
same.)

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512261#comment-16512261
 ] 

Gunnar Morling commented on KAFKA-7058:
---

I'll send a pull request for this in a bit.

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-7058:
-

 Summary: ConnectSchema#equals() broken for array-typed default 
values
 Key: KAFKA-7058
 URL: https://issues.apache.org/jira/browse/KAFKA-7058
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


{ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default 
values, but this doesn't work correctly if the default values in fact are 
arrays. In this case, always {false} will be returned, also if the default 
value arrays actually are the same.
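
For illustration, a self-contained snippet showing the identity-vs-content problem described above:

{code:java}
import java.util.Arrays;
import java.util.Objects;

public class ArrayDefaultEquality {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};

        System.out.println(Objects.equals(a, b));     // false - arrays compare by identity
        System.out.println(Arrays.equals(a, b));      // true  - compares contents
        System.out.println(Objects.deepEquals(a, b)); // true  - also handles nested arrays
    }
}
{code}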



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-05-22 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16483856#comment-16483856
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Hey [~rhauch], thanks for getting back to this one. I don't think anything in 
particular speaks against the sub-type idea apart from your very first comment: 
"Neither is clean or elegant". IMO, such {{OffsetSourceRecord}} shouldn't 
extend {{SourceRecord}} from an OO modelling perspective, as it doesn't have 
key, value, schemas nor a topic. It kinda would have made sense to model it the 
other way around ({{SourceRecord extends OffsetRecord}}), but it's too late to 
change it that way.

That's why I'd have preferred the suggested new version of {{poll()}} with a 
receiver parameter, it seemed more logical in terms of proper modelling. But 
I'm not hung up on this one, if you think the sub-class approach is the way to 
go and it's something you'd find acceptable in the Kafka API, why not. I can 
try and start writing up a KIP along these lines, but I'm afraid that'll miss 
the next release's deadline (this week?).

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.0.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-03-22 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410487#comment-16410487
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Hey [~ewencp], thanks for your extensive reply! I agree this discussion 
shouldn't be centered around saving or not saving that one allocation. I just 
added this to point out that the receiver approach leaves Kafka Connect more 
freedom in terms of implementing this. I.e. that way, in theory, you could even 
do crazy things such as storing incoming records off-heap. But yeah, most 
likely you'd just use a list yourselves.

But that's not why I was suggesting this. The more substantial advantage I see 
is that it allows adding further methods down the road without breaking 
existing implementors of this contract. [~rhauch] e.g. mentioned methods for TX 
handling. Assuming these calls must be done in the correct ordering when 
submitting records, such methods couldn't really be added to 
{{ConnectorSourceTask}} itself. It's the same for offset handling which is the 
original use case we're after here.

On the offset handling itself, there's two aspects to this. One is that we'd 
like to submit an offset once a snapshot is completed. Currently, we're doing 
what you described with

{quote}
would another option be having Debezium read 1 record forward to determine 
before returning the record and/or constructing its offset whether this is the 
final record of the snapshot
{quote}

But this leads back to the complexity of this contract for implementors. Having a 
dedicated way for submitting the offset once the snapshot is done is arguably 
simpler to implement (and reason about when reading the code) than doing this 
"read forward" and delayed processing of it.

In terms of uniqueness of offsets, they _are_ unique also during snapshotting 
(binlog position), but we need a way to say that a snapshot has been completed.

I see though how that could be considered as a "style thing" mostly. There's 
another case though where we'd benefit - substantially - from being able to 
submit offsets explicitly. This is where a connector supports whitelisting of 
captured tables and no changes are done to the whitelisted tables in a while (a 
[comment 
above|https://issues.apache.org/jira/browse/KAFKA-3821?focusedCommentId=15973506&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15973506]
 touches on this). In this case the offset in the event source progresses (e.g. 
MySQL's binlog position), but as no record is of interest as per the whitelist 
config, we have no way to ever submit these offsets. Then, after a connector 
restart, we'd be forced to re-read much larger portions of the binlog than 
actually required.

We currently work around this in Debezium by regularly emitting records to a 
heartbeat topic. This allows us to submit these offsets, also if no changes to 
the whitelisted tables are applied.

Now the original idea above was to emit specific subclasses of {{SourceRecord}} 
for such "offset-only" records, but I wanted to bring up the receiver parameter 
idea because a) it feels less hackish to me and b) opens the door for further 
API additions as described above.

I hope this makes sense; what's proposed is the best option coming to my mind 
right now for the issues we try to resolve (while keeping the API reasonably 
abstract so as to cater for other use cases, too). Happy about any alternative 
proposals, of course. Thanks for this great discussion!

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-03-19 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404435#comment-16404435
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Hey [~rhauch], another alternative API crossed my mind. It's a bigger deviation 
from the current design but also is more flexible in terms of future extensions:

{code}
public interface WorkerSourceTask2 {

    public void poll(SourceRecordReceiver receiver);

    // other methods from WorkerSourceTask

    public interface SourceRecordReceiver {
        void sourceRecord(SourceRecord record);
        void offset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset);
    }
}
{code}

I.e. instead of having {{SourceRecord}} subclasses, there'd be a new 
receiver-type parameter which is invoked to emit any records. This also opens 
up the door for future other extensions, e.g. a task might enforce commit of 
offsets at a given time. Adding methods like that can be done in a compatible 
way via {{SourceRecordReceiver}} (as it's implemented by KC, not connectors). 
As a small further plus, it also avoids allocation of lists if just a single 
record must be emitted.

Any thoughts on that one? If you don't rule it out for some blocking reason I 
oversaw, how should this be discussed then? As an alternative in the KIP 
document?
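
For illustration, how a task might use such a receiver; {{SourceRecordReceiver}} and {{poll(receiver)}} are part of this proposal only, and the helper methods and offset keys below are made up:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

// Proposal sketch only; not the current Kafka Connect API.
public abstract class ReceiverBasedTask {

    public void poll(SourceRecordReceiver receiver) {
        for (SourceRecord record : readNextBatch()) {
            receiver.sourceRecord(record);
        }
        // Even if the batch was empty, the source position may have advanced:
        receiver.offset(
                Collections.singletonMap("server", "dbserver1"),
                Collections.singletonMap("binlog_position", currentBinlogPosition()));
    }

    // Hypothetical connector-specific helpers.
    protected abstract List<SourceRecord> readNextBatch();

    protected abstract long currentBinlogPosition();

    // Mirrors the interface sketched above.
    public interface SourceRecordReceiver {
        void sourceRecord(SourceRecord record);
        void offset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset);
    }
}
{code}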

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-03-19 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404435#comment-16404435
 ] 

Gunnar Morling edited comment on KAFKA-3821 at 3/19/18 7:07 AM:


Hey [~rhauch], another alternative API crossed my mind. It's a bigger deviation 
from the current design but also is more flexible in terms of future extensions:

{code}
public interface WorkerSourceTask2 {

    public void poll(SourceRecordReceiver receiver);

    // other methods from WorkerSourceTask

    public interface SourceRecordReceiver {
        void sourceRecord(SourceRecord record);
        void offset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset);
    }
}
{code}

I.e. instead of having {{SourceRecord}} subclasses, there'd be a new 
receiver-type parameter which is invoked by {{poll()}} implementations to emit 
any records. This also opens up the door for future other extensions, e.g. a 
task might enforce commit of offsets at a given time. Adding methods like that 
can be done in a compatible way via {{SourceRecordReceiver}} (as it's 
implemented by KC, not connectors). As a small further plus, it also avoids 
allocation of lists if just a single record must be emitted.

Any thoughts on that one? If you don't rule it out for some blocking reason I 
oversaw, how should this be discussed then? As an alternative in the KIP 
document?


was (Author: gunnar.morling):
Hey [~rhauch], another alternative API crossed my mind. It's a bigger deviation 
from the current design but also is more flexible in terms of future extensions:

{code}
public interface WorkerSourceTask2 {

    public void poll(SourceRecordReceiver receiver);

    // other methods from WorkerSourceTask

    public interface SourceRecordReceiver {
        void sourceRecord(SourceRecord record);
        void offset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset);
    }
}
{code}

I.e. instead of having {{SourceRecord}} subclasses, there'd be a new 
receiver-type parameter which is invoked to emit any records. This also opens 
up the door for future other extensions, e.g. a task might enforce commit of 
offsets at a given time. Adding methods like that can be done in a compatible 
way via {{SourceRecordReceiver}} (as it's implemented by KC, not connectors). 
As a small further plus, it also avoids allocation of lists if just a single 
record must be emitted.

Any thoughts on that one? If you don't rule it out for some blocking reason I 
oversaw, how should this be discussed then? As an alternative in the KIP 
document?

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366411#comment-16366411
 ] 

Gunnar Morling commented on KAFKA-6566:
---

+1 for more exploration whether calling stop() was the original intention. 
FWIW, several comments in Debezium indicate that this at least was the 
assumption on that side when the code was written (of course that assumption may 
have been right or wrong).

Note that calling stop() would be the only chance for the task to clean up its 
resources after an exception on the KC side of the polling loop (while we could 
have our entire polling logic in a try/catch block, there's no chance to react 
to exceptions in WorkerSourceTask).
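
For illustration, the kind of connector-side guard meant above; {{doPoll()}} is a hypothetical hook for the connector's actual logic, and this only covers exceptions raised inside the task's own {{poll()}}, not failures on the framework side:

{code:java}
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class SelfCleaningTask extends SourceTask {

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            return doPoll();
        }
        catch (RuntimeException e) {
            stop(); // release database connections etc. before the task fails
            throw e;
        }
    }

    // Hypothetical hook containing the connector's actual polling logic.
    protected abstract List<SourceRecord> doPoll() throws InterruptedException;
}
{code}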

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map props) {
> }
> @Override
> public Class taskClass() {
> return TestTask.class;
> }
> @Override
> public List> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map props) {
> }
> @Override
> public List poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6566:
--
Affects Version/s: 1.0.0

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map props) {
> }
> @Override
> public Class taskClass() {
> return TestTask.class;
> }
> @Override
> public List> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map props) {
> }
> @Override
> public List poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6566:
-

 Summary: SourceTask#stop() not called after exception raised in 
poll()
 Key: KAFKA-6566
 URL: https://issues.apache.org/jira/browse/KAFKA-6566
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


Having discussed this with [~rhauch], it has been my assumption that 
{{SourceTask#stop()}} will be called by the Kafka Connect framework in case an 
exception has been raised in {{poll()}}. That's not the case, though. As an 
example see the connector and task below.

Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
action to take, as it'll allow the task to clean up any resources such as 
releasing any database connections, right after that failure and not only once 
the connector is stopped.

{code}
package com.example;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TestConnector extends SourceConnector {

    @Override
    public String version() {
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public Class<? extends Task> taskClass() {
        return TestTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(Collections.singletonMap("foo", "bar"));
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    public static class TestTask extends SourceTask {

        @Override
        public String version() {
            return null;
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            throw new RuntimeException();
        }

        @Override
        public void stop() {
            System.out.println("stop() called");
        }
    }
}

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6551) Unbounded queues in WorkerSourceTask cause OutOfMemoryError

2018-02-09 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6551:
-

 Summary: Unbounded queues in WorkerSourceTask cause 
OutOfMemoryError
 Key: KAFKA-6551
 URL: https://issues.apache.org/jira/browse/KAFKA-6551
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


A Debezium user reported an {{OutOfMemoryError}} to us, with over 50,000 
messages in the {{WorkerSourceTask#outstandingMessages}} map.

This map is unbounded and I can't see any way of "rate limiting" which would 
control how many records are added to it. Growth can only indirectly be limited 
by reducing the offset flush interval, but as connectors can return large 
amounts of messages in single {{poll()}} calls that's not sufficient in all 
cases. Note the user reported this issue during snapshotting a database, i.e. a 
high number of records arrived in a very short period of time.

To solve the problem I'd suggest to make this map backpressure-aware and thus 
prevent its indefinite growth, so that no further records will be polled from 
the connector until messages have been taken out of the map again.
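
For illustration, one possible shape of such back-pressure (a sketch only, not the actual {{WorkerSourceTask}} code; the limit of 10,000 is arbitrary):

{code:java}
import java.util.concurrent.Semaphore;

// Bound the number of outstanding (dispatched but not yet acknowledged) records
// and block further dispatch once the limit is reached.
public class BoundedOutstandingRecords {

    private final Semaphore permits = new Semaphore(10_000);

    public void beforeDispatch() throws InterruptedException {
        permits.acquire(); // blocks once 10,000 records are in flight
    }

    public void onProducerAck() {
        permits.release(); // the producer callback confirmed the record; free a slot
    }
}
{code}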



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349409#comment-16349409
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Thanks for the pointers, I'll give it a try.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6515) Add toString() method to kafka connect Field class

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349389#comment-16349389
 ] 

Gunnar Morling commented on KAFKA-6515:
---

Filed a PR as shown above, but can't transition this to "Patch available" due 
to lack of permissions I reckon.

> Add toString() method to kafka connect Field class
> --
>
> Key: KAFKA-6515
> URL: https://issues.apache.org/jira/browse/KAFKA-6515
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Bartłomiej Tartanus
>Priority: Minor
>
> Currently testing is really painful:
> {code:java}
> org.apache.kafka.connect.data.Field@1d51df1f was not equal to 
> org.apache.kafka.connect.data.Field@c0d62cd8{code}
>  
> toString() method would fix this, so please add one. :)
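
For illustration, one possible shape of such a method (the actual PR may differ); {{name()}}, {{index()}} and {{schema()}} are the existing {{Field}} accessors:

{code:java}
@Override
public String toString() {
    return "Field{name=" + name() + ", index=" + index() + ", schema=" + schema() + "}";
}
{code}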



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348515#comment-16348515
 ] 

Gunnar Morling commented on KAFKA-3821:
---

This is a feature which would be very helpful for the Debezium project. If no 
one else is working on it yet, I'd like to give it a try. Is there a 
description of the process to follow? I.e. where could I find some information 
on how I had to go about proposing a KIP for this?

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348345#comment-16348345
 ] 

Gunnar Morling edited comment on KAFKA-6252 at 2/1/18 10:27 AM:


I also see this issue and can confirm it's not a connector still stuck in its 
polling loop.

That being said, I think interrupting {{poll()}} a while after a task fails to 
stop seems a reasonable thing to do to me (i.e. option #1 from [~rhauch]'s 
comment above). In fact, I was surprised that it is not happening yet, given 
that {{poll()}} is declared to throw {{InterruptedException}}.

Apart from the matter of metrics, it may help to prevent threads of incorrectly 
implemented connectors from hanging around after stopping the connector. It 
won't work in all cases but it should help in many ({{poll()}} will usually 
invoke and block on a method dealing correctly with interruption).


was (Author: gunnar.morling):
I also see this issue and can confirm it's not a connector still stuck in its 
polling loop.

That being said I think interrupting {{poll()}} after a while after a task 
fails to stop seems a reasonable thing to do to me (i.e. option 1) from 
[~rhauch] comment above). In fact, I was surprised that it is not happening 
yet, given that {{poll()}} is declared to throw {{InterruptedException}}.

Apart from the matter of metrics it may help to prevent threads of incorrectly 
implemented connectors hang around after stopping the connector. It won't work 
in all cases but it should help in many ({{poll()}} will usually invoke and 
block on a method dealing correctly with interruption).
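
For illustration, the typical pattern meant here, assuming the connector buffers records in an in-memory queue (the queue and class are hypothetical):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.source.SourceRecord;

public class InterruptiblePollExample {

    private final BlockingQueue<SourceRecord> queue;

    public InterruptiblePollExample(BlockingQueue<SourceRecord> queue) {
        this.queue = queue;
    }

    public List<SourceRecord> poll() throws InterruptedException {
        // BlockingQueue#poll(timeout, unit) throws InterruptedException when the thread
        // is interrupted, so an interrupt issued by the framework would take effect here.
        SourceRecord record = queue.poll(1, TimeUnit.SECONDS);
        return record == null ? null : Collections.singletonList(record);
    }
}
{code}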

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), It cannot be restarted and an exception 
> like this is thrown 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not call in all 
> the cases



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348345#comment-16348345
 ] 

Gunnar Morling edited comment on KAFKA-6252 at 2/1/18 10:27 AM:


I also see this issue and can confirm it's not a connector still stuck in its 
polling loop.

That being said I think interrupting {{poll()}} after a while after a task 
fails to stop seems a reasonable thing to do to me (i.e. option 1) from 
[~rhauch] comment above). In fact, I was surprised that it is not happening 
yet, given that {{poll()}} is declared to throw {{InterruptedException}}.

Apart from the matter of metrics it may help to prevent threads of incorrectly 
implemented connectors hang around after stopping the connector. It won't work 
in all cases but it should help in many ({{poll()}} will usually invoke and 
block on a method dealing correctly with interruption).


was (Author: gunnar.morling):
I also see this issue and can confirm it's not a connector still stuck in its 
polling loop.

That being said I think interrupting {{poll()}} after a while after a task not 
stopping seems a reasonable thing to do to me. In fact, I was surprised that it 
is not happening yet, given that {{poll()}} is declared to throw 
{{InterruptedException}}.

Apart from the matter of metrics it may help to prevent threads of incorrectly 
implemented connectors hang around after stopping the connector. It won't work 
in all cases but it should help in many ({{poll()}} will usually invoke and 
block on a method dealing correctly with interruption), 

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), It cannot be restarted and an exception 
> like this is thrown 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not call in all 
> the cases



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348345#comment-16348345
 ] 

Gunnar Morling commented on KAFKA-6252:
---

I also see this issue and can confirm it's not a connector still stuck in its 
polling loop.

That being said I think interrupting {{poll()}} after a while after a task not 
stopping seems a reasonable thing to do to me. In fact, I was surprised that it 
is not happening yet, given that {{poll()}} is declared to throw 
{{InterruptedException}}.

Apart from the matter of metrics it may help to prevent threads of incorrectly 
implemented connectors hang around after stopping the connector. It won't work 
in all cases but it should help in many ({{poll()}} will usually invoke and 
block on a method dealing correctly with interruption), 

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), It cannot be restarted and an exception 
> like this is thrown 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not call in all 
> the cases



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Priority: Minor  (was: Major)

> Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
> --
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Minor
>
> The docs of {{poll()}} currently say "This method should block if no data is 
> currently available". This causes the task from transitioning to PAUSED 
> state, if there's no data available for a longer period of time. I'd 
> therefore suggest to reword like this:
> {quote}
> This method should block if no data is currently available but return control 
> to Kafka Connect periodically (by returning null).
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Summary: Improve JavaDoc of SourceTask#poll() to discourage indefinite 
blocking  (was: Improve JavaDoc of SourceTask#poll())

> Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
> --
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> The docs of {{poll()}} currently say "This method should block if no data is 
> currently available". This causes the task from transitioning to PAUSED 
> state, if there's no data available for a longer period of time. I'd 
> therefore suggest to reword like this:
> {quote}
> This method should block if no data is currently available but return control 
> to Kafka Connect periodically (by returning null).
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Description: 
The docs of {{poll()}} currently say "This method should block if no data is 
currently available". This causes the task from transitioning to PAUSED state, 
if there's no data available for a longer period of time. I'd therefore suggest 
to reword like this:

{quote}
This method should block if no data is currently available but return control 
to Kafka Connect periodically (by returning null).
{quote}
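
For illustration, a sketch of the suggested behaviour, assuming the task buffers incoming data in a queue (the queue and class are hypothetical):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.source.SourceRecord;

public class PeriodicallyReturningTask {

    private final BlockingQueue<SourceRecord> buffer; // hypothetical, filled by a reader thread

    public PeriodicallyReturningTask(BlockingQueue<SourceRecord> buffer) {
        this.buffer = buffer;
    }

    public List<SourceRecord> poll() throws InterruptedException {
        // Block only for a bounded amount of time, then hand control back to
        // Kafka Connect by returning null if nothing is available.
        SourceRecord first = buffer.poll(500, TimeUnit.MILLISECONDS);
        if (first == null) {
            return null;
        }
        List<SourceRecord> records = new ArrayList<>();
        records.add(first);
        buffer.drainTo(records);
        return records;
    }
}
{code}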

> Improve JavaDoc of SourceTask#poll()
> 
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> The docs of {{poll()}} currently say "This method should block if no data is 
> currently available". This causes the task from transitioning to PAUSED 
> state, if there's no data available for a longer period of time. I'd 
> therefore suggest to reword like this:
> {quote}
> This method should block if no data is currently available but return control 
> to Kafka Connect periodically (by returning null).
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Summary: Improve JavaDoc of SourceTask#poll()  (was: Improve JavaDoc of 
SourceTask)

> Improve JavaDoc of SourceTask#poll()
> 
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6456) Improve JavaDoc of SourceTask

2018-01-17 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6456:
-

 Summary: Improve JavaDoc of SourceTask
 Key: KAFKA-6456
 URL: https://issues.apache.org/jira/browse/KAFKA-6456
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Gunnar Morling






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-11-17 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257642#comment-16257642
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Hi [~rhauch], seeing that this one is assigned for 1.1.0. Have you created a 
KIP for it already, or plans to do so soon? I like the idea of the dedicated 
{{OffsetRecord}}.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)