[Apache Beam] Custom DataSourceV2 instantiation: parameter passing and Encoders

2018-12-11 Thread Etienne Chauchot
Hi Spark guys,

I'm Etienne Chauchot and I'm a committer on the Apache Beam project. 

We have what we call runners: pieces of software that translate pipelines
written with the Beam API into pipelines that use the native API of an
execution engine. Currently, the Spark runner uses the old RDD / DStream APIs.
I'm writing a new runner that will use Structured Streaming (but not continuous
processing, and no schema support for now).

I am just starting. I'm currently trying to map our sources to yours, targeting
the new DataSourceV2 API. It maps pretty well to Beam sources, but I have a
problem with the instantiation of the custom source.
I searched for an answer on Stack Overflow and on the user ML with no luck; I
guess the question is too specific:

When visiting the Beam DAG I have access to Beam objects such as Source and
Reader that I need to map to MicroBatchReader and InputPartitionReader.
As far as I understand, a custom DataSourceV2 is instantiated automatically by
Spark through sparkSession.readStream().format(providerClassName) or similar
code. The problem is that options can only carry primitive types + String, so I
cannot pass the Beam Source to the DataSourceV2.
=> Is there a way to do so?
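
For illustration, one workaround would be to smuggle the source through the
String option itself by serializing it. This is just a sketch with made-up
names (SourceOptionCodec, the "beam-source" option key), not something I
consider a proper solution:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

/** Hypothetical helper: smuggles a Serializable Beam Source through a String option. */
class SourceOptionCodec {

  /** Serializes the source and encodes it as a Base64 String usable as a DataSourceV2 option. */
  static String encode(Serializable source) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(source);
      out.flush();
      return Base64.getEncoder().encodeToString(bytes.toByteArray());
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize the Beam source", e);
    }
  }

  /** Decodes the option back into the source inside the DataSourceV2 / MicroBatchReader. */
  static Object decode(String encoded) {
    byte[] bytes = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException("Could not deserialize the Beam source", e);
    }
  }
}

On the driver side the encoded source would then be passed like any other option:

// Hypothetical usage: providerClassName is the DataSourceV2 provider class name,
// beamSource is the Beam Source visited in the DAG.
Dataset<Row> rows = sparkSession
    .readStream()
    .format(providerClassName)
    .option("beam-source", SourceOptionCodec.encode(beamSource))
    .load();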


Also I get as an output a Dataset<Row>. The Row contains an instance of Beam
WindowedValue<T>, T being the type parameter of the Source. I do a map on the
Dataset<Row> to transform it into a Dataset<WindowedValue<T>>. I have a
question related to the Encoder:
=> how to properly create an Encoder for the generic type WindowedValue<T> to
use in the map?
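
In case it helps make the question concrete, one approach would be a binary
encoder such as Kryo, so that Catalyst does not need a schema for the generic
payload. A minimal sketch, assuming the WindowedValue sits in the first column
of the Row (WindowedValueEncoderHelpers and toWindowedValues are names of my own):

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

class WindowedValueEncoderHelpers {

  /** Binary (Kryo) encoder for the generic Beam type WindowedValue<T>. */
  @SuppressWarnings("unchecked")
  static <T> Encoder<WindowedValue<T>> windowedValueEncoder() {
    return Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);
  }

  /** Maps the Dataset<Row> produced by the source into a Dataset<WindowedValue<T>>. */
  static <T> Dataset<WindowedValue<T>> toWindowedValues(Dataset<Row> rows) {
    return rows.map(
        (MapFunction<Row, WindowedValue<T>>) row -> (WindowedValue<T>) row.get(0),
        WindowedValueEncoderHelpers.<T>windowedValueEncoder());
  }
}

The obvious drawback is that the payload becomes an opaque binary blob for
Catalyst, but it at least yields a typed Dataset to work with.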

Here is the code:
https://github.com/apache/beam/tree/spark-runner_structured-streaming

And more specifically:

https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java

https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java

Thanks,

Etienne








Re: [Apache Beam] Custom DataSourceV2 instantiation: parameter passing and Encoders

2018-12-18 Thread Etienne Chauchot
Hi everyone, 
Does anyone have comments on this question? 
CCing user ML
Thanks
Etienne
On Tuesday, December 11, 2018 at 19:02 +0100, Etienne Chauchot wrote:
> Hi Spark guys,
> 
> I'm Etienne Chauchot and I'm a committer on the Apache Beam project.
> 
> We have what we call runners: pieces of software that translate pipelines written with the Beam API into
> pipelines that use the native API of an execution engine. Currently, the Spark runner uses the old RDD / DStream
> APIs. I'm writing a new runner that will use Structured Streaming (but not continuous processing, and no schema
> support for now).
> 
> I am just starting. I'm currently trying to map our sources to yours, targeting the new DataSourceV2 API. It maps
> pretty well to Beam sources, but I have a problem with the instantiation of the custom source. I searched for an
> answer on Stack Overflow and on the user ML with no luck; I guess the question is too specific:
> 
> When visiting the Beam DAG I have access to Beam objects such as Source and Reader that I need to map to
> MicroBatchReader and InputPartitionReader. As far as I understand, a custom DataSourceV2 is instantiated
> automatically by Spark through sparkSession.readStream().format(providerClassName) or similar code. The problem is
> that options can only carry primitive types + String, so I cannot pass the Beam Source to the DataSourceV2.
> => Is there a way to do so?
> 
> Also I get as an output a Dataset<Row>. The Row contains an instance of Beam WindowedValue<T>, T being the type
> parameter of the Source. I do a map on the Dataset<Row> to transform it into a Dataset<WindowedValue<T>>. I have a
> question related to the Encoder:
> => how to properly create an Encoder for the generic type WindowedValue<T> to use in the map?
> 
> Here is the code:
> https://github.com/apache/beam/tree/spark-runner_structured-streaming
> 
> And more specifically:
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
> 
> Thanks,
> Etienne


CombinePerKey and GroupByKey

2019-02-28 Thread Etienne Chauchot
Hi all,

I'm migrating RDD pipelines to Dataset and I saw that Combine.PerKey is no
longer there in the Dataset API. So I translated it to:


KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
    keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());

Dataset<Tuple2<K, OutputT>> combinedDataset =
    groupedDataset.agg(
        new Aggregator<KV<K, InputT>, AccumT, OutputT>(combineFn).toColumn());

I have a question regarding performance: as GroupByKey is generally less
performant (it entails a shuffle and possible OOM if a given key has a lot of
data associated with it), I was wondering whether the new Spark optimizer
translates such a DAG into a combinePerKey behind the scenes. In other words,
is such a DAG going to be translated into a local (or partial, I don't know
what terminology you use) combine followed by a global combine to avoid the
shuffle?
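
For reference, and to make the question concrete, below is a minimal
self-contained Aggregator of the kind plugged into agg() above, specialized to
Long sums instead of wrapping an actual Beam combineFn (SumAggregator is just
an illustrative name). reduce() is what would run per partition before the
shuffle and merge() combines the partial buffers, which is exactly the
local/global combine split I'm asking about:

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// Minimal sketch: sums Long values per key when used as
// groupedDataset.agg(new SumAggregator().toColumn()).
class SumAggregator extends Aggregator<Long, Long, Long> {
  @Override public Long zero() { return 0L; }                                 // empty accumulator
  @Override public Long reduce(Long acc, Long value) { return acc + value; }  // per-partition combine
  @Override public Long merge(Long a, Long b) { return a + b; }               // combine across partitions
  @Override public Long finish(Long acc) { return acc; }                      // final output
  @Override public Encoder<Long> bufferEncoder() { return Encoders.LONG(); }
  @Override public Encoder<Long> outputEncoder() { return Encoders.LONG(); }
}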

Thanks

Etienne


Re: CombinePerKey and GroupByKey

2019-03-01 Thread Etienne Chauchot
That's good to know
Thanks
Etienne
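
PS: for anyone finding this thread later, a quick way to see how such an
aggregation is planned is to print the query plans of the combined Dataset;
with hash aggregation Spark typically splits the work into a partial and a
final aggregate node:

// combinedDataset is the Dataset<Tuple2<K, OutputT>> built with groupByKey().agg(...) above.
combinedDataset.explain(true); // prints the parsed, analyzed, optimized and physical plans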
On Thursday, February 28, 2019 at 10:05 -0800, Reynold Xin wrote:
> This should be fine. Dataset.groupByKey is a logical operation, not a 
> physical one (as in Spark wouldn’t always
> materialize all the groups in memory). 
> On Thu, Feb 28, 2019 at 1:46 AM Etienne Chauchot  wrote:
> > Hi all,
> > 
> > I'm migrating RDD pipelines to Dataset and I saw that Combine.PerKey is no longer there in the Dataset API. So I
> > translated it to:
> > 
> > KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
> >     keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
> > 
> > Dataset<Tuple2<K, OutputT>> combinedDataset =
> >     groupedDataset.agg(
> >         new Aggregator<KV<K, InputT>, AccumT, OutputT>(combineFn).toColumn());
> > 
> > I have a question regarding performance: as GroupByKey is generally less performant (it entails a shuffle and
> > possible OOM if a given key has a lot of data associated with it), I was wondering whether the new Spark optimizer
> > translates such a DAG into a combinePerKey behind the scenes. In other words, is such a DAG going to be translated
> > into a local (or partial, I don't know what terminology you use) combine followed by a global combine to avoid the
> > shuffle?
> > 
> > Thanks
> > Etienne


Re: What's the root cause of not supporting multiple aggregations in structured streaming?

2020-08-31 Thread Etienne Chauchot

Hi all,

I'm also very interested in this feature, but the PR has been open since
January 2019 and has not been updated. It raised a design discussion around
watermarks, and a design doc was written
(https://docs.google.com/document/d/1IAH9UQJPUiUCLd7H6dazRK2k1szDX38SnM6GVNZYvUo/edit#heading=h.npkueh4bbkz1).
We also commented on this design, but it seems that the subject is still stale.


Is there any interest in the community in delivering this feature, or is it
considered worthless? If the latter, can you explain why?


Best

Etienne

On 22/05/2019 03:38, 张万新 wrote:

Thanks, I'll check it out.

Arun Mahadevan <ar...@apache.org> wrote on Tue, May 21, 2019 at 01:31:


Here's the proposal for supporting it in "append" mode -
https://github.com/apache/spark/pull/23576. You could see if it
addresses your requirement and post your feedback in the PR.
For "update" mode it's going to be much harder to support this
without first adding support for "retractions", otherwise we would
end up with wrong results.

- Arun


On Mon, 20 May 2019 at 01:34, Gabor Somogyi
<gabor.g.somo...@gmail.com> wrote:

There is a PR for this but it is not yet merged.

On Mon, May 20, 2019 at 10:13 AM 张万新 <kevinzwx1...@gmail.com> wrote:

Hi there,

I'd like to know the root reason why multiple
aggregations on a streaming dataframe are not allowed,
since it's a very useful feature and Flink has supported
it for a long time.

Thanks.



Re: What's the root cause of not supporting multiple aggregations in structured streaming?

2020-09-04 Thread Etienne Chauchot

Hi Jungtaek Lim,

Nice to hear from you again since the last time we talked :) and congrats on
becoming a Spark committer in the meantime! (If I'm not mistaken, you were
not one at the time.)


I totally agree with what you're saying about merging structural parts of
Spark without a broader consensus. What I don't understand is why there is
not more investment in SS, especially since in another thread the community
is discussing deprecating the regular DStream streaming framework.


Is the orientation of Spark now mostly batch?

PS: yeah, I saw your update on the doc when I took a look at 3.0 preview 2
searching for this particular feature. And regarding the workaround, I'm not
sure it meets my needs as it will add delays and may also mess with the
watermarks.
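
To make explicit what I mean by added delays: my understanding of the
workaround being referred to is to split the pipeline into separate queries
chained through intermediate storage, roughly as in the sketch below (made-up
paths, a rate source and a class name of my own, not actual runner code):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class MultiAggWorkaroundSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark =
        SparkSession.builder().appName("two-aggs").master("local[2]").getOrCreate();

    // First streaming aggregation, written in append mode to intermediate storage.
    Dataset<Row> input = spark.readStream().format("rate").load(); // columns: timestamp, value
    Dataset<Row> firstAgg = input
        .withWatermark("timestamp", "10 minutes")
        .groupBy(window(col("timestamp"), "5 minutes"), col("value"))
        .count();
    firstAgg.writeStream()
        .outputMode("append")
        .format("parquet")
        .option("path", "/tmp/first-agg")
        .option("checkpointLocation", "/tmp/first-agg-ckpt")
        .start();

    // Second aggregation runs as a separate query that reads the intermediate
    // results back, hence the extra latency and the decoupled watermark.
    Dataset<Row> secondAgg = spark.readStream()
        .schema(firstAgg.schema())
        .parquet("/tmp/first-agg")
        .groupBy(col("window"))
        .count();
    secondAgg.writeStream()
        .outputMode("complete")
        .format("memory")
        .queryName("counts_per_window")
        .start();

    spark.streams().awaitAnyTermination();
  }
}

The second query only sees data once the first one has committed its files,
and it tracks its own watermark, which is why I fear it does not fit my use case.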


Best

Etienne Chauchot


On 04/09/2020 08:06, Jungtaek Lim wrote:
Unfortunately I don't see enough active committers working on 
Structured Streaming; I don't expect major features/improvements can 
be brought in this situation.


Technically I can review and merge a PR on major improvements in SS,
but that depends on how much the proposal is changing. If the proposal
brings a conceptual change, being reviewed by a committer still
wouldn't be enough.


So that's not due to the fact we think it's worthless. (That might be 
only me though.) I'd understand as there's not much investment on SS. 
There's also a known workaround for multiple aggregations (I've 
documented in the SS guide doc, in "Limitation of global watermark" 
section), though I totally agree the workaround is bad.


On Tue, Sep 1, 2020 at 12:28 AM Etienne Chauchot <echauc...@apache.org> wrote:


Hi all,

I'm also very interested in this feature, but the PR has been open since
January 2019 and has not been updated. It raised a design discussion
around watermarks, and a design doc was written
(https://docs.google.com/document/d/1IAH9UQJPUiUCLd7H6dazRK2k1szDX38SnM6GVNZYvUo/edit#heading=h.npkueh4bbkz1).
We also commented on this design, but it seems that the subject is
still stale.

Is there any interest in the community in delivering this feature,
or is it considered worthless? If the latter, can you explain why?

Best

Etienne

On 22/05/2019 03:38, 张万新 wrote:

Thanks, I'll check it out.

Arun Mahadevan <ar...@apache.org> wrote on Tue,
May 21, 2019 at 01:31:

Here's the proposal for supporting it in "append" mode -
https://github.com/apache/spark/pull/23576. You could see if
it addresses your requirement and post your feedback in the PR.
For "update" mode it's going to be much harder to support this
without first adding support for "retractions", otherwise we
would end up with wrong results.

- Arun


On Mon, 20 May 2019 at 01:34, Gabor Somogyi
<gabor.g.somo...@gmail.com> wrote:

There is a PR for this but it is not yet merged.

On Mon, May 20, 2019 at 10:13 AM 张万新
<kevinzwx1...@gmail.com> wrote:

Hi there,

I'd like to know the root reason why multiple
aggregations on a streaming dataframe are not allowed,
since it's a very useful feature and Flink has
supported it for a long time.

Thanks.



Re: What's the root cause of not supporting multiple aggregations in structured streaming?

2020-11-26 Thread Etienne Chauchot

Hi,

Regarding this subject I wrote a blog article that gives details about 
the watermark architecture proposal that was discussed in the design doc 
and in the PR:


https://echauchot.blogspot.com/2020/11/watermark-architecture-proposal-for.html

Best

Etienne

On 29/09/2020 03:24, Yuanjian Li wrote:

Thanks for the great discussion!

I'm also interested in this feature and did some investigation before. As
Arun mentioned, similar to the "update" mode, the "complete" mode also
needs more design. We might need an operation-level output mode to
support the complete mode. That is to say, if we use "complete" mode
for every aggregation operator, wrong results will be returned.


SPARK-26655 would be a good start, which only considers the "append"
mode. Maybe we need more discussion on the watermark interface. I will
take a close look at the doc and PR. Hopefully we will have a first
version with limitations and fix/remove them gradually.


Best,
Yuanjian

Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote on Sat, Sep 26, 2020 at 10:31 AM:

Thanks Etienne! Yeah I forgot to say nice talking with you again.
And sorry I forgot to send the reply (was in draft).

Regarding investment in SS, well, unfortunately I don't know - I'm
just an individual. There might be various reasons to do so, most
probably "priority" among the stuff. There's not much I could change.

I agree the workaround is sub-optimal, but unless I see sufficient
support in the community I probably can't make it go forward.
I'll just say there's an elephant in the room - as the project
has been going for more than 10 years, backward compatibility is a
top-priority concern in the project, even across major versions
for features/APIs. It is great for end users, who can migrate
versions easily, but it also blocks devs from fixing a bad design
once it ships. I'm the one complaining about these issues on the
dev list, and I don't see willingness to correct them.


On Fri, Sep 4, 2020 at 5:55 PM Etienne Chauchot
<echauc...@apache.org> wrote:

Hi Jungtaek Lim,

Nice to hear from you again since the last time we talked :) and
congrats on becoming a Spark committer in the meantime! (If I'm
not mistaken, you were not one at the time.)

I totally agree with what you're saying about merging structural
parts of Spark without a broader consensus. What I don't
understand is why there is not more investment in SS, especially
since in another thread the community is discussing deprecating
the regular DStream streaming framework.

Is the orientation of Spark now mostly batch?

PS: yeah, I saw your update on the doc when I took a look at
3.0 preview 2 searching for this particular feature. And
regarding the workaround, I'm not sure it meets my needs as it
will add delays and may also mess with the watermarks.

Best

Etienne Chauchot


On 04/09/2020 08:06, Jungtaek Lim wrote:

Unfortunately I don't see enough active committers working on
Structured Streaming; I don't expect major
features/improvements can be brought in this situation.

Technically I can review and merge a PR on major
improvements in SS, but that depends on how much the proposal
is changing. If the proposal brings a conceptual change, being
reviewed by a committer still wouldn't be enough.

So that's not due to the fact we think it's worthless. (That
might be only me though.) I'd understand as there's not much
investment on SS. There's also a known workaround for
multiple aggregations (I've documented in the SS guide doc,
in "Limitation of global watermark" section), though I
totally agree the workaround is bad.

On Tue, Sep 1, 2020 at 12:28 AM Etienne Chauchot
<echauc...@apache.org> wrote:

Hi all,

I'm also very interested in this feature, but the PR has been
open since January 2019 and has not been updated. It raised a
design discussion around watermarks, and a design doc was
written
(https://docs.google.com/document/d/1IAH9UQJPUiUCLd7H6dazRK2k1szDX38SnM6GVNZYvUo/edit#heading=h.npkueh4bbkz1).
We also commented on this design, but it seems that the
subject is still stale.

Is there any interest in the community in delivering this
feature, or is it considered worthless? If the latter,
can you explain why?

Best

Etienne

On 22/05/2019 03:38, 张万新 wrote:

Thanks, I'll check it out.