Etienne,

Have you looked at
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ?
It seems that's how Spark integrates Kafka with the DataFrame API.

Thanks,
Manu Zhang
On Dec 20, 2018, 5:31 PM +0800, Etienne Chauchot <[email protected]> wrote:
> @Manu, anyway the Spark DataSource (V1) instantiation mechanism is the same as 
> DataSource V2's, so the question remains even if we target V1 for stability reasons.
>
> Etienne
>
> On Wednesday, December 19, 2018 at 17:07 +0100, Etienne Chauchot wrote:
> > Yes, it is thanks to these Spark community meetings that I got Ryan's name. 
> > And indeed, when I saw the design sync meetings, I realized how recent the 
> > DataSourceV2 API is.
> > I think you are right, I should wait for it to be finished and in the 
> > meantime use V1.
> >
> > Etienne
> >
> > On Wednesday, December 19, 2018 at 23:27 +0800, Manu Zhang wrote:
> > > The Spark community has been holding a weekly sync meeting on 
> > > DataSourceV2 and sharing notes back to their dev list 
> > > https://lists.apache.org/[email protected]:lte=3M:DataSourceV2%20sync.
> > >  At this time, there are still some moving pieces on Spark's side. Is it 
> > > too early to target DataSourceV2?
> > >
> > > Thanks,
> > > Manu Zhang
> > > On Dec 19, 2018, 6:40 PM +0800, Etienne Chauchot <[email protected]> wrote:
> > > > Thanks, Kenn, for taking the time to take a look.
> > > >
> > > > On Tuesday, December 18, 2018 at 11:39 -0500, Kenneth Knowles wrote:
> > > > > I don't know DataSourceV2 well, but I am reading around to try to 
> > > > > help. I see the problem with the SparkSession API. Is there no other 
> > > > > way to instantiate a DataSourceV2 and read the data from it?
> > > > => No, I have not found one; this is exactly what I'm looking for :)
> > > > >
> > > > > Other thoughts:
> > > > >
> > > > >  - Maybe start from Splittable DoFn since it is a new translator?
> > > > => Yes, but I still need to translate BoundedSource and UnboundedSource 
> > > > for compatibility with IOs that have not migrated to SDF.
> > > >
> > > > >  - I wonder if the reason for this API is that the class name and 
> > > > > options are what is shipped to workers, so the limited API makes 
> > > > > serialization easy for them?
> > > > => Yes, that, and because the DataSource is the entry point of the Spark 
> > > > pipeline, it should not need to receive more than user input configuration, 
> > > > hence the String-only support. But we are not users, we are DAG translators, 
> > > > hence our need to pass more complex objects than Strings.
> > > >
> > > > >  - As a total hack, you could serialize the Beam objects (maybe to 
> > > > > portable protos) and pass that as a single "primitive type" option.
> > > > => Yes, sure, that could work. Another hack would be to use ASM or 
> > > > Byte Buddy to "enhance" Spark classes, but that is fragile and risky :)
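> > > >
> > > > A rough sketch of that option-based hack, in case it is useful (the 
> > > > "beam-source" key and the use of Beam's SerializableUtils plus 
> > > > java.util.Base64 are my assumptions, nothing Spark prescribes; "source" is 
> > > > the Beam Source being translated and "options" is the DataSourceOptions 
> > > > handed to the reader factory):
> > > >
> > > > // driver side: serialize the Beam Source into a Base64 String so that it
> > > > // fits into the String-only options map of the custom DataSourceV2
> > > > String encodedSource = Base64.getEncoder().encodeToString(
> > > >     SerializableUtils.serializeToByteArray(source));
> > > > Dataset<Row> rows = sparkSession.readStream()
> > > >     .format(DatasetSource.class.getCanonicalName())
> > > >     .option("beam-source", encodedSource) // hypothetical option key
> > > >     .load();
> > > >
> > > > // inside the DataSourceV2 implementation: decode the option back into a Source
> > > > Source<T> beamSource = (Source<T>) SerializableUtils.deserializeFromByteArray(
> > > >     Base64.getDecoder().decode(options.get("beam-source").get()), "Beam Source");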
> > > > >
> > > > > You definitely need someone from Spark more than someone from Beam 
> > > > > for this issue. At this point, I've read the scaladocs enough that I 
> > > > > think I'd dig into Spark's code to see what is going on and if there 
> > > > > is a way that is more obviously right.
> > > > => Yes, this is what I tried, but I got no answer on the public Spark MLs. 
> > > > Luckily, I asked Ryan Blue of the Spark community directly and he kindly 
> > > > answered. I'm digging into the Catalog and Spark plans to find a different 
> > > > instantiation mechanism.
> > > >
> > > > Etienne
> > > > >
> > > > > Kenn
> > > > >
> > > > > > On Tue, Dec 18, 2018 at 11:09 AM Etienne Chauchot 
> > > > > > <[email protected]> wrote:
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Does anyone have comments on this question?
> > > > > > >
> > > > > > > Thanks
> > > > > > > Etienne
> > > > > > >
> > > > > > > On Friday, December 14, 2018 at 10:37 +0100, Etienne Chauchot wrote:
> > > > > > > > Hi guys,
> > > > > > > > I'm currently coding a POC of a new Spark runner based on 
> > > > > > > > Structured Streaming and the new DataSourceV2 API, and I have a 
> > > > > > > > question. Having found no pointers on the internet, I asked the 
> > > > > > > > Spark community with no luck. If any of you have knowledge of the 
> > > > > > > > new Spark DataSourceV2 API, can you share your thoughts?
> > > > > > > >
> > > > > > > > Also, I did not mention it in the email below, but I did not find 
> > > > > > > > any way to get a reference to the automatically created DataSourceV2 
> > > > > > > > instance, so I cannot lazy-init the source either.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > > Etienne
> > > > > > > >
> > > > > > > > -------- Forwarded Message --------
> > > > > > > > From: Etienne Chauchot <[email protected]>
> > > > > > > > To: [email protected]
> > > > > > > > Subject: [Apache Beam] Custom DataSourceV2 instantiation: 
> > > > > > > > parameter passing and Encoders
> > > > > > > > Date: Tue, 11 Dec 2018 19:02:23 +0100
> > > > > > > >
> > > > > > > > Hi Spark guys,
> > > > > > > >
> > > > > > > > I'm Etienne Chauchot and I'm a committer on the Apache Beam 
> > > > > > > > project.
> > > > > > > >
> > > > > > > > We have what we call runners. They are pieces of software that 
> > > > > > > > translate pipelines written with the Beam API into pipelines that 
> > > > > > > > use the native execution engine API. Currently, the Spark runner 
> > > > > > > > uses the old RDD / DStream APIs.
> > > > > > > > I'm writing a new runner that will use Structured Streaming (but 
> > > > > > > > not continuous processing, and no schemas for now).
> > > > > > > >
> > > > > > > > I am just starting. I'm currently trying to map our sources to 
> > > > > > > > yours, targeting the new DataSourceV2 API. It maps pretty well to 
> > > > > > > > Beam sources, but I have a problem with the instantiation of the 
> > > > > > > > custom source.
> > > > > > > > I searched for an answer on Stack Overflow and on the user ML with 
> > > > > > > > no luck. I guess the question is too specific:
> > > > > > > >
> > > > > > > > When visiting the Beam DAG I have access to Beam objects such as 
> > > > > > > > Source and Reader that I need to map to MicroBatchReader and 
> > > > > > > > InputPartitionReader.
> > > > > > > > As far as I understand, a custom DataSourceV2 is instantiated 
> > > > > > > > automatically by Spark through 
> > > > > > > > sparkSession.readStream().format(providerClassName) or similar 
> > > > > > > > code. The problem is that I can only pass options of primitive 
> > > > > > > > types + String, so I cannot pass the Beam Source to the 
> > > > > > > > DataSourceV2.
> > > > > > > > => Is there a way to do so?
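> > > > > > > >
> > > > > > > > To make the constraint concrete, here is a bare-bones skeleton of 
> > > > > > > > such a provider as I understand the 2.x API (only a sketch; the 
> > > > > > > > "beam-source" option key is just an example):
> > > > > > > >
> > > > > > > > import java.util.Optional;
> > > > > > > > import org.apache.spark.sql.sources.v2.DataSourceOptions;
> > > > > > > > import org.apache.spark.sql.sources.v2.DataSourceV2;
> > > > > > > > import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
> > > > > > > > import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
> > > > > > > > import org.apache.spark.sql.types.StructType;
> > > > > > > >
> > > > > > > > // Spark instantiates this class reflectively through its no-arg
> > > > > > > > // constructor, so the only input it ever receives is the
> > > > > > > > // String-only options map.
> > > > > > > > public class DatasetSource implements DataSourceV2, MicroBatchReadSupport {
> > > > > > > >   @Override
> > > > > > > >   public MicroBatchReader createMicroBatchReader(
> > > > > > > >       Optional<StructType> schema, String checkpointLocation,
> > > > > > > >       DataSourceOptions options) {
> > > > > > > >     // options exposes only Strings: there is no hook to hand over
> > > > > > > >     // a live Beam Source object
> > > > > > > >     String serializedSource = options.get("beam-source").orElse(null);
> > > > > > > >     throw new UnsupportedOperationException(
> > > > > > > >         "rebuild the Beam Source from " + serializedSource + " here");
> > > > > > > >   }
> > > > > > > > }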
> > > > > > > >
> > > > > > > >
> > > > > > > > Also, as output I get a Dataset<Row>. The Row contains an 
> > > > > > > > instance of Beam's WindowedValue<T>, where T is the type parameter 
> > > > > > > > of the Source. I do a map on the Dataset to transform it into a 
> > > > > > > > Dataset<WindowedValue<T>>. I have a question related to the 
> > > > > > > > Encoder:
> > > > > > > > => How do I properly create an Encoder for the generic type 
> > > > > > > > WindowedValue<T> to use in the map?
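> > > > > > > >
> > > > > > > > The kind of thing I have in mind (a sketch only; it assumes a 
> > > > > > > > binary encoder is acceptable, i.e. Catalyst does not need to look 
> > > > > > > > inside the value, and that the WindowedValue sits at index 0 of 
> > > > > > > > the Row):
> > > > > > > >
> > > > > > > > // uses org.apache.spark.sql.Encoder / Encoders,
> > > > > > > > // org.apache.spark.api.java.function.MapFunction and Beam's WindowedValue
> > > > > > > > @SuppressWarnings("unchecked")
> > > > > > > > Encoder<WindowedValue<T>> encoder =
> > > > > > > >     Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);
> > > > > > > >
> > > > > > > > Dataset<WindowedValue<T>> windowedValues = rows.map(
> > > > > > > >     (MapFunction<Row, WindowedValue<T>>) row -> (WindowedValue<T>) row.get(0),
> > > > > > > >     encoder);
> > > > > > > >
> > > > > > > > Whether a kryo (or Encoders.javaSerialization) based encoder copes 
> > > > > > > > with every concrete payload of WindowedValue<T> is something I 
> > > > > > > > would still need to verify, so a cleaner way would be welcome.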
> > > > > > > >
> > > > > > > > Here is the code:
> > > > > > > > https://github.com/apache/beam/tree/spark-runner_structured-streaming
> > > > > > > >
> > > > > > > > And more specifically:
> > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Etienne