Hi Manu,
Yes, but the instantiation remains the same (automatic instantiation by the 
framework, with only Strings passed as arguments). I discussed this with the 
Spark team and they pointed me to the catalog API, but it is still under 
design. I think I'll go for serializing the Beam Source to a String, as Kenn 
suggested.
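
To make the idea concrete, here is a minimal sketch of the String round trip, 
assuming the Beam Source is java.io.Serializable and that plain Java 
serialization plus Base64 is acceptable. Kenn's alternative of portable protos 
is not shown. The class name SourceOptionCodec is hypothetical and does not 
exist in Beam or Spark.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

// Hypothetical helper: turns a Serializable object into a String that can be
// passed as a single String option, and back.
final class SourceOptionCodec {
  private SourceOptionCodec() {}

  // Driver side: Java-serialize the object, then Base64-encode the bytes.
  static String encode(Serializable source) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(source);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  // Data source side: Base64-decode the option value, then deserialize.
  // The caller casts the result to the expected source type.
  static Object decode(String option) throws IOException, ClassNotFoundException {
    byte[] raw = Base64.getDecoder().decode(option);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
      return in.readObject();
    }
  }
}
```

The encoded value would then go through the reader's String option, for 
example under a made-up key such as "beam-source", and the automatically 
instantiated data source would call decode on the option it receives.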
Etienne
On Friday, 21 December 2018 at 22:26 +0800, Manu Zhang wrote:
> Etienne,
> 
> 
> Have you looked at Source.scala?
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
> It seems that’s how Spark integrates Kafka with the DataFrame API.
> 
> 
> 
> Thanks,
> 
> Manu Zhang
> On Dec 20, 2018, 5:31 PM +0800, Etienne Chauchot <[email protected]>, 
> wrote:
> 
> > @Manu, anyway, the Spark DataSource (V1) instantiation mechanism is the 
> > same as for DataSource V2, so the question remains even
> > if we target V1 for stability reasons.
> > 
> > 
> > Etienne
> > 
> > 
> > On Wednesday, 19 December 2018 at 17:07 +0100, Etienne Chauchot wrote:
> > > Yes, it is thanks to these Spark community meetings that I got Ryan's 
> > > name. And, indeed, when I saw the
> > > design sync meetings, I realized how recent the DataSourceV2 API is.
> > > I think you are right: I should wait for it to be finished and use V1 
> > > in the meantime.
> > > 
> > > 
> > > Etienne
> > > 
> > > 
> > > On Wednesday, 19 December 2018 at 23:27 +0800, Manu Zhang wrote:
> > > > The Spark community has been holding a weekly sync meeting on 
> > > > DataSourceV2 and sharing notes back to their dev
> > > > list 
> > > > https://lists.apache.org/[email protected]:lte=3M:DataSourceV2%20sync.
> > > > At this time, there are
> > > > still some moving pieces on Spark’s side. Is it too early to target 
> > > > DataSourceV2?
> > > > 
> > > > 
> > > > Thanks,
> > > > 
> > > > Manu Zhang
> > > > On Dec 19, 2018, 6:40 PM +0800, Etienne Chauchot 
> > > > <[email protected]>, wrote:
> > > > 
> > > > > Thanks Kenn for taking the time to take a look
> > > > > 
> > > > > 
> > > > > On Tuesday, 18 December 2018 at 11:39 -0500, Kenneth Knowles wrote:
> > > > > > I don't know DataSourceV2 well, but I am reading around to try to 
> > > > > > help. I see the problem with the
> > > > > > SparkSession API. Is there no other way to instantiate a 
> > > > > > DataSourceV2 and read the data from it?
> > > > > 
> > > > > => No this is exactly what I'm looking for :)
> > > > > > 
> > > > > > 
> > > > > > Other thoughts:
> > > > > > 
> > > > > > 
> > > > > >  - Maybe start from Splittable DoFn since it is a new translator?
> > > > > > 
> > > > > > 
> > > > > 
> > > > > => Yes, but I still need to translate BoundedSource and 
> > > > > UnboundedSource for compatibility with IOs that have
> > > > > not migrated to SDF.
> > > > > 
> > > > > 
> > > > > >  - I wonder if the reason for this API is that the class name and 
> > > > > > options are what is shipped to workers, so
> > > > > > the limited API makes serialization easy for them?
> > > > > > 
> > > > > 
> > > > > => Yes, that and because DataSource is the entry point of the spark 
> > > > > pipeline so it should not need to receive
> > > > > more than user input conf, hence the String only support. But we are 
> > > > > not users but DAG translators hence our
> > > > > need to pass more complex objects than Strings.
> > > > > 
> > > > > 
> > > > > > 
> > > > > >  - As a total hack, you could serialize the Beam objects (maybe to 
> > > > > > portable protos) and pass that as a
> > > > > > single "primitive type" option.
> > > > > > 
> > > > > > 
> > > > > 
> > > > > => Yes, sure, it could work. Another hack would be to use ASM or 
> > > > > ByteBuddy to "enhance" Spark classes but it
> > > > > is weak and risky :)
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > You definitely need someone from Spark more than someone from Beam 
> > > > > > for this issue. At this point, I've read
> > > > > > the scaladocs enough that I think I'd dig into Spark's code to see 
> > > > > > what is going on and if there is a way
> > > > > > that is more obviously right.
> > > > > > 
> > > > > > 
> > > > > 
> > > > > => Yes, this is what I tried, but I got no answer on the public Spark 
> > > > > MLs. Luckily, I asked Ryan Blue of
> > > > > the Spark community directly and he kindly answered. I'm digging into 
> > > > > Catalog and Spark plans to find a different
> > > > > instantiation mechanism.
> > > > > 
> > > > > 
> > > > > Etienne
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > Kenn
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > On Tue, Dec 18, 2018 at 11:09 AM Etienne Chauchot 
> > > > > > <[email protected]> wrote:
> > > > > > 
> > > > > > > Hi everyone, 
> > > > > > > 
> > > > > > > 
> > > > > > > Does anyone have comments on this question?
> > > > > > > 
> > > > > > > 
> > > > > > > Thanks
> > > > > > > Etienne
> > > > > > > 
> > > > > > > 
> > > > > > > On Friday, 14 December 2018 at 10:37 +0100, Etienne Chauchot 
> > > > > > > wrote:
> > > > > > > > Hi guys,
> > > > > > > > I'm currently coding a POC of a new Spark runner based on 
> > > > > > > > structured streaming and the new DataSourceV2 API,
> > > > > > > > and I have a question. Having found no pointers on 
> > > > > > > > the internet, I asked the Spark
> > > > > > > > community, with no luck. If any of you knows the 
> > > > > > > > new Spark DataSourceV2 API, could you share
> > > > > > > > your thoughts?
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Also, I did not mention it in the email, but I did not find 
> > > > > > > > any way to get a reference to the automatically
> > > > > > > > created DataSourceV2 instance, so I cannot lazily initialize 
> > > > > > > > the source either.
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Thanks
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Etienne
> > > > > > > > 
> > > > > > > > 
> > > > > > > > -------- Forwarded message --------
> > > > > > > > From: Etienne Chauchot <[email protected]>
> > > > > > > > To: [email protected]
> > > > > > > > Subject: [Apache Beam] Custom DataSourceV2 instantiation: 
> > > > > > > > parameter passing and Encoders
> > > > > > > > Date: Tue, 11 Dec 2018 19:02:23 +0100
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Hi Spark guys,
> > > > > > > > 
> > > > > > > > 
> > > > > > > > I'm Etienne Chauchot and I'm a committer on the Apache Beam 
> > > > > > > > project.
> > > > > > > > 
> > > > > > > > 
> > > > > > > > We have what we call runners. They are pieces of software that 
> > > > > > > > translate pipelines written using Beam
> > > > > > > > API into pipelines that use native execution engine API. 
> > > > > > > > Currently, the Spark runner uses old RDD /
> > > > > > > > DStream APIs.
> > > > > > > > I'm writing a new runner that will use structured streaming 
> > > > > > > > (but not continuous processing, and also no
> > > > > > > > schema for now).
> > > > > > > > 
> > > > > > > > 
> > > > > > > > I am just starting. I'm currently trying to map our sources to 
> > > > > > > > yours. I'm targeting the new DataSourceV2
> > > > > > > > API. It maps pretty well to Beam sources, but I have a problem 
> > > > > > > > with instantiation of the custom source.
> > > > > > > > I searched for an answer on Stack Overflow and the user ML with 
> > > > > > > > no luck. I guess the question is too
> > > > > > > > specific:
> > > > > > > > 
> > > > > > > > 
> > > > > > > > When visiting Beam DAG I have access to Beam objects such as 
> > > > > > > > Source and Reader that I need to map to
> > > > > > > > MicroBatchReader and InputPartitionReader.
> > > > > > > > As far as I understand, a custom DataSourceV2 is instantiated 
> > > > > > > > automatically by spark thanks to
> > > > > > > > sparkSession.readStream().format(providerClassName) or similar 
> > > > > > > > code. The problem is that I can only pass
> > > > > > > > options of primitive types + String so I cannot pass the Beam 
> > > > > > > > Source to DataSourceV2.
> > > > > > > > => Is there a way to do so ?
> > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Also I get as an output a Dataset<Row>. The Row contains an 
> > > > > > > > instance of Beam WindowedValue<T>, T is the
> > > > > > > > type parameter of the Source. I do a map on the Dataset to 
> > > > > > > > transform it to a Dataset<WindowedValue<T>>.
> > > > > > > > I have a question related to the Encoder: 
> > > > > > > > => how to properly create an Encoder for the generic type 
> > > > > > > > WindowedValue<T> to use in the map?
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Here is the code:
> > > > > > > > https://github.com/apache/beam/tree/spark-runner_structured-streaming
> > > > > > > > 
> > > > > > > > 
> > > > > > > > And more specifically:
> > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Thanks,
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Etienne
