Hi Ryan,

You are right that the `LogicalWrite` mirrors the read-side API. I just don't have a good name for it yet, and the write-side changes will be in a different PR.
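For concreteness, here is a minimal Scala sketch of the shape I have in mind for the write side, mirroring the read side. Every name here (LogicalWrite, WriterFactory, DataWriter, WriterCommitMessage) is a placeholder, not committed API:

    import org.apache.spark.sql.catalyst.InternalRow

    // Message a task writer sends back to the driver on task commit.
    trait WriterCommitMessage extends Serializable

    // Mirrors the read side: the Table creates a LogicalWrite for a
    // specific operation (append, delete, replaceWhere, ...), and the
    // LogicalWrite is the factory for the physical write and owns the
    // job-level commit/abort.
    trait LogicalWrite {
      def createWriterFactory(): WriterFactory
      def commit(messages: Array[WriterCommitMessage]): Unit
      def abort(messages: Array[WriterCommitMessage]): Unit
    }

    // Created on the driver and sent to executors.
    trait WriterFactory extends Serializable {
      def createWriter(partitionId: Int, taskId: Long): DataWriter
    }

    // Per-task writer running on an executor.
    trait DataWriter {
      def write(row: InternalRow): Unit
      def commit(): WriterCommitMessage
      def abort(): Unit
    }

The point is only the shape: as on the read side, the table creates the logical write, and the logical write carries commit/abort, so no separate WriteConfig is needed.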
Hi Hyukjin,

That's my expectation; otherwise we keep rebasing the refactor PR and never get it done.

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:

> BTW, just for clarification: do we hold Datasource V2 related PRs for now, until we finish this refactoring?
>
> On Fri, Sep 7, 2018 at 12:52 AM, Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Wenchen,
>>
>> I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?
>>
>> I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.
>>
>> rb
>>
>> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> I'm switching to another Gmail account; let's see if this one still gets dropped.
>>>
>>> Hi Ryan,
>>>
>>> I'm thinking about the write path and feel the abstraction should be the same.
>>>
>>> We still have logical and physical writes, and the table can create different logical writes based on how to write, e.g. append, delete, replaceWhere, etc.
>>>
>>> One thing I'm not sure about is the WriteConfig. With a WriteConfig, the API would look like:
>>>
>>> trait Table {
>>>   def newAppendWriteConfig(): WriteConfig
>>>   def newDeleteWriteConfig(deleteExprs: Seq[Expression]): WriteConfig
>>>   def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite
>>> }
>>>
>>> Without a WriteConfig, the API looks like:
>>>
>>> trait Table {
>>>   def newAppendWrite(): LogicalWrite
>>>   def newDeleteWrite(deleteExprs: Seq[Expression]): LogicalWrite
>>> }
>>>
>>> It looks to me that the API is simpler without WriteConfig. What do you think?
>>>
>>> Thanks,
>>> Wenchen
>>>
>>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>
>>>> Latest from Wenchen in case it was dropped.
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Wenchen Fan <wenc...@databricks.com>
>>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>>> Subject: Re: data source api v2 refactoring
>>>> To: <mri...@gmail.com>
>>>> Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <dev@spark.apache.org>
>>>>
>>>> Hi Mridul,
>>>>
>>>> I'm not sure what's going on; my email was CC'ed to the dev list.
>>>>
>>>> Hi Ryan,
>>>>
>>>> The logical and physical scan idea sounds good. To add more color to Jungtaek's question: both micro-batch and continuous mode have the logical and physical scan, but there is a difference. In micro-batch mode, a physical scan outputs data for one epoch, but that's not true in continuous mode.
>>>>
>>>> I'm not sure if it's necessary to include the streaming epoch in the API abstraction, e.g. for features like metrics reporting.
>>>>
>>>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>>>>
>>>>> Is it only me, or are all others getting Wenchen's mails? (Obviously Ryan did :-) )
>>>>> I did not see it in the mail thread I received or in the archives [1]. Wondering which other senders were getting dropped (if any).
>>>>>
>>>>> Regards,
>>>>> Mridul
>>>>>
>>>>> [1] http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>>>>
>>>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>>
>>>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>>>
>>>>>> As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and the task sets that actually perform the read. In batch, there's one definition of the scan and one task set, so it makes sense to have one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream, or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.
>>>>>>
>>>>>> To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when the partitioning of a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>>>>
>>>>>>> Hi Ryan,
>>>>>>>
>>>>>>> Sorry, I may have used the wrong wording. The pushdown is done with the ScanConfig, which is not table/stream/scan but something between them. The table creates the ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig. For a streaming source, the stream is the one to take care of the pushdown result; for a batch source, it's the scan.
>>>>>>>
>>>>>>> It's a little tricky because the stream is an abstraction for streaming sources only. Better ideas are welcome!
>>>>>>>
>>>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Reynold!
>>>>>>>>
>>>>>>>> I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.
>>>>>>>>
>>>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!
>>>>>>>>
>>>>>>>> rb
>>>>>>>>
>>>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Reynold for writing this and starting the discussion!
>>>>>>>>>
>>>>>>>>> Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.
>>>>>>>>>
>>>>>>>>> I like this proposed abstraction and have successfully prototyped it to make sure it works.
>>>>>>>>>
>>>>>>>>> During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, the operator pushdown is only applied once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future, we should just let the streaming engine do query optimization/planning only once.
>>>>>>>>>
>>>>>>>>> About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Wenchen
>>>>>>>>>
>>>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> I spent some time last week looking at the current data source v2 APIs, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions" to fit the use cases in Spark, from batch to streaming.
>>>>>>>>>>
>>>>>>>>>> Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed API:
>>>>>>>>>>
>>>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.
>>>>>>>>>>
>>>>>>>>>> 1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table-level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.
>>>>>>>>>>
>>>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the currently committed code.
>>>>>>>>>>
>>>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed; but in order to execute a job, a ReadSupport is needed (various methods in ReadSupport take a ScanConfig).
>>>>>>>>>>
>>>>>>>>>> To illustrate with pseudocode what the different levels mean, a batch query would look like the following:
>>>>>>>>>>
>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>> // scanConfig includes pushdown and options
>>>>>>>>>> val scan = table.createScan(scanConfig)
>>>>>>>>>> // run tasks on executors
>>>>>>>>>>
>>>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>>>
>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>>>
>>>>>>>>>> while (true) {
>>>>>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>>>>>   // run tasks on executors
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Versus the current API, the above:
>>>>>>>>>>
>>>>>>>>>> 1. Creates an explicit Table abstraction and an explicit Scan abstraction.
>>>>>>>>>>
>>>>>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
>>>>>>>>>>
>>>>>>>>>> This email is just a high-level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
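
To render Reynold's four levels concretely, here is one possible Scala sketch. Every trait, method, and type below is a placeholder inferred from the pseudocode in the quoted thread, not committed API:

    import org.apache.spark.sql.types.StructType

    // Level 0: Format -- e.g. "parquet", "orc"; instantiated via reflection.
    trait Format {
      def createTable(options: Map[String, String]): Table
    }

    // Level 1: Table -- a logical dataset with a schema; the entry point
    // for batch scans, streams, and (later) writes.
    trait Table {
      def schema: StructType
      // Pushdown happens while building the ScanConfig.
      def newScanConfigBuilder(): ScanConfigBuilder
      // Batch: pushdown and options are fixed when the scan is created.
      def createScan(config: ScanConfig): Scan
      // Streaming: pushdown and options are fixed for the whole stream.
      def createStream(config: ScanConfig): Stream
    }

    // Carries the pushdown result and options; immutable once built.
    trait ScanConfig
    trait ScanConfigBuilder {
      def build(): ScanConfig
    }

    // Level 2: Stream -- one run of a StreamingQuery; a factory for the
    // scans of its micro-batches (or its continuous task sets).
    trait Stream {
      def createScan(startOffset: Offset): Scan
    }
    trait Offset

    // Level 3: Scan -- enough information to run one Spark job over a
    // defined subset of the table (roughly an RDD-shaped surface).
    trait Scan {
      def planPartitions(): Array[InputPartition]
    }
    trait InputPartition

The property this sketch encodes is the guarantee discussed above: pushdown state lives in an immutable ScanConfig, fixed at stream creation for streaming and at scan creation for batch, so it cannot change mid-stream.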