Florian Hussonnois created KAFKA-4794:
-----------------------------------------

             Summary: Add access to OffsetStorageReader from SourceConnector
                 Key: KAFKA-4794
                 URL: https://issues.apache.org/jira/browse/KAFKA-4794
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
            Reporter: Florian Hussonnois
            Priority: Minor


Currently, offset storage is only accessible from SourceTask, so that tasks can
be properly initialized after a restart, a crash or a reconfiguration request.

To implement more complex connectors that need to track the progress of each
task, it would be helpful to have access to an OffsetStorageReader instance from
the SourceConnector.

That way, the connector could run a background thread that requests a task
reconfiguration based on source offsets.
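
A minimal sketch of such a background thread, using only the JDK. The offset
check and the reconfiguration call are injected, so `OffsetMonitor` and its
parameters are hypothetical names; the `Runnable` stands in for
`ConnectorContext.requestTaskReconfiguration()`, and the check would read source
offsets through the proposed connector-side reader.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical connector-side monitor: periodically runs a check and, when it
// returns true, asks the framework for a task reconfiguration.
class OffsetMonitor implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "offset-monitor");
                t.setDaemon(true); // do not keep the JVM alive on shutdown
                return t;
            });

    // needsReconfiguration: assumed check, e.g. one that inspects source offsets
    // requestReconfiguration: stands in for ConnectorContext.requestTaskReconfiguration()
    OffsetMonitor(BooleanSupplier needsReconfiguration,
                  Runnable requestReconfiguration,
                  long periodMs) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                if (needsReconfiguration.getAsBoolean()) {
                    requestReconfiguration.run();
                }
            } catch (RuntimeException e) {
                // Swallow and log: a thrown exception would cancel all later runs.
                e.printStackTrace();
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The try/catch inside the scheduled task matters: with `scheduleAtFixedRate`, an
execution that throws suppresses every subsequent run, which would silently stop
the monitoring.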

This improvement proposal comes from a customer project that needs to
periodically scan directories on shared storage to detect new files and stream
them into Kafka.

The connector implementation is pretty straightforward.

The connector uses a background thread to periodically scan directories. When
new input files are detected, a task reconfiguration is requested. The
connector then assigns a subset of the files to each task.
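
One way the connector could split the detected files across tasks is a simple
round-robin, sketched below. It returns one config map per task, matching the
shape of `Connector.taskConfigs(int maxTasks)`; the `files` config key and the
comma-separated encoding are assumptions for illustration (they assume paths
contain no commas).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FileAssignment {
    // Hypothetical config key under which each task receives its files.
    static final String FILES_CONFIG = "files";

    // One config map per task; assumes maxTasks >= 1.
    static List<Map<String, String>> taskConfigs(List<String> files, int maxTasks) {
        int numTasks = Math.min(files.size(), maxTasks); // never create idle tasks
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            groups.add(new ArrayList<>());
        }
        // Round-robin: file i goes to task i % numTasks.
        for (int i = 0; i < files.size(); i++) {
            groups.get(i % numTasks).add(files.get(i));
        }
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> group : groups) {
            Map<String, String> config = new HashMap<>();
            config.put(FILES_CONFIG, String.join(",", group));
            configs.add(config);
        }
        return configs;
    }
}
```

For example, three files and `maxTasks = 2` yield two configs, `files=a,c` and
`files=b`; an empty file list yields no task configs at all.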

Each task stores source offsets for the last record sent. The source offset
data are:
 - the file size
 - the record's byte offset
 - the record's byte size
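
As a sketch, a task could encode this as a source partition (which file) and a
source offset (the three values above), the two maps a `SourceRecord` carries.
The key names are assumptions: `file` is hypothetical, and the other three reuse
the field names from the completion condition in this issue.

```java
import java.util.HashMap;
import java.util.Map;

class FileOffsets {
    // Hypothetical key names for the partition and offset maps.
    static final String FILE = "file";
    static final String FILE_BYTES_SIZE = "fileBytesSize";
    static final String RECORD_BYTES_OFFSETS = "recordBytesOffsets";
    static final String RECORD_BYTES_SIZE = "recordBytesSize";

    // Source partition: identifies the file the offset belongs to.
    static Map<String, Object> partition(String path) {
        Map<String, Object> p = new HashMap<>();
        p.put(FILE, path);
        return p;
    }

    // Source offset describing the last record sent from that file.
    static Map<String, Object> offset(long fileBytesSize,
                                      long recordBytesOffsets,
                                      long recordBytesSize) {
        Map<String, Object> o = new HashMap<>();
        o.put(FILE_BYTES_SIZE, fileBytesSize);
        o.put(RECORD_BYTES_OFFSETS, recordBytesOffsets);
        o.put(RECORD_BYTES_SIZE, recordBytesSize);
        return o;
    }
}
```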

Tasks become idle once their assigned files are completed (i.e.
recordBytesOffsets + recordBytesSize = fileBytesSize).

The connector should therefore be able to track offsets for each assigned file.
When all tasks have finished, the connector can stop them or assign new files
by requesting a task reconfiguration.
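
A sketch of the connector-side check, assuming a reader with a method shaped
like `OffsetStorageReader.offset(partition)`, which returns `null` when nothing
has been stored yet. `OffsetReader` here is a hypothetical stand-in, not the
Kafka interface, and the map keys are the same assumed names as above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CompletionTracker {
    // Hypothetical stand-in for a connector-side offset reader.
    interface OffsetReader {
        Map<String, Object> offset(Map<String, Object> partition);
    }

    // A file is complete once recordBytesOffsets + recordBytesSize == fileBytesSize.
    static boolean isComplete(Map<String, Object> offset) {
        if (offset == null) {
            return false; // no offset committed for this file yet
        }
        // Read as Number: stored values may come back as Integer or Long.
        long fileSize = ((Number) offset.get("fileBytesSize")).longValue();
        long recOffset = ((Number) offset.get("recordBytesOffsets")).longValue();
        long recSize = ((Number) offset.get("recordBytesSize")).longValue();
        return recOffset + recSize == fileSize;
    }

    // True when every assigned file is complete, i.e. the connector may stop
    // the tasks or request a reconfiguration to hand out new files.
    static boolean allComplete(List<String> assignedFiles, OffsetReader reader) {
        for (String file : assignedFiles) {
            Map<String, Object> partition = new HashMap<>();
            partition.put("file", file);
            if (!isComplete(reader.offset(partition))) {
                return false;
            }
        }
        return true;
    }
}
```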

Another advantage of monitoring source offsets from the connector is the
ability to detect slow or failed tasks and, if necessary, restart all tasks.
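
A simple way to flag a slow or failed task is to notice that a file's offset
has stopped advancing for longer than some timeout. The sketch below assumes
the connector samples `recordBytesOffsets` per file on each monitoring pass;
`StallDetector` and the timeout are hypothetical, and timestamps are passed in
so the logic is easy to test.

```java
import java.util.HashMap;
import java.util.Map;

class StallDetector {
    private final long stallTimeoutMs;
    // Last observed recordBytesOffsets per file, and when it last changed.
    private final Map<String, Long> lastOffset = new HashMap<>();
    private final Map<String, Long> lastProgressMs = new HashMap<>();

    StallDetector(long stallTimeoutMs) {
        this.stallTimeoutMs = stallTimeoutMs;
    }

    // Records an observation; returns true if the file's offset has not moved
    // for longer than stallTimeoutMs. Callers should skip completed files,
    // whose offsets legitimately stop advancing.
    boolean observe(String file, long recordBytesOffsets, long nowMs) {
        Long previous = lastOffset.put(file, recordBytesOffsets);
        if (previous == null || previous != recordBytesOffsets) {
            lastProgressMs.put(file, nowMs); // progress made (or first sample)
            return false;
        }
        return nowMs - lastProgressMs.get(file) > stallTimeoutMs;
    }
}
```

With a 1000 ms timeout, observing the same offset at 0 ms, 500 ms and 1500 ms
flags the file only on the third call; a later, larger offset resets the timer.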

If you think this improvement is OK, I can work on a pull request.

Thanks,



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
