Hi all, I am trying to write a custom SQL streaming source, and I have quite a lot of questions about how it is envisioned to be done.
My first attempt was to extend org.apache.spark.sql.execution.streaming.Source. At first it looks simple: the data source reports the last offset it has, and Spark asks for the data from the previous offset up to the last one reported. This leads to a straightforward implementation: a background thread pumps data into an internal buffer, getOffset reports the last offset in the buffer, and getBatch converts a slice of the buffer into a DataFrame.

But during initialization from a checkpoint this workflow is turned upside down: getBatch is called with an offset range that the driver never reported having. So, just for initialization, I have to write a function that fulfills an arbitrary range. Now the data source has to implement two modes of operation, pull and push. This seems redundant and driven by the API design.

OK, there are the V2 streaming interfaces. Let's try ContinuousReader. A minimal implementation gives me "java.lang.UnsupportedOperationException: Data source eventstore2 does not support microbatch processing." Hmm, I thought ContinuousReader was an alternative to MicroBatchReader. Also, if MicroBatchReader is required, why doesn't ContinuousReader extend it? Perhaps the ContinuousReader documentation should say what else is needed to make it actually work.

OK, let's try MicroBatchReader. It is better than Source because it provides an explicit initialization call, setOffsetRange. But the next call to implement is getEndOffset, and with the driver just initialized and not having consumed any message yet, what should I return? There is no notion of "not ready yet" in the API. Is the intention to finish all initialization in setOffsetRange and block there until the first batch of data arrives? There is no documentation in MicroBatchReader describing the start-up logic, and I cannot find any example implementations either. The only implementation is RateSourceProviderV2, which does not deal with long-lasting initialization and does not demonstrate where it should be done.
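To make the push/pull split concrete, here is a minimal, Spark-free sketch of the buffered design described above. Everything in it (BufferedSourceModel, append, latestOffset, batch, commit and the refetch callback) is hypothetical and only mirrors the roles of Source.getOffset, Source.getBatch and Source.commit; ranges follow the (start, end] convention that Source.getBatch documents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.LongFunction;

/**
 * Hypothetical, Spark-free model of a buffered streaming source.
 * append() is the "push" path fed by a background thread; batch() serves
 * (start, end] and falls back to the "pull" path (refetch) for offsets it
 * never buffered, e.g. right after a restart from a checkpoint.
 */
public class BufferedSourceModel {
    // offset -> record; offsets are assumed to be dense longs
    private final TreeMap<Long, String> buffer = new TreeMap<>();
    // Hypothetical re-read from the upstream store for a single offset
    private final LongFunction<String> refetch;

    public BufferedSourceModel(LongFunction<String> refetch) {
        this.refetch = refetch;
    }

    /** Push path: called by the background thread as data arrives. */
    public synchronized void append(long offset, String record) {
        buffer.put(offset, record);
    }

    /** Plays the role of getOffset: last buffered offset, or -1 if nothing arrived yet. */
    public synchronized long latestOffset() {
        return buffer.isEmpty() ? -1L : buffer.lastKey();
    }

    /** Plays the role of getBatch: returns records for offsets in (start, end]. */
    public synchronized List<String> batch(long start, long end) {
        List<String> out = new ArrayList<>();
        for (long o = start + 1; o <= end; o++) {
            String r = buffer.get(o);
            if (r == null) {            // never buffered: pull it instead
                r = refetch.apply(o);
                buffer.put(o, r);
            }
            out.add(r);
        }
        return out;
    }

    /** Plays the role of commit: drop everything at or below end. */
    public synchronized void commit(long end) {
        buffer.headMap(end, true).clear();
    }

    public static void main(String[] args) {
        BufferedSourceModel src = new BufferedSourceModel(o -> "refetched-" + o);
        // Simulated restart: the driver asks for (1, 3] before anything was buffered.
        System.out.println(src.batch(1, 3));     // [refetched-2, refetched-3]
        src.append(4, "live-4");
        System.out.println(src.latestOffset());  // 4
        src.commit(3);
        System.out.println(src.batch(3, 4));     // [live-4]
    }
}
```

The refetch branch is the second ("pull") mode of operation: it only runs when the requested range is not already in the buffer, which in this model means right after recovery.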
Another question: how should it be implemented if I do not know my start position at all, for example with Kafka's notion of special offsets like "start"/"end" offset? If the topic is empty, I will not be able to resolve the actual offset for a very long time, maybe weeks. I can guess that Offset is opaque to Spark and that a source implementation can implement special offsets such as "from start", but it would be nice to have this documented.

OK, I've implemented getEndOffset to return "0" until the driver actually gets some data. The next question is about createDataReaderFactories. From the comments I understand that this is done for partitioning purposes. So Kafka, for example, could start multiple consumers in the same consumer group, and each consumer would be wrapped into a DataReader by a DataReaderFactory. Does this mean that all the connection and buffering business is done in the DataReader instances? And upon "commit", should the MicroBatchReader inform its DataReaders that their buffers can be truncated?

Also, terminology. What does "mix-in at least one of the plug-in interfaces" mean? I see "mix-in" a lot in the interface descriptions and cannot understand what it means in this context. The concept of a mixin does not exist in Java AFAIK, and my best guess is that the documentation hints that it takes more than one interface to make a useful class. But this only adds anxiety, because now I do not know which interfaces I need to implement, and the only way to find out is to implement one, see whether it works, and keep adding interfaces until it starts working (as was the case with ContinuousReader). And what a "plug-in interface" is, I have no clue.

Thanks,
Vadym
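A minimal sketch of the "special offset" idea, under the assumption (guessed in the post, not confirmed) that Spark treats offsets as opaque values whose serialized form the source controls, as with the json() string on Spark's Offset classes. LazyOffset and its methods are hypothetical, not Spark API: a symbolic start such as "earliest" stays unresolved while the store is empty and becomes a concrete position once data shows up, which is one way an end offset could keep echoing the start position instead of a made-up "0". Whether Spark then skips the batch when the end offset equals the start is an assumption to verify against the micro-batch execution code.

```java
import java.util.Locale;
import java.util.OptionalLong;

/**
 * Hypothetical offset that is either a concrete position or a symbolic
 * start ("earliest"/"latest") resolved lazily against the store's bounds.
 * Illustration only; not a Spark class.
 */
public final class LazyOffset {
    public enum Kind { EARLIEST, LATEST, CONCRETE }

    private final Kind kind;
    private final long value; // meaningful only for CONCRETE

    private LazyOffset(Kind kind, long value) {
        this.kind = kind;
        this.value = value;
    }

    public static LazyOffset earliest() { return new LazyOffset(Kind.EARLIEST, -1); }
    public static LazyOffset latest()   { return new LazyOffset(Kind.LATEST, -1); }
    public static LazyOffset at(long v) { return new LazyOffset(Kind.CONCRETE, v); }

    /**
     * Resolve against the store's current bounds. Empty bounds (the topic
     * has never held data) leave a symbolic offset unresolved.
     */
    public OptionalLong resolve(OptionalLong firstAvailable, OptionalLong lastAvailable) {
        switch (kind) {
            case CONCRETE: return OptionalLong.of(value);
            case EARLIEST: return firstAvailable;
            default:       return lastAvailable;
        }
    }

    /** Opaque string form the source could write into a checkpoint. */
    public String json() {
        return kind == Kind.CONCRETE
                ? Long.toString(value)
                : "\"" + kind.name().toLowerCase(Locale.ROOT) + "\"";
    }

    public static void main(String[] args) {
        LazyOffset start = LazyOffset.earliest();
        // Empty topic: nothing to resolve yet.
        System.out.println(start.resolve(OptionalLong.empty(), OptionalLong.empty()).isPresent()); // false
        // Data arrives later: the sentinel now resolves to a concrete position.
        System.out.println(start.resolve(OptionalLong.of(100), OptionalLong.of(250)).getAsLong()); // 100
        System.out.println(start.json()); // "earliest"
    }
}
```

Keeping the sentinel in the serialized form means a restart before any data arrived still sees "earliest" rather than a position that never existed.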