Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Yushu Yao
Looks like it's a valid use case.
Wondering if anyone can give some high-level guidelines on how to implement
this?
I can give it a try.

-Yushu


On Tue, May 24, 2022 at 2:42 AM Jan Lukavský  wrote:

> +dev@beam 
> On 5/24/22 11:40, Jan Lukavský wrote:
>
> Hi,
> I think this feature is valid. Every runner for which Beam is not a
> 'native' SDK uses some form of translation context, which maps a PCollection
> to the internal representation in the runner's particular SDK (an RDD in this
> case). It should be possible to "import" an RDD into the specific runner
> via something like
>
>   SparkRunner runner = ...;
>   PCollection<...> pCollection = runner.importRDD(rdd);
>
> and similarly
>
>   RDD<...> rdd = runner.exportRDD(pCollection);
>
> Yes, apparently this would be runner-specific, but that is the point,
> actually. This would enable using features and libraries that Beam does
> not have, or micro-optimizing some particular step using runner-specific
> features that we don't have in Beam. We actually had this feature (at
> least in a prototype) many years ago when Euphoria was a separate project.
>
>  Jan
> On 5/23/22 20:58, Alexey Romanenko wrote:
>
>
>
> On 23 May 2022, at 20:40, Brian Hulette  wrote:
>
> Yeah I'm not sure of any simple way to do this. I wonder if it's worth
> considering building some Spark runner-specific feature around this, or at
> least packaging up Robert's proposed solution?
>
>
> I’m not sure that a runner-specific feature is a good way to do this, since
> the other runners won’t be able to support it. Or am I missing something?
>
> There could be other interesting integrations in this space too, e.g.
> using Spark RDDs as a cache for Interactive Beam.
>
>
> Another option could be to add something like SparkIO (or
> FlinkIO/whatever) to read/write data from/to Spark data structures for such
> cases (Spark-schema-to-Beam-schema conversion could also be supported). And,
> dreaming a bit more, for those who need to have a mixed pipeline (e.g.
> Spark + Beam), such connectors could support push-downs of pure Spark
> pipelines and then use the result downstream in Beam.
>
> —
> Alexey
>
>
>
> Brian
>
> On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
> wrote:
>
>> The easiest way to do this would be to write the RDD somewhere then
>> read it from Beam.
>>
>> On Mon, May 23, 2022 at 9:39 AM Yushu Yao  wrote:
>> >
>> > Hi Folks,
>> >
>> > I know this is not the optimal way to use Beam :-) But assume I only
>> > use the Spark runner.
>> >
>> > I have a Spark library (very complex) that emits a Spark dataframe (or
>> > RDD).
>> > I also have an existing complex Beam pipeline that can do post
>> > processing on the data inside the dataframe.
>> >
>> > However, the Beam part needs a PCollection to start with. The question
>> > is, how can I convert a Spark RDD into a PCollection?
>> >
>> > Thanks
>> > -Yushu
>> >
>>
>
>
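
As a rough illustration of Robert's "write it somewhere, then read it from Beam" suggestion
above; javaRdd stands for the RDD produced by the Spark library, and the handover path, text
format, and pipeline options are illustrative assumptions, not part of his proposal:

  // Spark side: persist the RDD to a shared location (path and format are illustrative).
  javaRdd.saveAsTextFile("hdfs:///tmp/rdd-handover");

  // Beam side: read the same files back into a PCollection.
  Pipeline pipeline = Pipeline.create(options);
  PCollection<String> lines =
      pipeline.apply(TextIO.read().from("hdfs:///tmp/rdd-handover/part-*"));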


Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Moritz Mack
Hi Yushu,

Have a look at org.apache.beam.runners.spark.translation.EvaluationContext in 
the Spark runner. It maintains that mapping between PCollections and RDDs 
(wrapped in the BoundedDataset helper). As Reuven just pointed out, values are 
timestamped (and windowed) in Beam, therefore BoundedDataset expects a
JavaRDD<WindowedValue<T>>.
The idea is to map your external RDD to a new PCollection
(via PCollection.createPrimitiveOutputInternal) in the EvaluationContext (and vice
versa). You can then apply Beam transforms to that PCollection (and, with that,
effectively to the mapped RDD) as you are used to. Obviously, there are a few
steps necessary, as EvaluationContext isn’t easily accessible from the outside.
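
As a very rough sketch of that idea: javaRdd, pipeline, and evaluationContext stand for the
external RDD, the Beam pipeline, and the runner's translation context, and the putDataset call
and BoundedDataset constructor are assumptions about runner-internal APIs, so treat this as
pseudocode rather than a working recipe.

  // Wrap the external RDD's elements as WindowedValues (global window, default timestamp).
  JavaRDD<WindowedValue<String>> windowed =
      javaRdd.map(WindowedValue::valueInGlobalWindow);

  // Create a primitive PCollection to stand in for the RDD on the Beam side.
  PCollection<String> imported =
      PCollection.createPrimitiveOutputInternal(
          pipeline,
          WindowingStrategy.globalDefault(),
          PCollection.IsBounded.BOUNDED,
          StringUtf8Coder.of());

  // Register the mapping so the runner's translation resolves this PCollection to the RDD
  // (assumed hook; EvaluationContext is not public API).
  evaluationContext.putDataset(imported, new BoundedDataset<>(windowed));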

Just having a quick look, creating an RddSource for Spark RDDs also seems not
too bad. That would allow you to do something like this:
pipeline.apply(Read.from(new RddSource<>(javaRdd.rdd(), coder)));

Though I haven’t done much testing beyond a quick experiment. One notable
disadvantage of that approach is that all RDD partition data must be
broadcast to all workers, which then pick the right partition. This should mostly
be fine, but some types of partitions carry data as well …
https://gist.github.com/mosche/5c1ef8ba281a9a08df1ec67fac700d03

/Moritz

On 24.05.22, 16:46, "Yushu Yao"  wrote:

Looks like it's a valid use case.
Wondering if anyone can give some high-level guidelines on how to implement this?
I can give it a try.

-Yushu


On Tue, May 24, 2022 at 2:42 AM Jan Lukavský 
mailto:je...@seznam.cz>> wrote:

+dev@beam
On 5/24/22 11:40, Jan Lukavský wrote:

Hi,
I think this feature is valid. Every runner for which Beam is not a 'native' 
SDK uses some form of translation context, which maps PCollection to internal 
representation of the particular SDK of the runner (RDD in this case). It 
should be possible to "import" an RDD into the specific runner via something 
like

  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the point, actually. 
This would enable using features and libraries, that Beam does not have, or 
micro-optimize some particular step using runner-specific features, that we 
don't have in Beam. We actually had this feature (at least in a prototype) many 
years ago when Euphoria was a separate project.

 Jan
On 5/23/22 20:58, Alexey Romanenko wrote:



On 23 May 2022, at 20:40, Brian Hulette 
mailto:bhule...@google.com>> wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's worth 
considering building some Spark runner-specific feature around this, or at 
least packaging up Robert's proposed solution?

I’m not sure that a runner specific feature is a good way to do this since the 
other runners won’t be able to support it or I’m missing something?

There could be other interesting integrations in this space too, e.g. using 
Spark RDDs as a cache for Interactive Beam.

Another option could be to add something like SparkIO (or FlinkIO/whatever) to 
read/write data from/to Spark data structures for such cases (Spark schema to 
Beam schema convention also could be supported). And dreaming a bit more, for 
those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors 
could support the push-downs of pure Spark pipelines and then use the result 
downstream in Beam.

—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
mailto:rober...@google.com>> wrote:
The easiest way to do this would be to write the RDD somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao 
mailto:yao.yu...@gmail.com>> wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But assume I only use the 
> spark runner.
>
> I have a spark library (very complex) that emits a spark dataframe (or RDD).
> I also have an existing complex beam pipeline that can do post processing on 
> the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with. The question is, 
> how can I convert a spark RDD into a pcollection?
>
> Thanks
> -Yushu
>






Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský

+dev@beam 

On 5/24/22 11:40, Jan Lukavský wrote:


Hi,
I think this feature is valid. Every runner for which Beam is not a
'native' SDK uses some form of translation context, which maps
a PCollection to the internal representation in the runner's particular
SDK (an RDD in this case). It should be possible to "import" an RDD
into the specific runner via something like


  SparkRunner runner = ...;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner-specific, but that is the point,
actually. This would enable using features and libraries that Beam
does not have, or micro-optimizing some particular step using
runner-specific features that we don't have in Beam. We actually had
this feature (at least in a prototype) many years ago when Euphoria
was a separate project.
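
To make the intent concrete, a mixed pipeline with such a (purely hypothetical)
API could look roughly like the sketch below; importRDD/exportRDD do not exist
in the Spark runner today, and MyRecord and the other names are illustrative.

  // Hypothetical API, for illustration only; neither method exists today.
  RDD<MyRecord> fromSpark = existingSparkLibrary.run();          // complex Spark-side logic

  PCollection<MyRecord> imported = runner.importRDD(fromSpark);  // hand over to Beam
  PCollection<MyRecord> processed = imported.apply(new MyBeamPostProcessing());

  RDD<MyRecord> backToSpark = runner.exportRDD(processed);       // back to native Spark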


 Jan

On 5/23/22 20:58, Alexey Romanenko wrote:




On 23 May 2022, at 20:40, Brian Hulette  wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's 
worth considering building some Spark runner-specific feature around 
this, or at least packaging up Robert's proposed solution?


I’m not sure that a runner-specific feature is a good way to do this,
since the other runners won’t be able to support it. Or am I missing
something?


There could be other interesting integrations in this space too, 
e.g. using Spark RDDs as a cache for Interactive Beam.


Another option could be to add something like SparkIO (or
FlinkIO/whatever) to read/write data from/to Spark data structures
for such cases (Spark-schema-to-Beam-schema conversion could also be
supported). And, dreaming a bit more, for those who need to have a
mixed pipeline (e.g. Spark + Beam), such connectors could support
push-downs of pure Spark pipelines and then use the result downstream
in Beam.


—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
 wrote:


The easiest way to do this would be to write the RDD somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao 
wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use Beam :-) But assume
> I only use the Spark runner.
>
> I have a Spark library (very complex) that emits a Spark
> dataframe (or RDD).
> I also have an existing complex Beam pipeline that can do post
> processing on the data inside the dataframe.
>
> However, the Beam part needs a PCollection to start with. The
> question is, how can I convert a Spark RDD into a PCollection?
>
> Thanks
> -Yushu
>