Hi Greg,
The "long-term inconsistency" we have observed is not a connector with no 
tasks at all. Instead, all the previously running tasks remain in a running 
state but with a previous config.
If I'm understanding the original bug report correctly, the scope of the 
problem was thought to only affect the following built-in connectors: 
FileStreamSourceConnector and FileStreamSinkConnector (see 
https://issues.apache.org/jira/browse/KAFKA-9228?focusedCommentId=16993990&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16993990).
However, we are also seeing this issue with a number of 3rd-party connectors 
not provided as part of the Kafka project, e.g.:
- Confluent's kafka-connect-s3 connector 
  (https://github.com/confluentinc/kafka-connect-storage-cloud)
- Aerospike's connector 
  (https://docs.aerospike.com/connect/kafka/to-asdb/from-kafka-to-asdb-overview)
We're wondering if it would be possible to re-evaluate the impact of this bug 
and look at addressing it either with the pre-existing PR 
(https://github.com/apache/kafka/pull/7823) or a new one.
Thanks!

On Friday, February 3, 2023, 04:29:38 PM EST, Greg Harris 
<greg.har...@aiven.io.invalid> wrote:
 
 Frank,

I realized I didn't respond to the title directly, sorry about that.
The reason that `ClusterConfigState::inconsistentConnectors` is not used,
is that the effect of an inconsistent connector is applied via
`ClusterConfigState::tasks`.
If a connector is inconsistent, then the tasks method will not return any
task configurations.
This will cause the outer logic to believe that there are 0 tasks defined,
and so any connector which does request a task reconfiguration will write
any task configs that are generated by the connector.
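
As a rough illustration of the paragraph above, here is a minimal sketch of
that decision. It is not the actual DistributedHerder code: the class and
method names are hypothetical, and task configs are simplified to plain maps.
It only shows why an empty task list (the view returned for an inconsistent
connector) always leads to a write, while identical configs do not.

```java
import java.util.List;
import java.util.Map;

public class TaskConfigWriteSketch {

    // Hypothetical stand-in for the herder's check: write the generated task
    // configs if the task count changed or if any config differs.
    static boolean shouldWriteTaskConfigs(List<Map<String, String>> currentTaskConfigs,
                                          List<Map<String, String>> generatedTaskConfigs) {
        if (currentTaskConfigs.size() != generatedTaskConfigs.size()) {
            return true; // number of tasks changed
        }
        return !currentTaskConfigs.equals(generatedTaskConfigs); // contents changed
    }

    public static void main(String[] args) {
        List<Map<String, String>> generated =
                List.of(Map.of("topics", "a"), Map.of("topics", "b"));

        // Inconsistent connector: the stored view has no task configs, so the
        // count differs (0 vs 2) and the generated configs are written.
        System.out.println(shouldWriteTaskConfigs(List.of(), generated));

        // Consistent connector whose generated configs match the stored ones:
        // nothing is written.
        System.out.println(shouldWriteTaskConfigs(generated, generated));
    }
}
```

Under these assumptions the first call reports that a write is needed and the
second reports that it is not.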

And a task reconfiguration occurs on each connector start, and each time a
connector requests a reconfiguration.
If a reconfiguration failed (which is how the connector became
inconsistent) then it will be retried.
If the worker that had the reconfiguration fail then leaves the cluster,
then the rebalance algorithm will start the connector somewhere else, which
will trigger another task reconfiguration.

Given the above, there does not appear to be any way to have long-term
inconsistent connectors without a reconfiguration consistently failing.
If you are seeing the symptoms of long-term inconsistency (no tasks at all
for a connector) then I'd be very interested in a reproduction case for
that.

Thanks!
Greg Harris

On Fri, Feb 3, 2023 at 1:05 PM Greg Harris <greg.har...@aiven.io> wrote:

> Frank,
>
> The inconsistentConnectors method is related to an extremely specific
> inconsistency that can happen when a worker writes some task
> configurations, and then disconnects without writing a following "commit
> tasks record" to the config topic.
> This is a hold-over from the early days of Connect, before Kafka's
> transaction support, and is mostly an implementation detail.
> See the `KafkaConfigBackingStore::putTaskConfigs` and
> `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code.
> It is not expected that this method is in regular use, and is primarily
> for diagnostic purposes.
>
> What the Strimzi issue seems to describe (and probably the issue you are
> facing) occurs at a higher level, when a worker is deciding whether to
> write new task configs at all.
> The relevant code is here:
> https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
> In that snippet, new task configs generated by the connector are only
> written to the config topic if they differ from the current contents of the
> config topic. And this comparison is done on the post-transformation
> configurations, after ConfigProviders have been resolved.
> And critical for this bug, that resolution is done twice in quick
> succession, when the old and new configuration could evaluate to the same
> final result.
> The code snippet also shows why your workaround works: the other condition
> for writing all of the task configs to the config topic is that the number
> of configurations has changed.
>
> I believe this bug is captured in
> https://issues.apache.org/jira/browse/KAFKA-9228 but it has not
> progressed in some time.
> There is a potentially lower-impact workaround that involves adding a
> nonce to your connector configuration that changes each time you apply a
> new configuration to the connector, which most connectors will pass
> directly to their tasks.
> But this unfortunately does not work in general, as connectors could
> exclude the nonce when generating task configurations.
>
> I hope this gives some more insight to the behavior you're seeing.
>
> Thanks,
> Greg Harris
>
> On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes
> <frankgrime...@yahoo.com.invalid> wrote:
>
>> Hi, we're investigating an issue where occasionally config changes don't
>> propagate to connectors/tasks.
>>
>> When this occurs, the only way to ensure that the configuration takes
>> effect is to resize the number of tasks back down to 1 and then resize back
>> up to the original number of tasks.
>> In searching for others who have been bitten by this scenario we found
>> the following thread on the Strimzi discussions pages:
>> https://github.com/strimzi/strimzi-kafka-operator/discussions/7738
>> Both the symptoms and workaround described there match what we've
>> seen.
>> We've been doing some digging into the Kafka Connect codebase to
>> better understand how config.storage.topic is consumed.
>> In the interest of brevity I won't repeat that entire thread of
>> discussion here.
>> However, I was wondering if anyone knows whether the JavaDoc suggestion
>> on ClusterConfigState.inconsistentConnectors() is actually implemented in
>> the clustered Worker code, i.e. "When a worker detects a connector in this
>> state, it should request that the connector regenerate its task
>> configurations."
>> I ask because I couldn't find any references to that API call anywhere
>> but in the KafkaConfigBackingStoreTest unit test cases.
>> Thanks!
>>
>
  
