Hi all,

Thanks for your feedback! I was indeed thinking about Reuven's work around Schema PCollections, hence my email
to the community. I don't see how it fits, though: as I'm wrapping a source, I need to store both the timestamp
and the value, hence the use of WindowedValue<T> (as the other runners do). Yet WindowedValue<T> might be
overkill, because I obviously have no windowing at the input of a pipeline. Hence the idea of creating a Beam
schema that has the fields of T plus a long for the timestamp, and writing a converter between Beam schema and
Spark schema. But I'm not sure it would be more performant than simply serializing the WindowedValue object to
bytes. I guess Schema PCollections are a tool more useful for the SDK part than for the runner part of Beam.
Am I missing something, @Reuven?

For the same source-wrapping problem, both the current Spark and Flink runners also store WindowedValue<T>, but
they do not enforce any schema in their Dataset-equivalent structures, so they don't have this problem.

@Manu, regarding your concerns about the serialization/deserialization round trip: artificial round trips (those
not triggered by the Spark framework) only happen once, at source execution time. Downstream we have
Dataset<WindowedValue<T>>. But indeed, if we apply a ParDo, it gets wrapped into a mapPartition, for which Spark
requires an Encoder (similar to a Beam Coder) for serialization, and indeed we provide a bytes encoder. But once
again, if we had a schema, Spark would still serialize and deserialize, and I'm not sure a bean/schema Encoder
would be more performant than a binary one.
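
To make the bytes round trip concrete, here is a minimal sketch using only the JDK. It is not the Beam
FullWindowedValueCoder nor a Spark Encoder: TimestampedValue and TimestampedValueBytes are hypothetical names
invented for illustration, and the value is assumed to be a String. It only shows the shape of "encode value +
timestamp to a byte array at the source, decode once downstream".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a source element: a value plus its timestamp. */
final class TimestampedValue {
  final String value;
  final long timestampMillis;

  TimestampedValue(String value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }
}

/** Hypothetical helper that turns a TimestampedValue into opaque bytes and back. */
final class TimestampedValueBytes {
  private TimestampedValueBytes() {}

  /** Writes the timestamp (8 bytes) followed by the value (length-prefixed modified UTF-8). */
  static byte[] encode(TimestampedValue element) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeLong(element.timestampMillis);
      out.writeUTF(element.value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static TimestampedValue decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      long timestampMillis = in.readLong();
      String value = in.readUTF();
      return new TimestampedValue(value, timestampMillis);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With something like this, the source side would call TimestampedValueBytes.encode(...) per element and a single
map after the source would call decode(...); a store with a binary schema only ever sees the byte array.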

Side note: @Gleb, yes, using a schema would allow us to use the pushdown predicates that are included in the
Spark DataSourceV2 API. But such predicates would depend on the backend IO technology, which we don't know in
advance (e.g. a filter on a column that is not a primary/clustering column in Cassandra could not be pushed
down). We would have to translate differently depending on the IO, instead of translating only BoundedSource and
UnboundedSource.
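
As a rough illustration of why pushdown is backend-specific, here is a small JDK-only sketch. It does not use the
Spark DataSourceV2 API or any Cassandra client; PushdownPlanner, Split and the column names are hypothetical. The
only assumption is that each IO can tell us which columns it is able to filter on natively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: decides which filter columns a given backend could evaluate itself. */
final class PushdownPlanner {
  private PushdownPlanner() {}

  /** Filters split into those pushed to the backend and those left for the engine. */
  static final class Split {
    final List<String> pushed = new ArrayList<>();
    final List<String> residual = new ArrayList<>();
  }

  /**
   * @param filterColumns columns the query filters on
   * @param pushableColumns columns this particular backend can filter natively
   *     (for example, key columns); this set differs from one IO to another
   */
  static Split split(List<String> filterColumns, Set<String> pushableColumns) {
    Split result = new Split();
    for (String column : filterColumns) {
      if (pushableColumns.contains(column)) {
        result.pushed.add(column);
      } else {
        // The backend cannot filter on this column, so the engine must do it after reading.
        result.residual.add(column);
      }
    }
    return result;
  }
}
```

The pushableColumns set is exactly the piece of knowledge that a generic BoundedSource/UnboundedSource
translation does not have, which is why the translation would need to be per IO.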

Best,
Etienne

On Friday, 18 January 2019 at 18:33 +0100, Gleb Kanterov wrote:
> Agree with Kenn. It should be possible: Spark has a similar concept called 
> ExpressionEncoder, and I was doing a similar
> derivation using Scala macros in typelevel/frameless.
> 
> Most of the code in Beam is a black-box function in a ParDo, and the only way to 
> translate it is using `mapPartition`.
> However, we could override the behavior for known transforms from beam-java-core, 
> for instance Group and Select, and
> use FieldAccessDescriptor to push down projections. There is a bigger 
> opportunity for Beam SQL, which translates into
> transforms that better fit the Spark DataFrame model.
> 
> Gleb
> 
> 
> 
> On Fri, Jan 18, 2019 at 3:25 PM Kenneth Knowles <k...@apache.org> wrote:
> > I wonder if this could tie in with Reuven's recent work. He's basically 
> > making it so every type with an "obvious"
> > schema automatically converts to/from Row whenever needed. Sounds like a 
> > similar need, superficially.
> > Kenn
> > On Fri, Jan 18, 2019, 02:36 Manu Zhang <owenzhang1...@gmail.com> wrote:
> > > Hi Etienne,
> > > I see your point. I'm a bit worried that every ParDo has to be wrapped in 
> > > a `mapPartition` which introduces cost
> > > of serde and forgoes the benefits of Dataset API. 
> > > Maybe Dataset is not the best idea to integrate Beam with Spark. Just my 
> > > $0.02. 
> > > 
> > > Manu
> > >  
> > > On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot <echauc...@apache.org> 
> > > wrote:
> > > > > Hi Manu,
> > > > > Yes, a JSON schema can make its way to the Spark source with no difficulty. But we still need to store
> > > > > WindowedValue, not only the elements that would comply with this schema. The problem is that Spark will
> > > > > try to match the element (WindowedValue) to the schema of the source at any element-wise processing (and
> > > > > downstream it will infer the schema from the content of the dataset; for example, if I extract the
> > > > > timestamp in a ParDo, I get a Long schema in the output dataset). The problem is that WindowedValue is
> > > > > complex and has many subclasses. Maybe bytes serialization is still the best way to go, but then we
> > > > > don't leverage Schema PCollections.
> > > > > Best,
> > > > > Etienne
> > > > > On Thursday, 17 January 2019 at 21:52 +0800, Manu Zhang wrote:
> > > > > > Nice try, Etienne! Is it possible to pass in the schema through 
> > > > > > pipeline options?
> > > > > Manu 
> > > > > On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot 
> > > > > <echauc...@apache.org> wrote:
> > > > > > Hi Kenn,
> > > > > > > Sure, in Spark DataSourceV2 providing a schema is mandatory:
> > > > > > > - if I set it to null, I obviously get an NPE
> > > > > > > - if I set it to empty, I get an array out of bounds exception
> > > > > > > - if I set it to Datatype.Null, null is stored as actual elements
> > > > > > > => Consequently I set it to binary.
> > > > > > As the beam reader is responsible for reading both the element and 
> > > > > > the timestamp, the source outputs a
> > > > > > Dataset<WindowedValue>. So, the solution I found, for which I asked 
> > > > > > your opinion, is to serialize
> > > > > > windowedValue to bytes using beam FullWindowedValueCoder in 
> > > > > > reader.get() and deserialize the whole dataset
> > > > > > once the source is done using a map to get the windowedValue back 
> > > > > > and give it to the transforms downstream.
> > > > > > I am aware that this is not optimal because of the bytes 
> > > > > > serialization roundtrip, and I wanted your
> > > > > > suggestions around that.
> > > > > > > Thanks,
> > > > > > > Etienne
> > > > > > 
> > > > > > > On Wednesday, 16 January 2019 at 19:04 -0800, Kenneth Knowles wrote:
> > > > > > > Cool!
> > > > > > > I don't quite understand the issue in "bytes serialization to 
> > > > > > > comply to spark dataset schemas to store
> > > > > > > windowedValues". Can you say a little more?
> > > > > > > 
> > > > > > > Kenn
> > > > > > > On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot 
> > > > > > > <echauc...@apache.org> wrote:
> > > > > > > > Hi guys,
> > > > > > > > regarding the new (made from scratch) spark runner POC based on 
> > > > > > > > the dataset API, I was able to make a
> > > > > > > > big step forward: it can now run a first batch pipeline with a 
> > > > > > > > source !
> > > > > > > > 
> > > > > > > > See 
> > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
> > > > > > > > 
> > > > > > > > > There are no test facilities for now; test mode is enabled and it 
> > > > > > > > > just prints the output PCollection.
> > > > > > > > 
> > > > > > > > I made some workarounds especially String serialization to pass 
> > > > > > > > beam objects (was forced to) and also
> > > > > > > > bytes serialization to comply to spark dataset schemas to store 
> > > > > > > > windowedValues.
> > > > > > > > 
> > > > > > > > Can you give me your thoughts especially regarding these last 2 
> > > > > > > > matters?
> > > > > > > > 
> > > > > > > > The other parts are not ready for showing yet
> > > > > > > > 
> > > > > > > > Here is the whole branch:
> > > > > > > > 
> > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
> > > > > > > > 
> > > > > > > > Thanks,
> > > > > > > > 
> > > > > > > > Etienne
> > > > > > > > 
> > > > > > > > 
> 
> 
