I don't know DataSourceV2 well, but I am reading around to try to help. I see the problem with the SparkSession API. Is there no other way to instantiate a DataSourceV2 and read the data from it?
Other thoughts:

- Maybe start from Splittable DoFn since it is a new translator?
- I wonder if the reason for this API is that the class name and options are what is shipped to workers, so the limited API makes serialization easy for them?
- As a total hack, you could serialize the Beam objects (maybe to portable protos) and pass that as a single "primitive type" option. (There is a rough sketch of what I mean at the bottom of this mail.)

You definitely need someone from Spark more than someone from Beam for this issue. At this point, I've read the scaladocs enough that I think I'd dig into Spark's code to see what is going on and whether there is a way that is more obviously right.

Kenn

On Tue, Dec 18, 2018 at 11:09 AM Etienne Chauchot <[email protected]> wrote:

> Hi everyone,
>
> Does anyone have comments on this question?
>
> Thanks
> Etienne
>
> On Friday, December 14, 2018 at 10:37 +0100, Etienne Chauchot wrote:
>
> Hi guys,
>
> I'm currently coding a POC of a new Spark runner based on structured streaming and the new DataSourceV2 API, and I have a question. Having found no pointers on the internet, I've asked the Spark community with no luck. If any of you have knowledge of the new Spark DataSourceV2 API, can you share your thoughts?
>
> Also, I did not mention it in the email, but I did not find any way to get a reference to the automatically created DataSourceV2 instance, so I cannot lazy-init the source either.
>
> Thanks
>
> Etienne
>
> -------- Forwarded message --------
> From: Etienne Chauchot <[email protected]>
> To: [email protected]
> Subject: [Apache Beam] Custom DataSourceV2 instantiation: parameter passing and Encoders
> Date: Tue, 11 Dec 2018 19:02:23 +0100
>
> Hi Spark guys,
>
> I'm Etienne Chauchot and I'm a committer on the Apache Beam project.
>
> We have what we call runners. They are pieces of software that translate pipelines written with the Beam API into pipelines that use a native execution engine API. Currently, the Spark runner uses the old RDD / DStream APIs. I'm writing a new runner that will use structured streaming (but not continuous processing, and no schema for now).
>
> I am just starting. I'm currently trying to map our sources to yours, targeting the new DataSourceV2 API. It maps pretty well to Beam sources, but I have a problem with the instantiation of the custom source. I searched for an answer on Stack Overflow and the user ML with no luck; I guess it is too specific a question:
>
> When visiting the Beam DAG I have access to Beam objects such as Source and Reader that I need to map to MicroBatchReader and InputPartitionReader. As far as I understand, a custom DataSourceV2 is instantiated automatically by Spark via sparkSession.readStream().format(providerClassName) or similar code. The problem is that I can only pass options of primitive types + String, so I cannot pass the Beam Source to the DataSourceV2.
> => Is there a way to do so?
>
> Also, I get as output a Dataset<Row>. The Row contains an instance of Beam WindowedValue<T>, where T is the type parameter of the Source. I do a map on the Dataset to transform it into a Dataset<WindowedValue<T>>. I have a question related to the Encoder:
> => How do I properly create an Encoder for the generic type WindowedValue<T> to use in the map?
>
> Here is the code:
> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>
> And more specifically:
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
>
> Thanks,
>
> Etienne
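P.S. To make the "single option" hack concrete, here is a rough, untested sketch of what I had in mind. It assumes the Beam Source you hold at translation time is java.io.Serializable (Beam's Source is), that "beam.source" is an option key we invent, and that DatasetMicroBatchReader is a hypothetical reader of yours that wraps the Beam source; DatasetSource is the provider class from your branch.

  // Rough sketch, not tested.
  import java.io.*;
  import java.util.Base64;

  // --- driver side, in the translator ---
  static String serializeToBase64(Serializable obj) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  Dataset<Row> rows =
      sparkSession
          .readStream()
          .format(DatasetSource.class.getCanonicalName())
          .option("beam.source", serializeToBase64(beamSource))
          .load();

  // --- inside the DataSourceV2 (Spark creates it reflectively with a no-arg
  // constructor, so the Beam source has to be rebuilt from the options) ---
  @Override
  public MicroBatchReader createMicroBatchReader(
      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
    String encoded =
        options.get("beam.source")
            .orElseThrow(() -> new IllegalStateException("beam.source option missing"));
    byte[] bytes = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      Source<?> beamSource = (Source<?>) in.readObject();
      return new DatasetMicroBatchReader(beamSource); // hypothetical reader wrapping the source
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException("Could not deserialize the Beam source", e);
    }
  }

Shipping portable protos instead of Java serialization would work the same way: anything you can turn into a String can ride along as an option.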

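P.P.S. On the Encoder question: I am not sure there is a "structured" answer without a schema, but a binary encoder might be enough for the POC. The following is a guess on my part, assuming the Row really carries the WindowedValue<T> as a single object column and T is a type parameter in scope (Encoders and MapFunction are org.apache.spark.sql.Encoders and org.apache.spark.api.java.function.MapFunction; WindowedValue is org.apache.beam.sdk.util.WindowedValue):

  // Binary (Kryo) encoder for the generic Beam type: Spark only sees one
  // opaque binary column, which seems consistent with "no schema for now".
  @SuppressWarnings("unchecked")
  Encoder<WindowedValue<T>> windowedValueEncoder =
      Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);

  Dataset<WindowedValue<T>> windowedValues =
      rows.map(
          (MapFunction<Row, WindowedValue<T>>) row -> (WindowedValue<T>) row.get(0),
          windowedValueEncoder);

Encoders.javaSerialization(...) is the same idea if Kryo has trouble with Beam classes.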