Re: [DISCUSS] KIP-802: Validation Support for Kafka Connect SMT Options

2021-12-21 Thread Gunnar Morling
Hey Chris,

Thanks a lot for reviewing this KIP and your comments! Some more answers
inline.

On Tue, Dec 7, 2021 at 11:49 PM Chris Egerton wrote:

> Hi Gunnar,
>
> Thanks for the KIP! The section on backwards compatibility is especially
> impressive and was enjoyable to read.
>

Excellent, that's great to hear!

> Overall I like the direction of the KIP (and in fact just ran into a
> situation yesterday where it would be valuable). I only have one major
> thought: could we add similar validate methods for the Converter and
> HeaderConverter interfaces? With KIP-769 [1], it looks like we'll have a
> new Converter::config method, so if that makes it through, it should be a
> matter of just adding the same methods to those interfaces as well
> (although we may want to be tolerant of null ConfigDef objects being
> returned from HeaderConverter::config since the Connect framework has not
> been enforcing this requirement to date).
>

Yes, I think it's a good idea to expand the scope of the KIP to cover all
these contracts. I have updated the KIP document accordingly.
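
For illustration, here's a rough sketch of how such a default method could
look on the Transformation interface (the Converter and HeaderConverter
variants would be analogous, modulo tolerating a null ConfigDef); the exact
shape is my assumption and not necessarily what the KIP ends up with:

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;

// Simplified sketch of the Transformation contract (the real interface also
// extends Configurable and Closeable); validate() mirrors Connector::validate.
public interface Transformation<R> {

    ConfigDef config();

    default Config validate(Map<String, String> smtConfigs) {
        List<ConfigValue> configValues = config().validate(smtConfigs);
        return new Config(configValues);
    }
}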

>
> That aside, a few small nits:
>
> 1. The "This page is meant as a template" section can be removed :)
> 2. The "Current Status" can be updated to "Under Discussion"
> 3. Might want to add javadocs to the newly-proposed validate method (I'm
> assuming they'll largely mirror the ones for the existing
> Connector::validate method, but we may also want to add a {@since} tag or
> some other information on which versions of Connect will leverage the
> method).
>

Done.

I will try and create a PR for this work in January next year.

All the best,

--Gunnar

> [1] -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+plugins+and+retrieve+their+configuration+definitions#KIP769:ConnectAPIstolistallpluginsandretrievetheirconfigurationdefinitions-PublicInterfaces
> (section labeled "Converter interface")
>
> Cheers,
>
> Chris
>
> On Wed, Nov 24, 2021 at 11:32 AM Gunnar Morling wrote:
>
> > Hey all,
> >
> > I would like to propose a KIP for Apache Kafka Connect which adds
> > validation support for SMT-related configuration options:
> >
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options
> >
> > This feature allows users to make sure an SMT is configured correctly
> > before actually putting a connector with that SMT in place.
> >
> > Any feedback, comments, and suggestions around this proposal will
> > be greatly appreciated.
> >
> > Thanks,
> >
> > --Gunnar
> >
>


[jira] [Created] (KAFKA-13485) Restart connectors after RetriableException raised from Task::start()

2021-11-26 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-13485:
--

 Summary: Restart connectors after RetriableException raised from 
Task::start()
 Key: KAFKA-13485
 URL: https://issues.apache.org/jira/browse/KAFKA-13485
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Gunnar Morling


If a {{RetriableException}} is raised from {{Task::start()}}, this doesn't 
trigger an attempt to start that connector again; i.e. the restart 
functionality is currently only implemented for exceptions raised from 
{{poll()}}/{{put()}}. Triggering restarts upon failures during {{start()}} 
would be desirable as well, so as to circumvent temporary failure conditions 
such as a network hiccup, which currently requires a manual restart of the 
affected tasks, e.g. if a connector establishes a database connection during 
{{start()}}.
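
For illustration, a (hypothetical) task along the following lines currently 
fails permanently when the connection attempt raises a 
{{RetriableException}}, instead of being retried; the connection handling is 
simplified:

{code}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleDbTask extends SourceTask {

    private Connection connection;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        try {
            connection = DriverManager.getConnection(props.get("db.url"));
        } catch (SQLException e) {
            // unlike the same exception thrown from poll(), this is not
            // retried by the framework today
            throw new RetriableException("Failed to connect, should be retried", e);
        }
    }

    @Override
    public List<SourceRecord> poll() {
        return null; // irrelevant for the example
    }

    @Override
    public void stop() {
    }
}
{code}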



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Handling retriable exceptions during Connect source task start

2021-11-26 Thread Gunnar Morling
Hi all,

We encountered a similar situation in Debezium again, where it would be
desirable to retry after an exception raised during Task::start().

Would anything speak against implementing retry support for
Task::start() in Kafka Connect? Would it require a KIP?

Thanks,

--Gunnar


On Mon, Aug 9, 2021 at 10:47 AM Gunnar Morling <gunnar.morl...@googlemail.com> wrote:

> Hi,
>
> To ask slightly differently: would there be interest in a pull request for
> implementing retries, in case RetriableException is thrown from the
> Task::start() method?
>
> Thanks,
>
> --Gunnar
>
>
> On Thu, Aug 5, 2021 at 10:27 PM Sergei Morozov wrote:
>
>> Hi,
>>
>> I'm trying to address an issue in Debezium (DBZ-3823
>> <https://issues.redhat.com/browse/DBZ-3823>) where a source connector task
>> cannot recover from a retriable exception.
>>
>> The root cause is that the task interacts with the source database during
>> SourceTask#start but Kafka Connect doesn't handle retriable exceptions
>> thrown at this stage as retriable. KIP-298
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect>
>> that originally introduced handling of retriable exceptions doesn't describe
>> handling task start exceptions, so it's unclear to me whether those aren't
>> allowed by design or it was just out of the scope of the KIP.
>>
>> My current working solution
>> <https://github.com/debezium/debezium/pull/2572> relies
>> on the internal Debezium implementation of the task restart which
>> introduces certain risks (the details are in the PR description).
>>
>> The question is: are retriable exceptions during start disallowed by
>> design, and the task must not throw retriable exceptions during start, or
>> it's just currently not supported by the Connect framework and I just need
>> to implement proper error handling in the connector?
>>
>> Thanks!
>>
>> --
>> Sergei Morozov
>>
>


[DISCUSS] KIP-802: Validation Support for Kafka Connect SMT Options

2021-11-24 Thread Gunnar Morling
Hey all,

I would like to propose a KIP for Apache Kafka Connect which adds
validation support for SMT-related configuration options:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options

This feature allows users to make sure an SMT is configured correctly
before actually putting a connector with that SMT in place.

Any feedback, comments, and suggestions around this proposal will
be greatly appreciated.

Thanks,

--Gunnar


[jira] [Created] (KAFKA-13478) KIP-802: Validation Support for Kafka Connect SMT Options

2021-11-24 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-13478:
--

 Summary: KIP-802: Validation Support for Kafka Connect SMT Options
 Key: KAFKA-13478
 URL: https://issues.apache.org/jira/browse/KAFKA-13478
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


Implement [KIP-802|https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options], 
adding validation support for SMT options.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Do we want to add more SMTs to Apache Kafka?

2021-11-19 Thread Gunnar Morling
Hi all,

Just came across this thread, I hope the late reply is ok.

FWIW, we're in a similar situation in Debezium, where users often request
new (Debezium-specific) SMTs, and we generally recommend that these be
maintained by the users themselves, unless they are truly generic. This
excludes the share of users who aren't Java developers, though.

What might help is having a means of easy discoverability for externally
hosted SMTs, e.g. via some kind of catalog hosted on kafka.apache.org. That
way, it would be easier for people to find and obtain SMTs from other places,
reducing the pressure to get them added to Apache Kafka proper.

Best,

--Gunnar




On Sun, Nov 7, 2021 at 9:49 PM Brandon Brown <bran...@bbrownsound.com> wrote:

> I like the idea of a select number of SMTs being offered and supported out
> of the box. The addition of SMTs via this process is nice because it allows
> for a rich set to be supported out of the box and without the need for
> extra work to deploy.
>
> Perhaps this is a spot where the community could express interest in
> additional SMTs which may be available via an open source library, and if
> enough usage occurs there could be a path to fold them into the Kafka
> project at large?
>
> Brandon Brown
>
>
> > On Nov 7, 2021, at 1:19 PM, Randall Hauch  wrote:
> >
> > We have had several requests to add more Connect Single Message
> > Transforms (SMTs) to the project. When SMTs were first introduced with
> > KIP-66 (ref 1) in Jun 2017, the KIP mentioned the following:
> >
> >> Criteria: SMTs that are shipped with Kafka Connect should be general
> enough to apply to many data sources & serialization formats. They should
> also be simple enough to not cause any additional library dependency to be
> introduced.
> >> Beyond those being initially included with this KIP, transformations
> can be adopted for inclusion in future with JIRA/ML discussion to weigh the
> tradeoffs.
> >
> > In the 4+ years that we've had SMTs in the project, we've only
> > enhanced the framework with KIP-585 (ref 2), and fixed the initial
> > SMTs (including KIP-437, ref 3). We recently have had quite a few
> > requests to add new SMTs; a few samples of these include:
> > * https://issues.apache.org/jira/browse/KAFKA-10299
> > * https://issues.apache.org/jira/browse/KAFKA-9436
> > * https://issues.apache.org/jira/browse/KAFKA-9318
> > * https://issues.apache.org/jira/browse/KAFKA-12443
> >
> > Adding new SMTs to the Apache Kafka project or changing existing ones comes
> > with requirements. First, AK releases are infrequent and necessarily
> > involve the entire project. Second, adding an SMT is an API change and
> > therefore requires a KIP. Third, all changes in behavior to SMTs
> > included in a prior AK release must be backward compatible, and
> > adding or changing an SMT's configuration requires a KIP. This last
> > one is also challenging if we're limiting ourselves to truly general
> > SMTs, since these are notoriously difficult to get right the first
> > time. All of these aspects mean that it's difficult to add, maintain,
> > and evolve/improve SMTs in AK. And unless a bug fix is critical, we're
> > likely not to create a patch release for AK just to fix a bug in an
> > SMT, simply because of the effort involved.
> >
> > On the other hand, anyone can easily implement their own SMT and
> > deploy them as a Connect plugin, whether that's part of a connector
> > plugin or a separate plugin dedicated to one or more SMTs. Interestingly,
> > it's far simpler to implement and maintain custom SMTs outside of AK,
> > especially since those plugins can be released and deployed in any
> > Connect runtime version since at least 0.11.0. And if custom SMTs are
> > maintained in a relatively small project, they can be released often.
> >
> > Finally, KIP-26 (ref 4) specifically rejected maintaining connector
> > implementations in the AK project. So we have precedent for choosing
> > not to accept implementations.
> >
> > Given the above, I wonder if the time has come for us to prefer only
> > maintaining the SMT framework and existing SMTs, and to decline adding
> > new SMTs.
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall Hauch
> >
> > (1)
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect
> > (2)
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Filter+and+Conditional+SMTs
> > (3)
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-437%3A+Custom+replacement+for+MaskField+SMT
> > (4)
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767
>


Re: Requesting permissions to contribute to Apache Kafka

2021-11-17 Thread Gunnar Morling
Excellent, thank you so much for the quick help, David!

--Gunnar


On Wed, Nov 17, 2021 at 10:42 AM David Jacot wrote:

> Hi Gunnar,
>
> I have granted you the requested permissions.
>
> I am looking forward to your contributions.
>
> Best,
> David
>
> On Wed, Nov 17, 2021 at 10:32 AM Gunnar Morling wrote:
> >
> > Hi,
> >
> > As per the instructions given in [1], I would like to request the
> > permissions for creating a KIP. My ids are:
> >
> > * Wiki: gunnarmorling
> > * Jira: gunnar.morling
> >
> > Thanks a lot,
> >
> > --Gunnar
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>


Requesting permissions to contribute to Apache Kafka

2021-11-17 Thread Gunnar Morling
Hi,

As per the instructions given in [1], I would like to request the
permissions for creating a KIP. My ids are:

* Wiki: gunnarmorling
* Jira: gunnar.morling

Thanks a lot,

--Gunnar

[1]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals


Re: KIP-769: Connect API to retrieve connector configuration definitions

2021-11-16 Thread Gunnar Morling
Hi,

I'm +1 for adding a GET endpoint for obtaining config definitions. It
always felt odd to me that one has to issue a PUT for that purpose. If
nothing else, it'd be better in terms of discoverability of the KC REST API.

One additional feature request I'd have is to expose the valid enum
constants for enum-typed options. That'll help to display the values in a
drop-down or via radio buttons in a UI, give us tab completion in kcctl,
etc.
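
For example, an option like that is typically declared via a ConfigDef
validator along these lines (all names here are made up), so the endpoint
could expose the ValidString values:

import org.apache.kafka.common.config.ConfigDef;

public class EnumOptionExample {

    // hypothetical option; the valid values below are what a GET endpoint
    // could expose for UIs and tab completion
    static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("behavior.on.null.values", ConfigDef.Type.STRING, "ignore",
                ConfigDef.ValidString.in("ignore", "fail", "delete"),
                ConfigDef.Importance.MEDIUM,
                "What to do with records that have a null value.");
}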

Best,

--Gunnar


On Tue, Nov 16, 2021 at 4:31 PM Chris Egerton wrote:

> Hi Viktor,
>
> It sounds like there are three major points here in favor of a new GET
> endpoint for connector config defs.
>
> 1. You cannot issue a blank ("dummy") request for sink connectors because a
> topic list/topic regex has to be supplied (otherwise the PUT endpoint
> returns a 500 response)
> 2. A dummy request still triggers custom validations by the connector,
> which may be best to avoid if we know for sure that the config isn't worth
> validating yet
> 3. It's more ergonomic and intuitive to be able to issue a GET request
> without having to give a dummy connector config
>
> With regards to 1, this is actually a bug in Connect (
> https://issues.apache.org/jira/browse/KAFKA-13327) with a fix already
> implemented and awaiting committer review (
> https://github.com/apache/kafka/pull/11369). I think it'd be better to
> focus on fixing this bug in general instead of implementing a new REST
> endpoint in order to allow people to work around it.
>
> With regards to 2, this is technically possible but I'm unsure it'd be too
> common out in the wild given that most validations that could be expensive
> would involve things like connecting to a database, checking if a cloud
> storage bucket exists, etc., none of which are possible without some
> configuration properties from the user (db hostname, bucket name, etc.).
>
> With regards to 3, I do agree that it'd be easier for people designing UIs
> to have a GET API to work against. I'm just not sure it's worth the
> additional implementation, testing, and maintenance burden. If it were
> possible to issue a PUT request without unexpected 500s for invalid
> configs, would that suffice? AFAICT it'd basically be as simple as issuing
> a PUT request with a dummy body consisting of nothing except the connector
> class (which at this point we might even make unnecessary and just
> automatically replace with the connector class from the URL) and then
> filtering the response to just grab the "definition" field of each element
> in the "configs" array in the response.
>
> Cheers,
>
> Chris
>
> On Tue, Nov 16, 2021 at 9:52 AM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:
>
> > Hi Folks,
> >
> > I too think this would be a very useful feature. Some of our management
> > applications would provide a wizard for creating connectors. In this
> > scenario the user basically would fill out a sample configuration
> generated
> > by the UI which would send it back to Connect for validation and
> eventually
> > create a new connector. The first part of this workflow can be enhanced
> if
> > we had an API that can return the configuration definition of the given
> > type of connector as the UI application would be able to generate a
> sample
> > for the user based on that (nicely drawn diagram:
> > https://imgur.com/a/7S1Xwm5).
> > The connector-plugins/{connectorType}/config/validate API essentially
> works
> > and returns the data that we need, however it is a HTTP PUT API that is a
> > bit unintuitive for a fetch-like functionality and also functionally
> > different as it validates the given (dummy) request. In case of sink
> > connectors one would need to also provide a topic name.
> >
> > A suggestion for the KIP: I think it can be useful to return the config
> > groups and the connector class' name similarly to the validate API just
> in
> > case any frontend needs them (and also the response would be more like
> the
> > validate API but simpler).
> >
> > Viktor
> >
> > On Fri, Aug 20, 2021 at 4:51 PM Ryanne Dolan wrote:
> >
> > > I think it'd be worth adding a GET version, fwiw. Could be the same
> > handler
> > > with just a different spelling maybe.
> > >
> > > On Fri, Aug 20, 2021, 7:44 AM Mickael Maison wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > You're right, you can achieve the same functionality using the
> > > > existing validate endpoint.
> > > > In my mind it was only for validation once you have built a
> > > > configuration but when used with an empty configuration, it basically
> > > > serves the same purpose as the proposed new endpoint.
> > > >
> > > > I think it's a bit easier to use a GET endpoint but I don't think it
> > > > really warrants a different endpoint.
> > > >
> > > > Thanks
> > > >
> > > > On Thu, Aug 19, 2021 at 2:56 PM Chris Egerton wrote:
> > > > >
> > > > > Hi Mickael,
> > > > >
> > > > > I'm wondering about the use case here. The motivation section
> states
> > > that
> > > > 

Re: Handling retriable exceptions during Connect source task start

2021-08-09 Thread Gunnar Morling
Hi,

To ask slightly differently: would there be interest in a pull request for
implementing retries, in case RetriableException is thrown from the
Task::start() method?

Thanks,

--Gunnar


On Thu, Aug 5, 2021 at 10:27 PM Sergei Morozov wrote:

> Hi,
>
> I'm trying to address an issue in Debezium (DBZ-3823
> <https://issues.redhat.com/browse/DBZ-3823>) where a source connector task
> cannot recover from a retriable exception.
>
> The root cause is that the task interacts with the source database during
> SourceTask#start but Kafka Connect doesn't handle retriable exceptions
> thrown at this stage as retriable. KIP-298
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect>
> that originally introduced handling of retriable exceptions doesn't describe
> handling task start exceptions, so it's unclear to me whether those aren't
> allowed by design or it was just out of the scope of the KIP.
>
> My current working solution
> <https://github.com/debezium/debezium/pull/2572> relies
> on the internal Debezium implementation of the task restart which
> introduces certain risks (the details are in the PR description).
>
> The question is: are retriable exceptions during start disallowed by
> design, and the task must not throw retriable exceptions during start, or
> it's just currently not supported by the Connect framework and I just need
> to implement proper error handling in the connector?
>
> Thanks!
>
> --
> Sergei Morozov
>


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-27 Thread Gunnar Morling
Chris,

One follow-up question after thinking some more about this: is there any
limit in terms of duration or size of in-flight, connector-controlled
transactions? In case of Debezium for instance, there may be cases where we
tail the TX log from an upstream source database, not knowing whether the
events we receive belong to a committed or aborted transaction. Would it be
valid to emit all these events via a transactional task, and in case we
receive a ROLLBACK event eventually, to abort the pending Kafka
transaction? Such source transactions could be running for a long time
potentially, e.g. hours or days (at least in theory). Or would this sort of
usage not be considered a reasonable one?
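
Sketched in code, the pattern I'm asking about would look roughly like this,
assuming the TransactionContext API proposed by the KIP; ChangeEvent and the
two helper methods are invented stand-ins for Debezium internals:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class TxLogTask extends SourceTask {

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        for (ChangeEvent event : readFromTxLog()) {
            if (event.isRollback()) {
                // the upstream TX aborted, possibly hours after it began;
                // abort the pending Kafka transaction as well
                context.transactionContext().abortTransaction();
                records.clear();
            } else {
                records.add(toSourceRecord(event));
            }
        }
        return records;
    }

    abstract List<ChangeEvent> readFromTxLog();

    abstract SourceRecord toSourceRecord(ChangeEvent event);

    interface ChangeEvent {
        boolean isRollback();
    }
}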

Thanks,

--Gunnar


On Thu, May 27, 2021 at 11:15 PM Gunnar Morling <gunnar.morl...@googlemail.com> wrote:

> Chris, all,
>
> I've just read KIP-618, and let me congratulate you first of all for this
> impressive piece of work! Here's a few small suggestions and questions I
> had while reading:
>
> * TransactionContext: What's the use case for the methods accepting a
> source record (commitTransaction(SourceRecord
> record), abortTransaction(SourceRecord record))?
> * SourceTaskContext: Typo in "when the sink connector is deployed" ->
> source task
> * SourceTaskContext: Instead of guarding against NSME, is there a way for
> a connector to query the KC version and thus derive its capabilities? Going
> forward, a generic API for querying capabilities could be nice, so a
> connector can query for capabilities of the runtime in a safe and
> compatible way.
> * SourceConnector: exactlyOnceSupport() -> false return value doesn't match
> * SourceConnector: Would it make sense to merge the two methods perhaps
> and return one enum of { SUPPORTED, NOT_SUPPORTED,
> SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum
> from canDefineTransactionBoundaries(), too; even if it only has two values
> now, that'd allow for extension in the future
>
> And one general question: in Debezium, we have some connectors that
> produce records "out-of-bands" to a schema history topic via their own
> custom producer. Is there any way envisionable where such a producer would
> participate in the transaction managed by the KC runtime environment?
>
> Thanks a lot,
>
> --Gunnar
>
>
> On Mon, May 24, 2021 at 6:45 PM Chris Egerton wrote:
>
>> Hi all,
>>
>> Wanted to note here that I've updated the KIP document to include the
>> changes discussed recently. They're mostly located in the "Public
>> Interfaces" section. I suspect discussion hasn't concluded yet and there
>> will probably be a few more changes to come, but wanted to take the
>> opportunity to provide a snapshot of what the current design looks like.
>>
>> Cheers,
>>
>> Chris
>>
>> On Fri, May 21, 2021 at 4:32 PM Chris Egerton wrote:
>>
>> > Hi Tom,
>> >
>> > Wow, I was way off base! I was thinking that the intent of the fencible
>> > producer was to employ it by default with 3.0, as opposed to only after
>> the
>> > worker-level
>> > "exactly.once.source.enabled" property was flipped on. You are correct
>> > that with the case you were actually describing, there would be no
>> > heightened ACL requirements, and that it would leave room in the future
>> for
>> > exactly-once to be disabled on a per-connector basis (as long as all the
>> > workers in the cluster already had "exactly.once.source.enabled" set to
>> > "true") with no worries about breaking changes.
>> >
>> > I agree that this is something for another KIP; even if we could squeeze
>> > it in in time for this release, it might be a bit much for new users to
>> > take in all at once. But I can add it to the doc as "future work" since
>> > it's a promising idea that could prove valuable to someone who might
>> need
>> > per-connector granularity in the future.
>> >
>> > Thanks for clearing things up; in retrospect your comments make a lot
>> more
>> > sense now, and I hope I've sufficiently addressed them by now.
>> >
>> > PSA for you and everyone else--I plan on updating the doc next week with
>> > the new APIs for connector-defined transaction boundaries,
>> > user-configurable transaction boundaries (i.e., poll vs. interval vs.
>> > connectors), and preflight checks for exactly-once validation (required
>> vs.
>> > requested).
>> >
>> > Cheers,
>> >
>> > Chris
>> >
>> > On Fri, Ma

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-27 Thread Gunnar Morling
Chris, all,

I've just read KIP-618, and let me congratulate you first of all for this
impressive piece of work! Here's a few small suggestions and questions I
had while reading:

* TransactionContext: What's the use case for the methods accepting a
source record (commitTransaction(SourceRecord
record), abortTransaction(SourceRecord record))?
* SourceTaskContext: Typo in "when the sink connector is deployed" ->
source task
* SourceTaskContext: Instead of guarding against NSME, is there a way for a
connector to query the KC version and thus derive its capabilities? Going
forward, a generic API for querying capabilities could be nice, so a
connector can query for capabilities of the runtime in a safe and
compatible way.
* SourceConnector: exactlyOnceSupport() -> false return value doesn't match
* SourceConnector: Would it make sense to perhaps merge the two methods and
return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }?
Or, alternatively, return an enum from canDefineTransactionBoundaries(), too;
even if it only has two values now, that'd allow for extension in the future
(see the sketch below)
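
Purely for illustration, the merged variant could look like this (my sketch,
not the KIP's actual API):

import java.util.Map;

public abstract class ExampleSourceConnector {

    public enum ExactlyOnceSupport {
        SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES
    }

    // default: the connector makes no exactly-once guarantee
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return ExactlyOnceSupport.NOT_SUPPORTED;
    }
}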

And one general question: in Debezium, we have some connectors that produce
records "out-of-bands" to a schema history topic via their own custom
producer. Is there any way envisionable where such a producer would
participate in the transaction managed by the KC runtime environment?

Thanks a lot,

--Gunnar


On Mon, May 24, 2021 at 6:45 PM Chris Egerton wrote:

> Hi all,
>
> Wanted to note here that I've updated the KIP document to include the
> changes discussed recently. They're mostly located in the "Public
> Interfaces" section. I suspect discussion hasn't concluded yet and there
> will probably be a few more changes to come, but wanted to take the
> opportunity to provide a snapshot of what the current design looks like.
>
> Cheers,
>
> Chris
>
> On Fri, May 21, 2021 at 4:32 PM Chris Egerton  wrote:
>
> > Hi Tom,
> >
> > Wow, I was way off base! I was thinking that the intent of the fencible
> > producer was to employ it by default with 3.0, as opposed to only after
> the
> > worker-level
> > "exactly.once.source.enabled" property was flipped on. You are correct
> > that with the case you were actually describing, there would be no
> > heightened ACL requirements, and that it would leave room in the future
> for
> > exactly-once to be disabled on a per-connector basis (as long as all the
> > workers in the cluster already had "exactly.once.source.enabled" set to
> > "true") with no worries about breaking changes.
> >
> > I agree that this is something for another KIP; even if we could squeeze
> > it in in time for this release, it might be a bit much for new users to
> > take in all at once. But I can add it to the doc as "future work" since
> > it's a promising idea that could prove valuable to someone who might need
> > per-connector granularity in the future.
> >
> > Thanks for clearing things up; in retrospect your comments make a lot
> more
> > sense now, and I hope I've sufficiently addressed them by now.
> >
> > PSA for you and everyone else--I plan on updating the doc next week with
> > the new APIs for connector-defined transaction boundaries,
> > user-configurable transaction boundaries (i.e., poll vs. interval vs.
> > connectors), and preflight checks for exactly-once validation (required
> vs.
> > requested).
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley  wrote:
> >
> >> Hi Chris,
> >>
> >> Thanks for continuing to entertain some of these ideas.
> >>
> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton wrote:
> >>
> >> > [...]
> >> >
> >> That's true, but we do go from three static ACLs (write/describe on a
> >> fixed
> >> > transactional ID, and idempotent write on a fixed cluster) to a
> dynamic
> >> > collection of ACLs.
> >> >
> >>
> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I was
> >> suggesting the use of a 'fencing producer' only in clusters with
> >> exactly.once.source.enabled=true where I imagined the key difference
> >> between the exactly once and fencing cases was how the producer was
> >> configured/used (transactional vs this new fencing semantic). I think
> the
> >> ACL requirements for connector producer principals would therefore be
> the
> >> same as currently described in the KIP. The same is true for the worker
> >> principals (which is the only breaking change you give in the KIP). So I
> >> don't think the fencing idea changes the backwards compatibility story
> >> that's already in the KIP, just allows a safe per-connector
> >> exactly.once=disabled option to be supported (with required as requested
> >> as
> >> we already discussed).
> >>
> >> But I'm wondering whether I've overlooked something.
> >>
> >> Ultimately I think it may behoove us to err on the side of reducing the
> >> > breaking changes here for now and saving them for 4.0 (or some later
> >> major
> >> > release), but would be 

[jira] [Created] (KAFKA-12806) KRaft: Confusing leadership status exposed for controller without quorum

2021-05-18 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-12806:
--

 Summary: KRaft: Confusing leadership status exposed for controller 
without quorum
 Key: KAFKA-12806
 URL: https://issues.apache.org/jira/browse/KAFKA-12806
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Gunnar Morling






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Testing KRaft mode

2021-05-17 Thread Gunnar Morling
Hey Chris,

Thanks a lot for your reply. I've tried with "connect.protocol", but it
didn't make any difference. I've thus logged
https://issues.apache.org/jira/browse/KAFKA-12801 and will also try to
attach a profiling log as requested by Lucas.

Best,

--Gunnar


On Mon, May 17, 2021 at 2:32 PM Chris Egerton wrote:

> Hi Gunnar,
>
> As far as the Connect behavior goes, sounds like
> https://issues.apache.org/jira/browse/KAFKA-12252. To verify, you can
> either set the 'connect.protocol' property of your Connect cluster to
> 'compatible' (easy mode) or cherry-pick the recently-merged fix for it (
> https://github.com/apache/kafka/pull/10014) onto the version of Connect
> you're running, rebuild, and try again. Either step should prevent the
> symptoms you describe, and if not, a new Jira ticket is likely warranted.
>
> Cheers,
>
> Chris
>
> On Mon, May 17, 2021 at 1:50 AM Gunnar Morling wrote:
>
> > Hi,
> >
> > I was testing the early access preview of the new ZooKeeper-less mode and
> > noticed two things I wanted to bring up here. My testing scenario was a
> > cluster of three Kafka nodes in combined mode and a single Kafka Connect
> > node, all running via Docker Compose.
> >
> > * I stopped two of the Kafka nodes; the remaining node then was marked as
> > "leader" as per its metrics and in the new metadata shell; is this
> > expected? As one out of three nodes doesn't have a quorum, I'd rather
> have
> > expected to see that there isn't a leader in this situation. Indeed
> trying
> > to create a topic in this situation times out, so I reckon that the
> > remaining node doesn't actually act as the quorum leader.
> > * After restarting the two nodes, in most cases they, as well as the
> > Connect node, each would use up 100% CPU, like threads stuck in a
> spinning
> > loop. Is this issue already known, or should I log one in Jira?
> >
> > Thanks a lot for your help (and this great feature in general, really
> > exciting to see this),
> >
> > --Gunnar
> >
>


[jira] [Created] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-17 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-12801:
--

 Summary: High CPU load after restarting brokers subsequent to 
quorum loss
 Key: KAFKA-12801
 URL: https://issues.apache.org/jira/browse/KAFKA-12801
 Project: Kafka
  Issue Type: Bug
  Components: core, KafkaConnect
Affects Versions: 2.8.0
Reporter: Gunnar Morling


I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of three 
Kafka nodes (all combined nodes), and one Kafka Connect node. After starting 
all components, I first stop the current controller of the Kafka cluster; then 
I stop the node which has become the new controller. At this point, only one 
Kafka node out of the original three is still running, plus the Connect node.

When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
and the two broker nodes goes up to 100% and remains at that level for an 
indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Testing KRaft mode

2021-05-16 Thread Gunnar Morling
Hi,

I was testing the early access preview of the new ZooKeeper-less mode and
noticed two things I wanted to bring up here. My testing scenario was a
cluster of three Kafka nodes in combined mode and a single Kafka Connect
node, all running via Docker Compose.

* I stopped two of the Kafka nodes; the remaining node then was marked as
"leader" as per its metrics and in the new metadata shell; is this
expected? As one out of three nodes doesn't have a quorum, I'd rather have
expected to see that there isn't a leader in this situation. Indeed trying
to create a topic in this situation times out, so I reckon that the
remaining node doesn't actually act as the quorum leader.
* After restarting the two nodes, in most cases they, as well as the
Connect node, would each use up 100% CPU, as if threads were stuck in a
spinning loop. Is this issue already known, or should I log one in Jira?

Thanks a lot for your help (and this great feature in general, really
exciting to see this),

--Gunnar


[Connect] Different validation requirements for connector creation and update

2021-01-21 Thread Gunnar Morling
Hi,

In the Debezium community, we ran into an interesting corner case of
connector config validation [1].

The Debezium Postgres connector requires a database resource called a
"replication slot", which identifies this connector to the database and
tracks progress it has made reading the TX log. This replication slot must
not be shared between multiple clients (Debezium connectors, or others), so
we added a validation to make sure that the slot configured by the user
isn't active, i.e. no client is connected to it already. This works as
expected when setting up or restarting a connector, but when trying to
update the connector configuration, the connector is still running while the
new configuration is validated, so the slot is active and validation hence
fails.

Is there a way we can distinguish during config validation whether the
connector is (re-)started or whether it's a validation upon
re-configuration (allowing us to skip this particular validation in the
re-configuration case)?

If that's not the case, would there be interest in a KIP for adding such a
capability to the Kafka Connect API?
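
To make the ask concrete, here's a purely hypothetical sketch; nothing like
this exists in Kafka Connect today, and all names are invented:

import java.util.Map;

import org.apache.kafka.common.config.Config;

// Invented API sketch: a context passed to validation, telling the connector
// whether it is being newly created or re-configured while already running.
interface ValidationContext {
    boolean isReconfiguration();
}

// A connector could then skip checks which only hold for fresh starts:
abstract class ExampleConnectorValidation {

    Config validate(Map<String, String> connectorConfigs, ValidationContext context) {
        if (!context.isReconfiguration()) {
            // only meaningful while the connector isn't running yet
            ensureReplicationSlotInactive(connectorConfigs);
        }
        return remainingValidation(connectorConfigs);
    }

    abstract void ensureReplicationSlotInactive(Map<String, String> configs);

    abstract Config remainingValidation(Map<String, String> configs);
}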

Thanks for any feedback,

--Gunnar

[1] https://issues.redhat.com/browse/DBZ-2952


Re: [VOTE] KIP-665 Kafka Connect Hash SMT

2020-10-22 Thread Gunnar Morling
Hey Brandon,

I think that's an interesting idea; we have something similar as a built-in
connector feature in Debezium, too [1]. Two questions:

* Can "field" select nested fields, e.g. "after.email"?
* Did you consider an option for specifying a salt for the hash functions? (see the sketch below)
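
On the salt question, this is roughly what I have in mind, using
MessageDigest (which the KIP proposes for the hash functions); the
configuration option carrying the salt would be something to define:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class SaltedHashExample {

    // salt would come from a hypothetical "salt" option of the SMT
    static String hash(String value, String salt) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        digest.update(salt.getBytes(StandardCharsets.UTF_8));
        byte[] hashed = digest.digest(value.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : hashed) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }
}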

--Gunnar

[1]
https://debezium.io/documentation/reference/connectors/mysql.html#mysql-property-column-mask-hash



On Thu, Oct 22, 2020 at 12:53 PM Brandon Brown <bran...@bbrownsound.com> wrote:

> Gonna give this another little bump. :)
>
> Brandon Brown
>
> > On Oct 15, 2020, at 12:51 PM, Brandon Brown wrote:
> >
> > 
> > As I mentioned in the KIP, this transformer is slightly different from
> the current MaskField SMT.
> >
> >> Currently there exists a MaskField SMT but that would completely remove
> the value by setting it to an equivalent null value. One problem with this
> would be that you’d not be able to know in the case of say a password going
> through the mask transform it would become "" which could mean that no
> password was present in the message, or it was removed. However this hash
> transformer would remove this ambiguity if that makes sense. The proposed
> hash functions would be MD5, SHA1, SHA256. which are all supported via
> MessageDigest.
> >
> > Given this take on things do you still think there would be value in
> this smt?
> >
> >
> > Brandon Brown
> >> On Oct 15, 2020, at 12:36 PM, Ning Zhang wrote:
> >>
> >> Hello, I think this SMT feature is parallel to
> https://docs.confluent.io/current/connect/transforms/index.html
> >>
> On 2020/10/15 15:24:51, Brandon Brown wrote:
> >>> Bumping this thread.
> >>> Please take a look at the KIP and vote or let me know if you have any
> feedback.
> >>>
> >>> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-665%3A+Kafka+Connect+Hash+SMT
> >>>
> >>> Proposed: https://github.com/apache/kafka/pull/9057
> >>>
> >>> Thanks
> >>>
> >>> Brandon Brown
> >>>
> > On Oct 8, 2020, at 10:30 PM, Brandon Brown wrote:
> 
>  Just wanted to give another bump on this and see if anyone had any
> comments.
> 
>  Thanks!
> 
>  Brandon Brown
> 
> > On Oct 1, 2020, at 9:11 AM, "bran...@bbrownsound.com" <bran...@bbrownsound.com> wrote:
> >
> > Hey Kafka Developers,
> >
> > I’ve created the following KIP and updated it based on feedback from
> Mickael. I was wondering if we could get a vote on my proposal and move
> forward with the proposed pr.
> >
> > Thanks so much!
> > -Brandon
> >>>
>


Re: [VOTE] KIP 585: Filter and conditional SMTs

2020-05-19 Thread Gunnar Morling
+1 (non-binding)

Thanks for working on this, Tom! This KIP will be very useful for
connectors like Debezium.

--Gunnar

On Fri, May 15, 2020 at 8:02 PM Konstantine Karantasis wrote:
>
> +1 (binding)
>
> Thanks Tom.
>
> Konstantine
>
> On Fri, May 15, 2020 at 5:03 AM Andrew Schofield wrote:
>
> > +1 (non-binding)
> >
> > Thanks for the KIP. This will be very useful.
> >
> > Andrew Schofield
> >
> > On 13/05/2020, 10:14, "Tom Bentley"  wrote:
> >
> > Hi,
> >
> > I'd like to start a vote on KIP-585: Filter and conditional SMTs
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Filter+and+Conditional+SMTs
> >
> > Those involved in the discussion seem to be positively disposed to the
> > idea, but in the absence of any committer participation it's been
> > difficult
> > to find a consensus on how these things should be configured. What's
> > presented here seemed to be the option which people preferred overall.
> >
> > Kind regards,
> >
> > Tom
> >
> >


Re: [DISCUSS] KIP-585: Conditional SMT

2020-04-03 Thread Gunnar Morling
Hi all,

Thanks a lot for this initiative, Tom!

To shed some light, the use case where this first came up was issues
we saw with SMTs being applied to the different topics produced by the
Debezium change data capture connectors. There are different kinds of
topics (for change data, schema history, heartbeats etc.), and the
record structure to expect may vary between those. Hence we saw issues
with SMTs like ExtractField, which for instance should only be applied
to the change data topics but not the other ones.

I like the overall approach; for Debezium's purposes, the simple topic
matching and negation operators would be sufficient already. I agree
with Chris and would prefer one single condition attribute, which
contains a single condition or potentially a logical expression with
not, and, etc. I think it's less ambiguous, in particular when it
comes to ordering of the different conditions and determining their
precedence.

Would love to see this feature in one or another way in Connect.

Best,

--Gunnar



On Thu, Apr 2, 2020 at 6:48 PM Tom Bentley wrote:
>
> Hi Chris and Sönke,
>
> Using the numbering from Chris's email...
>
> 1. That's a good point, I'll see what is needed to make that work.
>
> 2. I'm happy enough to add support for "and" and "or" as part of this KIP
> if people can see a need for it.
>
> In a similar vein, I was wondering about whether it would be worthwhile
> having the equivalent of an "else" clause (what's in the KIP is basically
> an "if" statement). Without support for "else" I think people would often
> need two conditionals, with the condition of one being the negation of the
> condition of another.
>
> 3. I can see the attraction of an expression language. The pros include
> being terse and familiar to programmers and potentially very flexible if
> that's needed in the future. I had a play and implemented it using ANTLR
> and it's not difficult to write a grammar and implement the functions we've
> already discussed and get decent error messages when the expression is
> malformed. So on the one hand I quite like the idea. On the other hand it
> feels like overkill for the use cases that have actually been identified so
> far.
>
> @Sönke what do you make of the expression language idea?
>
> Kind regards,
>
> Tom
>
> On Wed, Apr 1, 2020 at 9:49 PM Christopher Egerton wrote:
>
> > Hi Tom,
> >
> > This looks great and I'd love to see the out-of-the-box SMTs become even
> > more powerful with the improvements you've proposed! Got a few remarks and
> > would be interested in your thoughts:
> >
> > 1. Instead of the new "ConfigDef config(Map props)" method,
> > what would you think about adopting a similar approach as the framework
> > uses with connectors, by adding a "Config validate(Map
> > props)" method that can perform custom validation outside of what can be
> > performed by the ConfigDef's single-property-at-a-time validation? It may
> > be a little heavyweight for use with this particular SMT, but it'd provide
> > more flexibility for other SMT implementations and would mirror an API that
> > developers targeting the framework are likely already familiar with.
> > 2. The possibility for adding the logical operators "and" and "or" is
> > mentioned, but only as a potential future change and not one proposed by
> > this KIP. Why not include those operators sooner rather than later?
> > 3. The syntax for named conditions that are then referenced in logical
> > operators is... tricky. It took me a few attempts to grok the example
> > provided in the KIP after reading Sönke's question about the example for
> > negation. What about a more sophisticated but less verbose syntax that
> > supports a single configuration for the condition, even with logical
> > operators? I'm thinking something like
> > "transforms.conditionalExtract.condition: not(has-header:my-header)"
> > instead of the "transforms.conditionalExtract.condition: not:hasMyHeader"
> > and "transforms.conditionalExtract.condition.hasMyHeader:
> > has-header:my-header" properties. If support for a logical "and" is added,
> > this could then be expanded to something like
> > "transforms.conditionalExtract.condition: and(has-header(my-header),
> > not(topic-matches(my-prefix-.*)))". There would be additional complexity
> > here with the need to escape parentheses and commas that are intended to be
> > treated literally (as part of a header name, for example) instead of as
> > part of the syntax for the condition itself, but a little additional
> > complexity for edge cases like that may be warranted if it heavily reduces
> > complexity for the common cases. The rationale for the proposed
> > parentheses-based syntax here instead of what's mentioned in the KIP
> > (something like "and: , ") is to help with
> > readability; we probably wouldn't need that with the approach of naming
> > conditions via separate properties, but things may get a little nasty with
> > literal conditions included there, especially 

[jira] [Created] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-06-11 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-8523:
-

 Summary: InsertField transformation fails when encountering 
tombstone event
 Key: KAFKA-8523
 URL: https://issues.apache.org/jira/browse/KAFKA-8523
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


When applying the {{InsertField}} transformation to a tombstone event, an 
exception is raised:

{code}
org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
absence of schema for [field insertion], found: null
at 
org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
at 
org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
at 
org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
at 
org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
{code}

AFAICS, the transform can still be made to work in this case by simply 
building up a new value map from scratch.
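
A sketch of that idea for the schemaless case (the method shape is
simplified from the actual transform):

{code}
import java.util.HashMap;
import java.util.Map;

public class TombstoneSafeInsert {

    // Sketch only: treat a null (tombstone) value as an empty map rather than
    // failing the requireMap() check, and insert the field into a fresh copy.
    @SuppressWarnings("unchecked")
    static Map<String, Object> applySchemaless(Object value, String field, Object fieldValue) {
        Map<String, Object> updated = value == null
                ? new HashMap<>()
                : new HashMap<>((Map<String, Object>) value);
        updated.put(field, fieldValue);
        return updated;
    }
}
{code}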



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8476) Kafka 2.2.1 distribution contains JAX-RS API twice

2019-06-03 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-8476:
-

 Summary: Kafka 2.2.1 distribution contains JAX-RS API twice
 Key: KAFKA-8476
 URL: https://issues.apache.org/jira/browse/KAFKA-8476
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


In kafka_2.12-2.2.1.tgz, both javax.ws.rs-api-2.1.jar and 
javax.ws.rs-api-2.1.1.jar are present. I reckon only one should be there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7336) Kafka Connect source task when producing record with invalid topic name

2018-08-24 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-7336:
-

 Summary: Kafka Connect source task hangs when producing record with 
invalid topic name
 Key: KAFKA-7336
 URL: https://issues.apache.org/jira/browse/KAFKA-7336
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Gunnar Morling


If a Kafka Connect source task returns a {{SourceRecord}} with an invalid topic 
name (e.g. "dbserver1.inventory.test@data"), that source task hangs (presumably 
indefinitely?) and doesn't continue its polling loop. The log is flooded with 
this message:

{code}
connect_1| 2018-08-24 08:47:29,014 WARN   ||  [Producer 
clientId=producer-4] Error while fetching metadata with correlation id 833 : 
{dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION}   
[org.apache.kafka.clients.NetworkClient]
{code}

The producer thread is stuck in the loop here:

{code}
KafkaProducer.waitOnMetadata(String, Integer, long) line: 938  
KafkaProducer.doSend(ProducerRecord, Callback) line: 823  
KafkaProducer.send(ProducerRecord, Callback) line: 803
WorkerSourceTask.sendRecords() line: 318
WorkerSourceTask.execute() line: 228
WorkerSourceTask(WorkerTask).doRun() line: 175  
WorkerSourceTask(WorkerTask).run() line: 219
Executors$RunnableAdapter.call() line: 511   
FutureTask.run() line: 266   
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149  
ThreadPoolExecutor$Worker.run() line: 624   
Thread.run() line: 748  
{code}

This causes the task to remain in RUNNING state, but no further invocations of 
{{poll()}} are done.

Of course we'll work around this and make sure to not produce records with 
invalid topic names, but I think the source task should transition to FAILED 
state in this case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-7058:
-

 Summary: ConnectSchema#equals() broken for array-typed default 
values
 Key: KAFKA-7058
 URL: https://issues.apache.org/jira/browse/KAFKA-7058
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


{{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' default 
values, but this doesn't work correctly if the default values in fact are 
arrays. In this case, {{false}} will always be returned, even if the default 
value arrays actually are equal.
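
A minimal demonstration of the problem, and one possible fix via 
{{Objects#deepEquals()}}, which falls back to element-wise comparison for 
arrays:

{code}
import java.util.Objects;

public class DefaultValueEquality {

    public static void main(String[] args) {
        byte[] a = { 1, 2, 3 };
        byte[] b = { 1, 2, 3 };
        System.out.println(Objects.equals(a, b));     // false: reference comparison
        System.out.println(Objects.deepEquals(a, b)); // true: element-wise comparison
    }
}
{code}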



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Exposing additional metadata in Kafka Connect schema parameters

2018-03-06 Thread Gunnar Morling
Hi,

A user of the Debezium CDC Kafka Connect connectors has asked whether we
could provide information about the original source type of captured table
columns.

Usually the type info we provide by using the Kafka Connect types and some
custom semantic types is good enough. But there are some cases where
additional type info would help: e.g. in case of MySQL, MEDIUMINT and INT
columns are transmitted as Connect Int32 (as that's the smallest type which
covers their value range). But from that, a consumer can't tell whether an
INT or MEDIUMINT column should be created in a downstream database.

Now my question is: would it be a reasonable thing for us to encode the
original column type as an additional parameter of the Kafka Connect
schemas (using a special parameter name), or would this be bending the
concept of schema parameters too much? Admittedly, this metadata would be
kind of source-specific, but I can see how it'd be beneficial in some use
cases.
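
In code, the idea would look like this; the parameter name is just an
example, not a proposed convention:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SourceTypeParameterExample {

    // a MySQL MEDIUMINT column is transmitted as Int32, with the original
    // column type carried along as a schema parameter; the parameter name
    // is illustrative only
    static final Schema MEDIUMINT_SCHEMA = SchemaBuilder.int32()
            .parameter("__source.column.type", "MEDIUMINT")
            .build();
}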

Thanks for any advice,

--Gunnar


[jira] [Created] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6566:
-

 Summary: SourceTask#stop() not called after exception raised in 
poll()
 Key: KAFKA-6566
 URL: https://issues.apache.org/jira/browse/KAFKA-6566
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


Having discussed this with [~rhauch], it has been my assumption that 
{{SourceTask#stop()}} will be called by the Kafka Connect framework in case an 
exception has been raised in {{poll()}}. That's not the case, though. As an 
example see the connector and task below.

Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
action to take, as it'll allow the task to clean up any resources such as 
releasing any database connections, right after that failure and not only once 
the connector is stopped.

{code}
package com.example;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TestConnector extends SourceConnector {

    @Override
    public String version() {
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public Class<? extends Task> taskClass() {
        return TestTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(Collections.singletonMap("foo", "bar"));
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    public static class TestTask extends SourceTask {

        @Override
        public String version() {
            return null;
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            // raising an exception here does not lead to stop() being called
            throw new RuntimeException();
        }

        @Override
        public void stop() {
            System.out.println("stop() called");
        }
    }
}

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6551) Unbounded queues in WorkerSourceTask cause OutOfMemoryError

2018-02-09 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6551:
-

 Summary: Unbounded queues in WorkerSourceTask cause 
OutOfMemoryError
 Key: KAFKA-6551
 URL: https://issues.apache.org/jira/browse/KAFKA-6551
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


A Debezium user reported an {{OutOfMemoryError}} to us, with over 50,000 
messages in the {{WorkerSourceTask#outstandingMessages}} map.

This map is unbounded and I can't see any way of "rate limiting" which would 
control how many records are added to it. Growth can only indirectly be limited 
by reducing the offset flush interval, but as connectors can return large 
numbers of messages in a single {{poll()}} call, that's not sufficient in all 
cases. Note the user reported this issue during snapshotting a database, i.e. a 
high number of records arrived in a very short period of time.

To solve the problem, I'd suggest making this map backpressure-aware and thus 
prevent its indefinite growth, so that no further records will be polled from 
the connector until messages have been taken out of the map again.
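
For illustration, the backpressure idea could be sketched like this (the 
capacity is an arbitrary number for the example, not a proposed config):

{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.producer.ProducerRecord;

public class BoundedOutstandingMessages {

    // bounded instead of unbounded: put() blocks the polling loop once the
    // limit is reached, until acknowledged records have been removed again
    private final BlockingQueue<ProducerRecord<byte[], byte[]>> outstanding =
            new ArrayBlockingQueue<>(10_000);

    void beforeSend(ProducerRecord<byte[], byte[]> record) throws InterruptedException {
        outstanding.put(record); // applies backpressure when the queue is full
    }

    void onAck(ProducerRecord<byte[], byte[]> record) {
        outstanding.remove(record);
    }
}
{code}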



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6456) Improve JavaDoc of SourceTask

2018-01-17 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6456:
-

 Summary: Improve JavaDoc of SourceTask
 Key: KAFKA-6456
 URL: https://issues.apache.org/jira/browse/KAFKA-6456
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Gunnar Morling






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


KIP-199 - Could offset management be part of the Connect REST API?

2017-11-17 Thread Gunnar Morling
Hi,

I was reading KIP-199 [1] for adding a tool for Kafka Connect offset
management. This would be a very useful functionality for users of the
Debezium CDC connectors, too.

What I was wondering: instead of having a separate tool for this, has it
been considered to expose offset management via the REST API of Connect?
There could be a resource /connectors/<name>/offsets for read and
write access. In line with the current KIP, write access would only be
allowed if the connector is stopped. Exposing this in the REST API might
allow for a consistent experience with the other connector management
functionalities.
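
For illustration, such a resource could be sketched with the JAX-RS
annotations which the Connect REST layer uses anyway; the paths, method
shapes, and types here are just my assumptions:

import java.util.Map;

import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

@Path("/connectors/{connector}/offsets")
public interface ConnectorOffsetsResource {

    @GET
    Map<String, Object> getOffsets(@PathParam("connector") String connector);

    // write access would only be allowed while the connector is stopped,
    // in line with the current KIP
    @PUT
    void setOffsets(@PathParam("connector") String connector, Map<String, Object> offsets);
}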

I'm not sure whether there are any guidelines on when to have some
functionality as a separate tool vs. in the API, but I thought I'd bring up
the idea and see what others think.

Thanks,

--Gunnar

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-199%3A+Add+Kafka+Connect+offset+tool