[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194806#comment-16194806 ]

Randall Hauch commented on KAFKA-4794:
--------------------------------------

This KIP didn't get enough votes to make it into the 1.0 release, so I think 
it's worth taking the time to update and improve the Motivation section of the 
KIP to explain why this is important.

I have another use case that requires this capability, but it doesn't require 
the connector to do any kind of monitoring or take any actions. I'll first 
describe it in general terms and then give a concrete example.

In many cases, it is possible to determine the tasks based only on the external 
system. In other cases, however, the number of tasks and their configurations 
may also depend on the offsets persisted by previously run tasks. After all, 
offsets are an excellent and natural way for tasks to record their progress.

Consider a source connector that needs to perform a set of actions in parallel. 
Ideally the connector could use a separate task for each of these actions and 
then have Connect manage and distribute these tasks for the connector. The 
tasks cannot coordinate directly, but they can record their progress in 
offsets (as they generate records) and can request a task reconfiguration. 
The problem is that upon reconfiguration the connector can't read the offsets 
to learn the state of those tasks, and therefore can't configure the new tasks 
appropriately.

A concrete example of this use case involves recent work in the Debezium 
community. The Debezium MySQL CDC connector can be configured with a set of 
tables to be captured. When the connector starts up, it performs a consistent 
snapshot of these tables and then reads the MySQL binlog to capture the changes 
committed to those tables after the snapshot was started. Now suppose the 
connector is restarted with a different table filter, so that several existing 
tables that were not captured before now need to be captured as well. The 
developers are changing the connector to detect this case, asynchronously 
snapshot those additional tables, and then read the binlog to capture their 
subsequent changes. Snapshots of very large tables can take a long time (e.g., 
days), so the connector shouldn't stop reading the binlog for the original set 
of tables in the meantime. This could be implemented as two instances of the 
existing task, each doing the same thing but on a different set of tables (the 
original set and the newly added tables). At some point, however, both tasks 
approach the current head of the binlog, and from then on the connector needs 
only one of them. The snapshotting task could request that the tasks be 
reconfigured, and Connect would stop both tasks and ask the Connector 
implementation to compute the configs for the new task(s). *If the Connector 
had access to the offsets, it could detect that the snapshot task had completed 
and would know enough to return the configuration for only the main task*, 
which would then do a bit of reconciliation before continuing as normal.
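A minimal, self-contained Java sketch of the decision described above, assuming the connector could read offsets when computing task configs. To keep it runnable without Kafka Connect on the classpath, the offset reader is modeled as a plain `Function` from a source partition to its last committed offset (or `null`). The class name, the partition key `task`, the offset field `snapshot_completed`, and the config keys `role`/`tables` are all hypothetical and do not reflect Debezium's actual offset format. It also assumes that once the snapshot is done, the single remaining task takes over the added tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: use committed offsets to decide whether the separate
// snapshot task is still needed when task configs are recomputed.
public class SnapshotAwareTaskPlanner {

    // Stand-in for an offset reader: source partition -> last committed offset, or null.
    private final Function<Map<String, String>, Map<String, Object>> offsetReader;

    public SnapshotAwareTaskPlanner(Function<Map<String, String>, Map<String, Object>> offsetReader) {
        this.offsetReader = offsetReader;
    }

    // Hypothetical source partition under which the snapshot task records progress.
    static Map<String, String> snapshotPartition() {
        return Map.of("task", "snapshot");
    }

    public List<Map<String, String>> taskConfigs(List<String> originalTables, List<String> addedTables) {
        Map<String, Object> offset = offsetReader.apply(snapshotPartition());
        // Hypothetical offset field the snapshot task writes once it has caught up.
        boolean snapshotDone = offset != null && Boolean.TRUE.equals(offset.get("snapshot_completed"));

        List<Map<String, String>> configs = new ArrayList<>();
        Map<String, String> main = new HashMap<>();
        main.put("role", "main");
        if (snapshotDone || addedTables.isEmpty()) {
            // Only one task is needed from now on; it covers every table.
            List<String> all = new ArrayList<>(originalTables);
            all.addAll(addedTables);
            main.put("tables", String.join(",", all));
            configs.add(main);
        } else {
            // Snapshot still running: the main task keeps the original tables and a
            // second task snapshots and then streams the newly added tables.
            main.put("tables", String.join(",", originalTables));
            configs.add(main);
            Map<String, String> snapshot = new HashMap<>();
            snapshot.put("role", "snapshot");
            snapshot.put("tables", String.join(",", addedTables));
            configs.add(snapshot);
        }
        return configs;
    }
}
```

With tables `db.a` (original) and `db.b` (added), this returns two configs while no offset exists for the snapshot partition, and a single config covering `db.a,db.b` once the snapshot task has written `snapshot_completed=true`.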

> Add access to OffsetStorageReader from SourceConnector
> ------------------------------------------------------
>
>                 Key: KAFKA-4794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4794
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Florian Hussonnois
>            Priority: Minor
>              Labels: needs-kip
>             Fix For: 1.1.0
>
>
> Currently the offset storage is only accessible from SourceTask, so that tasks 
> can be properly initialized after a restart, a crash, or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progress of 
> each task, it would be helpful to have access to an OffsetStorageReader 
> instance from the SourceConnector.
> That way, the connector could run a background thread that requests a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on shared storage to detect new files and 
> stream them into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a subset of the files to each task.
> Each task stores source offsets for the last record sent. The source offset 
> data are:
>  - the file size
>  - the byte offset of the record
>  - the size of the record in bytes
> Tasks become idle when their assigned files are completed (i.e., when 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> The connector should therefore be able to track the offsets for each assigned 
> file. When all tasks have finished, the connector can stop them or assign new 
> files by requesting a task reconfiguration.
> Another advantage of monitoring source offsets from the connector is the 
> ability to detect slow or failed tasks and, if necessary, restart all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,
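The bookkeeping in the file-scanning use case quoted above can be sketched in self-contained Java. This is a hypothetical illustration, not the reporter's implementation: `FileProgressTracker`, `assignFiles`, and `allFilesComplete` are invented names, the offset fields are named after the issue's completion formula, and offsets are passed in as a plain map keyed by file path, whereas a real connector would need to obtain them through the offset reader that this issue asks for. When `allFilesComplete` returns true, the background thread could call `ConnectorContext.requestTaskReconfiguration()` to stop the idle tasks or assign new files. The round-robin split is one possible choice; Connect's `ConnectorUtils.groupPartitions` serves a similar purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the connector-side bookkeeping for the directory scanner.
public class FileProgressTracker {

    // The three values each task stores in its source offset for a file.
    public static final class FileOffset {
        final long fileBytesSize;
        final long recordBytesOffset;
        final long recordBytesSize;

        public FileOffset(long fileBytesSize, long recordBytesOffset, long recordBytesSize) {
            this.fileBytesSize = fileBytesSize;
            this.recordBytesOffset = recordBytesOffset;
            this.recordBytesSize = recordBytesSize;
        }

        // A file is done when the last record sent ends exactly at the end of the file.
        public boolean isComplete() {
            return recordBytesOffset + recordBytesSize == fileBytesSize;
        }
    }

    // Round-robin split of detected files into at most maxTasks groups, one per task.
    public static List<List<String>> assignFiles(List<String> files, int maxTasks) {
        List<List<String>> groups = new ArrayList<>();
        int numGroups = Math.min(files.size(), maxTasks);
        for (int i = 0; i < numGroups; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; numGroups > 0 && i < files.size(); i++) {
            groups.get(i % numGroups).add(files.get(i));
        }
        return groups;
    }

    // True when every assigned file has a committed offset reaching the end of the file.
    public static boolean allFilesComplete(List<String> assigned, Map<String, FileOffset> offsetsByFile) {
        for (String file : assigned) {
            FileOffset offset = offsetsByFile.get(file);
            if (offset == null || !offset.isComplete()) {
                return false;
            }
        }
        return true;
    }
}
```

For example, `assignFiles(List.of("a", "b", "c"), 2)` yields `[[a, c], [b]]`, and a file whose offset is `(100, 90, 10)` counts as complete because 90 + 10 = 100.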



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
