Hi all,

I am trying to write a custom SQL streaming source and I have quite a few
questions about how it is envisioned to be done.

My first attempt was to
extend org.apache.spark.sql.execution.streaming.Source. At first it looks
simple: the data source reports the last offset it has, and Spark asks for
data from the previous offset up to the one you reported. This leads to a
straightforward implementation: a background thread pumps data into an
internal buffer, getOffset reports the last offset in the buffer, and
getBatch converts a slice of the buffer into a DataFrame.
But during initialization from a checkpoint this workflow is turned upside
down: getBatch is called with an offset range the driver never reported it
had. So just for initialization I have to write a "fulfill range" function,
and the data source ends up implementing two modes of operation, pull and
push. This seems redundant and driven by the API design.
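To make this concrete, here is a stripped-down version of what I ended up
with (EventStoreSource, the tuple buffer and the toy schema are simplified
stand-ins for my real code, and the DataFrame construction is simplified as
well):

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Push-style skeleton: a background thread (not shown) appends to `buffer`,
// getOffset reports the high-water mark, getBatch slices the buffer.
class EventStoreSource(sqlContext: SQLContext) extends Source {

  private val buffer = ArrayBuffer[(Long, String)]()   // (offset, payload)

  override def schema: StructType = StructType(
    StructField("offset", LongType) :: StructField("value", StringType) :: Nil)

  override def getOffset: Option[Offset] = synchronized {
    if (buffer.isEmpty) None else Some(LongOffset(buffer.last._1))
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    val from = start.collect { case LongOffset(o) => o + 1 }.getOrElse(0L)
    val to   = end.asInstanceOf[LongOffset].offset
    // Fine while `buffer` still covers [from, to]; after a restart from a
    // checkpoint this range can refer to data the buffer has never seen,
    // which is where the second, pull-style code path becomes necessary.
    val slice = buffer.filter { case (o, _) => o >= from && o <= to }
    // Simplified: the real code also has to produce a streaming DataFrame.
    sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(slice))
      .toDF("offset", "value")
  }

  override def stop(): Unit = ()
}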

OK, there are the V2 streaming interfaces. Let's try ContinuousReader. A
minimal implementation gives me "java.lang.UnsupportedOperationException:
Data source eventstore2 does not support microbatch processing." Hmm, I
thought ContinuousReader was an alternative to MicroBatchReader. And if
micro-batch support is required anyway, why doesn't ContinuousReader inherit
from MicroBatchReader? It would help if the ContinuousReader docs spelled
out what is needed to make it actually work.
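For context, this is roughly how I start the query, spark-shell style (the
"eventstore2" format name is mine). My guess is that the default trigger
still plans a micro-batch and that continuous execution has to be requested
explicitly, but I could not confirm that from the documentation:

import org.apache.spark.sql.streaming.Trigger

val events = spark.readStream.format("eventstore2").load()

// Default trigger -- this is the call that throws the exception above:
events.writeStream.format("console").start()

// My guess at what is expected instead, i.e. asking for continuous
// execution explicitly; I could not find this spelled out anywhere:
events.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start()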

OK, let's try MicroBatchReader. It is better than Source because it provides
an explicit initialization call, setOffsetRange. But the next call to
implement is getEndOffset, and with the driver just initialized and not
having consumed any message yet, what should I return? There is no notion of
"not ready yet" in the API. Is the intention to finish all initialization in
setOffsetRange and block there until the first batch of data arrives? There
is no documentation in MicroBatchReader describing the start-up logic, and I
cannot find example implementations either. The only one I found is
RateSourceProviderV2, which does not deal with long-lasting initialization
and does not demonstrate where it should be done.
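Here is the skeleton I have so far, with the problematic spot marked in
getEndOffset (EventStoreOffset, EventStoreMicroBatchReader and lastSeen are
my names; the background consumer that updates lastSeen is not shown):

import java.util.Optional

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// My own JSON-serializable offset.
case class EventStoreOffset(position: Long) extends Offset {
  override def json(): String = position.toString
}

class EventStoreMicroBatchReader extends MicroBatchReader {

  // Updated by a background consumer thread (not shown).
  @volatile private var lastSeen: Long = 0L

  private var startOffset: EventStoreOffset = EventStoreOffset(0L)
  private var endOffset: EventStoreOffset = EventStoreOffset(0L)

  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    // On restart both ends come from the checkpoint; on a fresh start both are empty.
    startOffset = start.orElse(EventStoreOffset(0L)).asInstanceOf[EventStoreOffset]
    endOffset = end.orElse(EventStoreOffset(lastSeen)).asInstanceOf[EventStoreOffset]
  }

  override def getStartOffset(): Offset = startOffset

  // The question: nothing has been consumed yet, so all I can report is 0.
  override def getEndOffset(): Offset = endOffset

  override def deserializeOffset(json: String): Offset = EventStoreOffset(json.toLong)
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()

  override def readSchema(): StructType = StructType(
    StructField("offset", LongType) :: StructField("value", StringType) :: Nil)

  // See my question about factories below; sketched separately.
  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] =
    java.util.Collections.emptyList[DataReaderFactory[Row]]()
}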

Another question: how should this be implemented if I do not know my start
position at all, for example Kafka's notion of special offsets like
"earliest" and "latest"? If the topic is empty, I will not be able to
resolve the actual offset for a very long time, maybe weeks. My guess is
that Offset is opaque to Spark, so the source implementation can introduce
its own notion of special offsets like "from start", but it would be nice to
have that documented.
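What I have in mind, assuming the offset really is opaque, is something
like this (all names are mine):

import org.apache.spark.sql.sources.v2.reader.streaming.Offset

// Encode the special "from start" position in the JSON and only replace it
// with a concrete number once the first event is actually seen.
case class EventStorePosition(position: Option[Long]) extends Offset {
  override def json(): String = position.map(_.toString).getOrElse("earliest")
}

object EventStorePosition {
  def fromJson(json: String): EventStorePosition =
    if (json == "earliest") EventStorePosition(None)
    else EventStorePosition(Some(json.toLong))
}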

OK, I've implemented getEndOffset to return "0" until the driver actually
gets some data. The next question is about createDataReaderFactories. From
the comments I understand that this exists for partitioning purposes, so
Kafka, for example, could start multiple consumers in the same consumer
group and each consumer would be wrapped in a DataReader by a
DataReaderFactory. Does that mean all the connection and buffering business
is done in the DataReader instances? And that upon "commit", the
MicroBatchReader should inform its DataReaders that their buffers can be
truncated?
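In other words, is the intended split roughly the following? The
partitioning scheme and the fake payload below are made up; the real reader
would hold the connection and the buffer.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

// One factory per partition of the external store; the factory is serialized
// to an executor, where createDataReader() would open the real connection.
class EventStorePartitionFactory(partitionId: Int, from: Long, until: Long)
    extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] =
    new EventStorePartitionReader(partitionId, from, until)
}

class EventStorePartitionReader(partitionId: Int, from: Long, until: Long)
    extends DataReader[Row] {

  private var current = from - 1

  override def next(): Boolean = { current += 1; current < until }

  // Fake payload standing in for whatever the per-partition consumer buffered.
  override def get(): Row = Row(current, s"event-$current-from-partition-$partitionId")

  // This is where the connection would be released / the buffer truncated.
  override def close(): Unit = ()
}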

Also, terminology: what does "mix-in at least one of the plug-in
interfaces" mean? I see "mix-in" a lot in the interface descriptions and
cannot figure out what it means in this context. The concept of a mixin does
not exist in Java AFAIK, and my best guess is that the documentation is
hinting that it takes more than one interface to make a useful class. But
this only adds anxiety, because now I do not know which interfaces I need to
implement, and the only way to find out is to implement one, see if it
works, and keep adding interfaces until it starts working (as was the case
with ContinuousReader). And I have no clue what a "plug-in interface" is.
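For reference, my current reading of that phrase translates into a provider
like this (class names are mine; whether this is the intended pattern is
exactly what I am asking):

import java.util.Optional

import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
import org.apache.spark.sql.types.StructType

// DataSourceV2 looks like a pure marker, and each *Support interface added to
// the provider seems to be one more capability Spark can discover.
class EventStore2Provider extends DataSourceV2
    with DataSourceRegister
    with MicroBatchReadSupport {

  override def shortName(): String = "eventstore2"

  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceOptions): MicroBatchReader =
    new EventStoreMicroBatchReader   // from the sketch above
}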

Thanks,
Vadym.


-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified
