[Apache Beam] Custom DataSourceV2 instantiation: parameter passing and Encoders
Hi Spark guys,

I'm Etienne Chauchot and I'm a committer on the Apache Beam project.

We have what we call runners. They are pieces of software that translate pipelines written with the Beam API into pipelines that use the native execution engine's API. Currently, the Spark runner uses the old RDD / DStream APIs. I'm writing a new runner that will use Structured Streaming (but not continuous processing, and no schema for now).

I am just starting. I'm currently trying to map our sources to yours. I'm targeting the new DataSourceV2 API. It maps pretty well to Beam sources, but I have a problem with the instantiation of the custom source. I searched for an answer on Stack Overflow and the user ML with no luck. I guess the question is too specific:

When visiting the Beam DAG I have access to Beam objects such as Source and Reader, which I need to map to MicroBatchReader and InputPartitionReader. As far as I understand, a custom DataSourceV2 is instantiated automatically by Spark through sparkSession.readStream().format(providerClassName) or similar code. The problem is that I can only pass options of primitive types + String, so I cannot pass the Beam Source to the DataSourceV2.
=> Is there a way to do so?

Also, I get a Dataset<Row> as output. The Row contains an instance of Beam WindowedValue<T>, where T is the type parameter of the Source. I do a map on the Dataset<Row> to transform it to a Dataset<WindowedValue<T>>. I have a question related to the Encoder:
=> How do I properly create an Encoder for the generic type WindowedValue<T> to use in the map?
Here is the code:
https://github.com/apache/beam/tree/spark-runner_structured-streaming

And more specifically:
https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java

Thanks,
Etienne
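One possible workaround for the String-only options, sketched here under assumptions rather than taken from the thread: if the object to pass implements java.io.Serializable, it can be Java-serialized, Base64-encoded into a String option, and decoded again inside the data source. The class name SerializedOption and the option key "beam-source" are hypothetical, and an ArrayList stands in for the real source object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Spark or Beam): carries a Serializable object in a String. */
final class SerializedOption {
  /** Hypothetical option key under which the encoded object is stored. */
  static final String SOURCE_KEY = "beam-source";

  /** Java-serializes the value and returns it as a Base64 String. */
  static String encode(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  /** Reverses encode(): Base64-decodes the String and deserializes the object. */
  static Object decode(String encoded) {
    byte[] raw = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
      return in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Could not deserialize option value", e);
    }
  }

  public static void main(String[] args) {
    // Stand-in for the real source object (assumption: the real one is Serializable too).
    ArrayList<String> fakeSource = new ArrayList<>(List.of("split-1", "split-2"));

    // Driver side: the options map only ever holds Strings.
    Map<String, String> options = new HashMap<>();
    options.put(SOURCE_KEY, encode(fakeSource));

    // Data source side: rebuild the object from the String option.
    Object restored = decode(options.get(SOURCE_KEY));
    System.out.println(restored.equals(fakeSource)); // prints true
  }
}
```

On the driver side the encoded String would be passed as an ordinary option, and the custom source would call decode on the value it receives. Whether this is acceptable depends on the size of the serialized object and on the classes being available on the executors' classpath.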
Re: [Apache Beam] Custom DataSourceV2 instantiation: parameter passing and Encoders
Hi everyone,

Does anyone have comments on this question? CCing the user ML.

Thanks,
Etienne

On Tuesday 11 December 2018 at 19:02 +0100, Etienne Chauchot wrote:
> [...]
CombinePerKey and GroupByKey
Hi all,

I'm migrating RDD pipelines to Dataset and I saw that Combine.PerKey no longer exists in the Dataset API. So I translated it to:

KeyValueGroupedDataset> groupedDataset =
    keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());

Dataset> combinedDataset =
    groupedDataset.agg(
        new Aggregator, AccumT, OutputT>(combineFn).toColumn());

I have a question regarding performance: since GroupByKey is generally less performant (it entails a shuffle and a possible OOM if a given key has a lot of data associated with it), I was wondering whether the new Spark optimizer translates such a DAG into a combinePerKey behind the scenes. In other words, is such a DAG going to be translated into a local (or partial, I don't know what terminology you use) combine followed by a global combine, to avoid a full shuffle?

Thanks
Etienne
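To make the "local combine then global combine" question concrete, here is a toy model in plain Java. It is a sketch of the general two-phase technique, not of Spark's physical plan: the class TwoPhaseCombine, its method names, and the use of a sum as the combine function are all assumptions made for illustration. Each inner list plays the role of one partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a two-phase (partial, then final) per-key combine. Not Spark code. */
final class TwoPhaseCombine {

  /** Local (partial) combine: one accumulator per key inside a single partition. */
  static Map<String, Long> combineLocally(List<Map.Entry<String, Long>> partition) {
    Map<String, Long> accumulators = new TreeMap<>();
    for (Map.Entry<String, Long> kv : partition) {
      accumulators.merge(kv.getKey(), kv.getValue(), Long::sum);
    }
    return accumulators;
  }

  /** Global (final) combine: merges the per-partition accumulators key by key. */
  static Map<String, Long> mergeGlobally(List<Map<String, Long>> partials) {
    Map<String, Long> result = new TreeMap<>();
    for (Map<String, Long> partial : partials) {
      partial.forEach((key, value) -> result.merge(key, value, Long::sum));
    }
    return result;
  }

  /** Runs both phases; only the small per-partition maps would cross the shuffle. */
  static Map<String, Long> combinePerKey(List<List<Map.Entry<String, Long>>> partitions) {
    List<Map<String, Long>> partials = new ArrayList<>();
    for (List<Map.Entry<String, Long>> partition : partitions) {
      partials.add(combineLocally(partition));
    }
    return mergeGlobally(partials);
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, Long>>> partitions = List.of(
        List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 5L)),
        List.of(Map.entry("a", 10L), Map.entry("b", 1L), Map.entry("b", 1L)));
    System.out.println(combinePerKey(partitions)); // prints {a=13, b=7}
  }
}
```

In this model six input records shrink to four partial accumulators before the merge step, which is the saving a partial combine is meant to provide. Whether a given Spark query actually gets a partial aggregation can be checked by inspecting its physical plan, for example with Dataset.explain().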
Re: CombinePerKey and GroupByKey
That's good to know.

Thanks,
Etienne

On Thursday 28 February 2019 at 10:05 -0800, Reynold Xin wrote:
> This should be fine. Dataset.groupByKey is a logical operation, not a physical one (as in Spark wouldn't always materialize all the groups in memory).
>
> On Thu, Feb 28, 2019 at 1:46 AM Etienne Chauchot wrote:
> > [...]
Re: What's the root cause of not supporting multiple aggregations in structured streaming?
Hi all,

I'm also very interested in this feature, but the PR has been open since January 2019 and has not been updated. It raised a design discussion around watermarks, and a design doc was written (https://docs.google.com/document/d/1IAH9UQJPUiUCLd7H6dazRK2k1szDX38SnM6GVNZYvUo/edit#heading=h.npkueh4bbkz1). We also commented on this design, but the subject still seems to be stale.

Is there any interest in the community in delivering this feature, or is it considered worthless? If the latter, can you explain why?

Best
Etienne

On 22/05/2019 03:38, 张万新 wrote:
> Thanks, I'll check it out.
>
> On Tue, May 21, 2019 at 01:31, Arun Mahadevan <ar...@apache.org> wrote:
> > Here's the proposal for supporting it in "append" mode - https://github.com/apache/spark/pull/23576. You could see if it addresses your requirement and post your feedback in the PR.
> > For "update" mode it's going to be much harder to support this without first adding support for "retractions", otherwise we would end up with wrong results.
> >
> > - Arun
> >
> > On Mon, 20 May 2019 at 01:34, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> > > There is a PR for this, but it is not yet merged.
> > >
> > > On Mon, May 20, 2019 at 10:13 AM 张万新 <kevinzwx1...@gmail.com> wrote:
> > > > Hi there,
> > > >
> > > > I'd like to know the root reason why multiple aggregations on a streaming dataframe are not allowed, since it's a very useful feature, and Flink has supported it for a long time.
> > > >
> > > > Thanks.
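Arun's remark about "update" mode and retractions can be illustrated with a toy model in plain Java. This is only a sketch under assumptions, not Spark's implementation: the class RetractionDemo and its methods are hypothetical. A first aggregation keeps a running count per user and emits the new count every time it changes; a second aggregation sums whatever the first one emits.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of two chained aggregations fed by "update"-style output. Not Spark code. */
final class RetractionDemo {

  /** Downstream sum that simply adds every updated count it receives. */
  static long totalWithoutRetractions(List<String> events) {
    Map<String, Long> countPerUser = new HashMap<>();
    long total = 0;
    for (String user : events) {
      long updated = countPerUser.merge(user, 1L, Long::sum);
      // The previously emitted count for this user is still part of the total,
      // so the same events get counted more than once.
      total += updated;
    }
    return total;
  }

  /** Same pipeline, but each update is preceded by a retraction of the old value. */
  static long totalWithRetractions(List<String> events) {
    Map<String, Long> countPerUser = new HashMap<>();
    long total = 0;
    for (String user : events) {
      long previous = countPerUser.getOrDefault(user, 0L);
      long updated = previous + 1;
      countPerUser.put(user, updated);
      total -= previous; // retraction of the stale count
      total += updated;  // insertion of the new count
    }
    return total;
  }

  public static void main(String[] args) {
    List<String> events = List.of("alice", "alice", "bob");
    System.out.println(totalWithoutRetractions(events)); // prints 4
    System.out.println(totalWithRetractions(events));    // prints 3
  }
}
```

With three input events the naive downstream sum reports 4, because alice's first count (1) is never withdrawn when her second count (2) arrives. Withdrawing the stale value first gives the expected total of 3.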
Re: What's the root cause of not supporting multiple aggregations in structured streaming?
Hi Jungtaek Lim,

Nice to hear from you again since the last time we talked :) and congrats on becoming a Spark committer in the meantime! (If I'm not mistaken, you were not one at the time.)

I totally agree with what you're saying about merging structural parts of Spark without having a broader consensus. What I don't understand is why there is not more investment in SS, especially since in another thread the community is discussing deprecating the regular DStream streaming framework.

Is the orientation of Spark now mostly batch?

PS: yeah, I saw your update to the doc when I took a look at 3.0 preview 2, searching for this particular feature. Regarding the workaround, I'm not sure it meets my needs, as it will add delays and may also mess with watermarks.

Best
Etienne Chauchot

On 04/09/2020 08:06, Jungtaek Lim wrote:
> Unfortunately I don't see enough active committers working on Structured Streaming; I don't expect major features/improvements can be brought in this situation.
>
> Technically I can review and merge the PR on major improvements in SS, but that depends on how much the proposal changes. If the proposal brings conceptual change, being reviewed by one committer still wouldn't be enough.
>
> So that's not due to the fact we think it's worthless. (That might be only me though.) I'd understand it as there not being much investment in SS. There's also a known workaround for multiple aggregations (I've documented it in the SS guide doc, in the "Limitation of global watermark" section), though I totally agree the workaround is bad.
>
> On Tue, Sep 1, 2020 at 12:28 AM Etienne Chauchot <echauc...@apache.org> wrote:
> > [...]
Re: What's the root cause of not supporting multiple aggregations in structured streaming?
Hi,

Regarding this subject, I wrote a blog article that gives details about the watermark architecture proposal that was discussed in the design doc and in the PR:
https://echauchot.blogspot.com/2020/11/watermark-architecture-proposal-for.html

Best
Etienne

On 29/09/2020 03:24, Yuanjian Li wrote:
> Thanks for the great discussion!
>
> I'm also interested in this feature and did some investigation before. As Arun mentioned, similar to the "update" mode, the "complete" mode also needs more design. We might need an operation-level output mode to support the complete mode. That is to say, if we use "complete" mode for every aggregation operator, wrong results will be returned.
>
> SPARK-26655 would be a good start, which only considers "append" mode. Maybe we need more discussion on the watermark interface. I will take a close look at the doc and PR. I hope we will have a first version with limitations and fix/remove them gradually.
>
> Best,
> Yuanjian
>
> On Sat, Sep 26, 2020 at 10:31 AM, Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
> > Thanks Etienne! Yeah, I forgot to say nice talking with you again. And sorry I forgot to send the reply (it was in draft).
> >
> > Regarding investment in SS, well, unfortunately I don't know - I'm just an individual. There might be various reasons for it, most probably "priority" among the stuff. There's not much I could change.
> >
> > I agree the workaround is sub-optimal, but unless I see sufficient support in the community I probably couldn't make it go forward. I'll just say there's an elephant in the room - as the project has been going for more than 10 years, backward compatibility is a top-priority concern in the project, even across major versions, for features and APIs alike. It is great for end users to be able to migrate versions easily, but it also blocks devs from fixing a bad design once it ships. I'm the one complaining about these issues on the dev list, and I don't see willingness to correct them.
> >
> > On Fri, Sep 4, 2020 at 5:55 PM Etienne Chauchot <echauc...@apache.org> wrote:
> > > [...]