Hi guys,
I'm currently coding a POC for a new Spark runner based on Structured Streaming and the new DataSourceV2 API, and I have a question. Having found no pointers on the internet, I've asked the Spark community with no luck. If any of you have knowledge of the new Spark DataSourceV2 API, can you share your thoughts?
Also, I did not mention it in the email below, but I did not find any way to get a reference to the automatically created DataSourceV2 instance, so I cannot lazy-init the source either.
Thanks
Etienne
-------- Forwarded Message --------
From: Etienne Chauchot <echauc...@apache.org>
To: dev@spark.apache.org
Subject: [Apache Beam] Custom DataSourceV2 instantiation: parameter passing and Encoders
Date: Tue, 11 Dec 2018 19:02:23 +0100
Hi Spark guys,
I'm Etienne Chauchot and I'm a committer on the Apache Beam project.
We have what we call runners: pieces of software that translate pipelines written with the Beam API into pipelines that use a native execution engine's API. Currently, the Spark runner uses the old RDD / DStream APIs. I'm writing a new runner that will use Structured Streaming (but not continuous processing, and no schemas for now).
I am just starting. I'm currently trying to map our sources to yours, targeting the new DataSourceV2 API. It maps pretty well onto Beam sources, but I have a problem with the instantiation of the custom source. I searched for an answer on Stack Overflow and the user ML with no luck; I guess the question is too specific:
When visiting the Beam DAG, I have access to Beam objects such as Source and Reader that I need to map to MicroBatchReader and InputPartitionReader. As far as I understand, a custom DataSourceV2 is instantiated automatically by Spark via sparkSession.readStream().format(providerClassName) or similar code. The problem is that options can only carry primitive types and Strings, so I cannot pass the Beam Source to the DataSourceV2. => Is there a way to do so?
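For illustration, here is a minimal sketch of the only workaround I see (the option key "beamSource" and the SourceOption helper are made up for this sketch, not existing API): since Beam's Source is java.io.Serializable, encode it to a Base64 String so it fits through the String-only options map, and decode it back inside the custom DataSourceV2:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.Base64;

    public class SourceOption {
      // Encode any Serializable (e.g. a Beam Source) into a Base64 String so
      // it can travel through the String-only DataSourceV2 options map.
      public static String encode(Serializable source) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
          oos.writeObject(source);
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
      }

      // Decode it back inside the custom DataSourceV2, e.g. from the value
      // returned by DataSourceOptions.get("beamSource").
      public static Object decode(String base64) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(base64);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
          return ois.readObject();
        }
      }
    }

On the driver side it would be used like this (DatasetSource is the custom source from my POC, linked below):

    Dataset<Row> rows = sparkSession
        .readStream()
        .format(DatasetSource.class.getName())
        .option("beamSource", SourceOption.encode(beamSource))
        .load();

The obvious downsides are the Java-serialization cost and the loss of type safety, but it at least gets an arbitrary object across the String-only boundary. Is there a cleaner way?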

Also, the output I get is a Dataset<Row>, where each Row contains an instance of Beam's WindowedValue<T>, T being the type parameter of the Source. I do a map on the Dataset to transform it into a Dataset<WindowedValue<T>>. I have a question related to the Encoder: => how do I properly create an Encoder for the generic type WindowedValue<T> to use in the map?
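For reference, the best fallback I can see is sketched below (the method name toWindowedValues is made up, and it assumes the WindowedValue sits in column 0 of the Row, as described above): use a binary encoder such as Encoders.kryo, since Spark cannot derive a bean or tuple encoder for a generic type:

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    public class WindowedValueEncoding {
      // Binary (Kryo) encoder for the generic WindowedValue<T>. The double
      // cast is the usual trick to turn the raw class literal into a
      // Class<WindowedValue<T>>.
      @SuppressWarnings("unchecked")
      public static <T> Dataset<WindowedValue<T>> toWindowedValues(Dataset<Row> rows) {
        Encoder<WindowedValue<T>> encoder =
            Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);
        return rows.map(
            // Assumes the WindowedValue<T> is stored in column 0 of the Row.
            (MapFunction<Row, WindowedValue<T>>) row -> (WindowedValue<T>) row.get(0),
            encoder);
      }
    }

The drawback is that the values become opaque binary blobs, so Catalyst cannot inspect them. Is a binary encoder the intended approach here, or is there something better for generic types?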
Here is the code: https://github.com/apache/beam/tree/spark-runner_structured-streaming
And more specifically:
https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
Thanks,
Etienne