C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r820917219



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
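
For reference, a connector overriding the method documented above might look roughly like the following sketch. It assumes the `exactlyOnceSupport(Map<String, String>)` signature added elsewhere in this PR; `poll.mode` is a hypothetical connector property used only for illustration:

```java
import java.util.Map;

import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

public abstract class ExampleSourceConnector extends SourceConnector {

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // "poll.mode" is a made-up property of this example connector; only one of its
        // modes can actually be run with exactly-once guarantees.
        String mode = connectorConfig.get("poll.mode");
        return "incremental".equals(mode)
                ? ExactlyOnceSupport.SUPPORTED
                : ExactlyOnceSupport.UNSUPPORTED;
    }
}
```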

Review comment:
       @tombentley 
   
   Definitely in favor of doing single-property validations before multi-property validations. As far as single-property validations and short-circuiting go, the downstream [preflight validation PR](https://github.com/apache/kafka/pull/11776) already short-circuits [if the `exactly.once.support` property is invalid](https://github.com/C0urante/kafka/blob/b4976e82aec1c864bd25d660b4042a71e03b7c47/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L878) by skipping the call to `SourceConnector::exactlyOnceSupport`, and does the same for [the `transaction.boundary` property](https://github.com/C0urante/kafka/blob/b4976e82aec1c864bd25d660b4042a71e03b7c47/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L925) and the `SourceConnector::canDefineTransactionBoundaries` method.
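
   Roughly, the idea there is the following (an illustrative sketch only, not the literal herder code; the class and method names are made up):

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

class PreflightSketch {

    // Only consult the connector if the worker-level property itself validated cleanly.
    static void validateExactlyOnceProperties(SourceConnector connector,
                                              Map<String, String> connectorProps,
                                              Map<String, ConfigValue> validatedValues) {
        ConfigValue exactlyOnceSupport = validatedValues.get("exactly.once.support");
        if (exactlyOnceSupport != null && exactlyOnceSupport.errorMessages().isEmpty()) {
            ExactlyOnceSupport support = connector.exactlyOnceSupport(connectorProps);
            // ... compare against the requested level and add an error message on mismatch
        }

        ConfigValue transactionBoundary = validatedValues.get("transaction.boundary");
        if (transactionBoundary != null && transactionBoundary.errorMessages().isEmpty()) {
            // ... same pattern before calling SourceConnector::canDefineTransactionBoundaries
        }
    }
}
```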
   
   Can you clarify why you'd prefer to also short-circuit if any errors are reported by single-property or multi-property validation beyond the ones reported for the `exactly.once.support` and `transaction.boundary` properties? That comes with a downside: we'd end up forcing two validation passes on users in order to discover any issues with the `exactly.once.support` and `transaction.boundary` properties, even if the connector could have surfaced those issues despite the other problems with its config.
   
   Not sure passing in a [Config](https://github.com/apache/kafka/blob/3be978464ca01c29241954516b4d952f69d4e16d/clients/src/main/java/org/apache/kafka/common/config/Config.java) object would be the best way to address this, but it is better than nothing. It doesn't seem like a very ergonomic class to work with for follow-up validation attempts and is better suited for rendering final results to users (which, IMO, is part of the reason that the preflight validation logic in `AbstractHerder` is difficult to follow). Something like a `Map<String, ConfigValue>` might be more useful, since I imagine most connector authors would end up either deriving one of their own from the `Config` object or using Java 8 streams logic to find the `ConfigValue` for a single property or subset of properties (see the sketch below).
   
   But I'm also hesitant about this approach, since I'm not sure how we'd want to handle properties that are present in the raw connector config but that the connector doesn't define a `ConfigValue` for in the `Config` returned from `Connector::validate`. Maybe I'm overthinking this, but I can't shake the feeling that either way we go on that front (either discard those properties and don't include them in `SourceConnector::exactlyOnceSupport` and `SourceConnector::canDefineTransactionBoundaries`, or include them with `ConfigValue` objects constructed by the framework and inserted into the resulting `Config` or `Map<String, ConfigValue>` object), we're going to end up introducing some footguns that will just frustrate developers who want to implement these methods without having to read paragraphs of documentation or dig through framework source code.
   
   One practical example of this is the `transaction.boundary` property: most connectors won't (and shouldn't) define this property themselves, so we'd have to decide whether we'd want to provide that property to connectors in `SourceConnector::exactlyOnceSupport`, and if so, how.
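
   To make the ergonomics point concrete, here is the kind of sketch I'd expect most implementations to start with if the new methods took a `Config` (the helper names here are made up):

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;

class ValidationResultHelpers {

    // Derive a Map<String, ConfigValue> from the Config returned by Connector::validate
    // so that individual properties can be looked up by name.
    static Map<String, ConfigValue> byName(Config validationResult) {
        return validationResult.configValues().stream()
                .collect(Collectors.toMap(ConfigValue::name, Function.identity()));
    }

    // Or fish out the ConfigValue for a single property with Java 8 streams.
    static Optional<ConfigValue> find(Config validationResult, String property) {
        return validationResult.configValues().stream()
                .filter(value -> property.equals(value.name()))
                .findFirst();
    }
}
```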
   
   I think an ideal solution here might be opinionated but flexible: we can provide special accommodations for idiomatic usage patterns in the Connect API (like attaching special meaning to thrown `ConfigException` instances, as is already done during [`ConfigDef` config validation](https://github.com/apache/kafka/blob/bbb2dc54a0f45bc5455f22a0671adde206dcfa29/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L592-L594), or how we [handle `RetriableException` instances specially for source tasks](https://github.com/apache/kafka/blob/bbb2dc54a0f45bc5455f22a0671adde206dcfa29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L306-L310)), but allow connector authors enough breathing room to surface as much information as possible in a single validation pass if they want to put in the extra work to make their connector's UX as smooth as possible.
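
   From the connector author's side, the `ConfigException` idiom might look something like this (purely a sketch; `snapshot.mode` is a made-up property, and whether the framework would actually catch and convert the exception this way is exactly the open question):

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

public abstract class OpinionatedSourceConnector extends SourceConnector {

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        String mode = connectorConfig.get("snapshot.mode");
        if (mode == null) {
            // If the framework attached special meaning to ConfigException (as ConfigDef
            // validation already does), this could be surfaced as an error on the named
            // property instead of failing the whole validation request.
            throw new ConfigException("snapshot.mode", null, "Must be set to determine exactly-once support");
        }
        return "snapshot".equals(mode) ? ExactlyOnceSupport.UNSUPPORTED : ExactlyOnceSupport.SUPPORTED;
    }
}
```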
   
   From an API design perspective, I think adding a `Config` argument to the 
new methods here certainly accomplishes both of these; I'm just stuck on a few 
implementation details that I'm not sure we can overcome.
   
   On this front:
   
   > Or am I asking the impossible (AbstractHerder#validateConnectorConfig is 
not the easiest piece of code to reason about)?
   
   One thing that may be useful to point out (forgive me if you're already aware) is that `Connector::validate` is expected to report errors without throwing exceptions, so right now there's no implicit contract that the validation control flow gets short-circuited if there are any errors in the connector config. As a result, we can also validate, for example, overridden Kafka client properties and whether they are permitted by the `ConnectorClientConfigOverridePolicy` configured on the worker, even if there are other errors in the connector config.
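
   In other words, errors are accumulated on the individual `ConfigValue` objects rather than thrown, roughly along these lines (a sketch only; this helper class isn't real framework code):

```java
import java.util.Map;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;

class ValidateContractSketch {

    // Connector::validate returns a Config whose ConfigValues carry error messages;
    // nothing is thrown just because some property is invalid.
    static boolean hasErrors(Config validationResult) {
        return validationResult.configValues().stream()
                .anyMatch(value -> !value.errorMessages().isEmpty());
    }

    // Because nothing short-circuits, later checks (e.g. the worker's
    // ConnectorClientConfigOverridePolicy) can still run and append their own errors.
    static void addError(Map<String, ConfigValue> validatedValues, String property, String message) {
        validatedValues.computeIfAbsent(property, ConfigValue::new)
                .addErrorMessage(message);
    }
}
```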
   
   @mimaison 
   
   Fair enough! There's definitely some potential for out-of-scope work here 
(like following up on KIP-419 or developing alternatives to it), but at the 
very least we can count the newly-introduced APIs as in-scope and should 
provide these types of guarantees for them.
   
   **@ both**
   
   It may be easier to go over the details of the precise behavior we want here 
on the [preflight validation PR](https://github.com/apache/kafka/pull/11776). 
If we prefer a high-level discussion it's probably best to keep things here, 
but since we're getting fairly granular now, it might help to see what's been 
drafted so far implementation-wise.



