[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323726#comment-15323726
 ] 

Randall Hauch commented on KAFKA-3821:
--------------------------------------

[~ewencp], I'm not sure a {{data}} field is that much different from storing 
information in the offsets. I'd also worry that if the data isn't stored in the 
same place as the offsets, there's a chance of failing after the offsets are 
flushed but before the data is flushed (or vice versa).

However, I have been thinking of something that may very well be, well, out in 
left field. Maybe connectors need some kind of _history events_ that they can 
produce and easily consume. We originally talked some time ago about possibly 
adding persisted state (e.g., a key-value store), but I think that's going in 
the wrong direction and that instead an _event stream_ is far better and more 
powerful because it gives you an ordered history. Naturally, these history 
events would be persisted to a Kafka topic, but it'd be one that's primarily 
for the connector (or any monitors). The semantics are very specific, though:

* The connector registers an _event handler_ that is called for every event, 
with one method that handles the events and a few other methods that are called 
at various lifecycle state transitions.
* Upon startup, the framework reads the topic from the very beginning, 
replaying all events and forwarding them to the _event handler_. When complete, 
the event handler's _end replay_ method is called to signal to it that all 
events have been replayed.
* The connector can write to the topic at any time, and all events are 
synchronously written to Kafka (this is thus a lower-volume topic). The 
connector's consumer sees these events after they've been read from Kafka. Much 
of the time the connector could probably ignore them (since it wrote them 
itself), but occasionally there might be an event it didn't create and that it 
wants/needs to respond to.
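
A minimal sketch of the semantics listed above, using an in-memory list as a 
stand-in for the Kafka topic. Every name here ({{HistoryEventHandler}}, 
{{InMemoryHistory}}, {{beginReplay}}, the {{replaying}} flag) is hypothetical 
and not part of Kafka Connect; the only pieces taken from the proposal are the 
single event-handling method, the lifecycle callbacks, and the _end replay_ 
signal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface; nothing like this exists in Kafka Connect today.
interface HistoryEventHandler {
    void beginReplay();                                            // lifecycle: replay is about to start
    void onEvent(long position, String event, boolean replaying);  // called for every event
    void endReplay();                                              // lifecycle: all stored events replayed
}

// In-memory stand-in for the history topic: an append-only, ordered log.
final class InMemoryHistory {
    private final List<String> log = new ArrayList<>();
    private HistoryEventHandler handler;

    // Replays every stored event from the very beginning, then signals end of replay.
    void start(HistoryEventHandler handler) {
        this.handler = handler;
        handler.beginReplay();
        for (int i = 0; i < log.size(); i++) {
            handler.onEvent(i, log.get(i), true);
        }
        handler.endReplay();
    }

    // Appends synchronously, then hands the event to the registered handler (if any),
    // so the connector also sees the events it wrote itself.
    long write(String event) {
        log.add(event);
        long position = log.size() - 1;
        if (handler != null) {
            handler.onEvent(position, event, false);
        }
        return position;
    }
}

// Example handler that just records what it was shown.
final class RecordingHandler implements HistoryEventHandler {
    final List<String> seen = new ArrayList<>();
    boolean replayComplete = false;

    @Override public void beginReplay() { replayComplete = false; }

    @Override public void onEvent(long position, String event, boolean replaying) {
        seen.add((replaying ? "replay:" : "live:") + position + ":" + event);
    }

    @Override public void endReplay() { replayComplete = true; }
}

class HistoryDemo {
    public static void main(String[] args) {
        InMemoryHistory history = new InMemoryHistory();
        history.write("snapshot-started");      // written by an earlier run
        history.write("snapshot-completed");

        RecordingHandler handler = new RecordingHandler();
        history.start(handler);                 // simulated restart: replay from the beginning
        history.write("ad-hoc-snapshot-requested");
        System.out.println(handler.seen);
    }
}
```

A real implementation would replace the list with a producer and consumer on a 
dedicated topic; the sketch only shows the order in which the handler would be 
called.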

I could see a couple of uses for this:

# Debezium's MySQL connector records all DDL statements (with offsets) it reads 
from the MySQL binlog so that, when the connector starts back up and is given 
an offset from which it is to start, it can replay the DDL statements until it 
reaches but does not pass the designated offset to reconstruct an in-memory 
model of the database schemas. Currently we do this with a separate thread, but 
we've had to sidestep Kafka Connect and handle this ourselves because we have 
to have full control over producing, flushing, and consuming. And our history 
is flushed before the offsets for normal records might be flushed, so we know 
that we might have extra events in our history (and have to ignore them) should 
something go wrong and DDL gets recorded without the associated offsets.
# Record occasional state changes, such as "starting a snapshot", "completing a 
snapshot", etc.
# Enable a management tool to submit events (or rather, "requests") to be written 
to the topic. The connector would receive them and can react accordingly, all 
without having to stop the connector, change the configuration, restart the 
connector, wait for something to complete, and change the configuration back. 
An example might be to perform an ad hoc snapshot of 3 specific tables, or 
"flush snapshot of current DDL statements to history" (allowing the topic to be 
log compacted).
# Task coordination, where a connector with multiple tasks needs one "leader" 
task to do work while the others wait, and when completed all the tasks can 
continue. (This might be contrived.)
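
To make the first use case concrete, here is a small sketch of replaying 
recorded DDL up to, but not past, a restart offset. It is not Debezium's actual 
code: {{DdlEvent}} and {{SchemaHistoryReplay}} are hypothetical names, the 
offset is simplified to a single {{long}} position, and the history is assumed 
to be ordered by position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical history entry: the position at which a DDL statement was read, plus the statement.
final class DdlEvent {
    final long position;
    final String ddl;

    DdlEvent(long position, String ddl) {
        this.position = position;
        this.ddl = ddl;
    }
}

final class SchemaHistoryReplay {
    // Returns, in order, the DDL statements at or before restartPosition.
    // Entries past that position were flushed to the history before the matching
    // offsets were committed, so they are ignored (assumes history is ordered by position).
    static List<String> ddlUpTo(List<DdlEvent> history, long restartPosition) {
        List<String> toApply = new ArrayList<>();
        for (DdlEvent event : history) {
            if (event.position > restartPosition) {
                break;
            }
            toApply.add(event.ddl);
        }
        return toApply;
    }
}

class SchemaHistoryReplayDemo {
    public static void main(String[] args) {
        List<DdlEvent> history = Arrays.asList(
            new DdlEvent(100, "CREATE TABLE a (id INT)"),
            new DdlEvent(250, "ALTER TABLE a ADD COLUMN name VARCHAR(32)"),
            new DdlEvent(400, "DROP TABLE a"));
        // Restarting at position 250: the DROP TABLE recorded at 400 is skipped.
        System.out.println(SchemaHistoryReplay.ddlUpTo(history, 250));
    }
}
```

The statements returned would then be applied to an empty in-memory schema 
model before the connector resumes reading from the restart offset.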

Now, I've not fleshed this out very far and welcome any thoughts or criticisms. 
My connector writing experience is limited: I've written one complicated 
connector and one relatively simple prototype connector, and I'm not sure 
whether this is a general need or something far too specific to my limited 
context.

This isn't too different from offset storage, except that you want to be able 
to replay the history upon startup. Perhaps there's a way of incorporating 
these ideas into offset storage, but I suspect that's not really a good idea.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-3821
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3821
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.1
>            Reporter: Randall Hauch
>            Assignee: Ewen Cheslack-Postava
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
