Same here, I don't see anything from Wenchen... just replies to him.

On Sat, Sep 1, 2018 at 9:31 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>
> Is it only me, or are all others getting Wenchen's mails? (Obviously Ryan did :-) )
> I did not see it in the mail thread I received or in the archives ... [1]
> Wondering which other senders were getting dropped (if so).
>
> Regards,
> Mridul
>
> [1] http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>
>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>
>> As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and the task sets that actually perform the read. In batch, there's one definition of the scan and one task set, so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.
>>
>> To address Jungtaek's question, I think that this does work with continuous mode. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when the partitioning of a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
>>
>> rb
>>
>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>
>>> Hi Ryan,
>>>
>>> Sorry, I may have used the wrong wording. The pushdown is done with the ScanConfig, which is not table/stream/scan, but something between them. The table creates a ScanConfigBuilder, and then creates the stream/scan with the resulting ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result; for a batch source, it's the scan.
>>>
>>> It's a little tricky because stream is an abstraction for streaming sources only. Better ideas are welcome!
>>>
>>>
>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>> Thanks, Reynold!
>>>>
>>>> I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.
>>>>
>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!
>>>>
>>>> rb
>>>>
>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>>>
>>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>>
>>>>> Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.
>>>>>
>>>>> I like this proposed abstraction and have successfully prototyped it to make sure it works.
>>>>>
>>>>> During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, operator pushdown is only applied once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.
>>>>>
>>>>> About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
>>>>>
>>>>> Thanks,
>>>>> Wenchen
>>>>>
>>>>>
>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>>>>>>
>>>>>> I spent some time last week looking at the current data source v2 APIs, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions" to fit the use cases in Spark, from batch to streaming.
>>>>>>
>>>>>> Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed API:
>>>>>>
>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.
>>>>>>
>>>>>> 1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table-level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.
>>>>>>
>>>>>> 2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the currently committed code.
>>>>>>
>>>>>> 3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).
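>>>>>>
>>>>>> Sketched as interfaces, the levels could look roughly like the following. Again, the naming is illustrative only, and the helper types (ScanConfig, Offset, ReadTask) are assumed rather than committed API:
>>>>>>
>>>>>> trait Format {
>>>>>>   def createTable(options: Map[String, String]): Table
>>>>>> }
>>>>>>
>>>>>> trait Table {
>>>>>>   def schema: StructType
>>>>>>   def createScan(config: ScanConfig): Scan     // batch read
>>>>>>   def createStream(config: ScanConfig): Stream // streaming read
>>>>>> }
>>>>>>
>>>>>> trait Stream {
>>>>>>   // pushdowns and options are fixed for the lifetime of the Stream
>>>>>>   def createScan(start: Offset): Scan
>>>>>> }
>>>>>>
>>>>>> trait Scan {
>>>>>>   // enough information to run a Spark job over a defined subset of the table
>>>>>>   def planTasks(): Seq[ReadTask]
>>>>>> }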
>>>>>>
>>>>>> To illustrate with pseudocode what the different levels mean, a batch query would look like the following:
>>>>>>
>>>>>> val provider = reflection[Format]("parquet")
>>>>>> val table = provider.createTable(options)
>>>>>> val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
>>>>>> // run tasks on executors
>>>>>>
>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>
>>>>>> val provider = reflection[Format]("parquet")
>>>>>> val table = provider.createTable(options)
>>>>>> val stream = table.createStream(scanConfig)
>>>>>>
>>>>>> while (true) {
>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>   // run tasks on executors
>>>>>> }
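>>>>>>
>>>>>> A continuous query could follow the same pattern: the stream stays alive and only creates a new scan when the source has to be reconfigured, e.g. when the partitioning of a Kafka topic changes and partitions must be re-mapped to tasks. As a rough sketch (needsReconfiguration and latestOffset are placeholders, not proposed API):
>>>>>>
>>>>>> val provider = reflection[Format]("kafka")
>>>>>> val table = provider.createTable(options)
>>>>>> val stream = table.createStream(scanConfig)
>>>>>>
>>>>>> var scan = stream.createScan(startOffset)
>>>>>> // launch long-running continuous tasks on executors
>>>>>> while (true) {
>>>>>>   if (needsReconfiguration(stream)) {
>>>>>>     // stop the running tasks, then create a new scan that re-maps
>>>>>>     // the source partitions to a fresh set of continuous tasks
>>>>>>     scan = stream.createScan(latestOffset)
>>>>>>   }
>>>>>> }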
>>>>>>
>>>>>> Compared with the current API, the above:
>>>>>>
>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan abstraction.
>>>>>>
>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
>>>>>>
>>>>>> This email is just a high-level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix

--
Marcelo