Same here, I don't see anything from Wenchen... just replies to him.
On Sat, Sep 1, 2018 at 9:31 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>
>
> Is it only me, or are all others getting Wenchen's mails? (Obviously Ryan did 
> :-) )
> I did not see it in the mail thread I received or in the archives ... [1] 
> Wondering which other senders were getting dropped (if any).
>
> Regards
> Mridul
>
> [1] 
> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>
>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>
>> As for the abstraction, here's the way that I think about it: there are two 
>> important parts of a scan: the definition of what will be read, and task 
>> sets that actually perform the read. In batch, there's one definition of the 
>> scan and one task set so it makes sense that there's one scan object that 
>> encapsulates both of these concepts. For streaming, we need to separate the 
>> two into the definition of what will be read (the stream or streaming read) 
>> and the task sets that are run (scans). That way, the streaming read behaves 
>> like a factory for scans, producing scans that handle the data either in 
>> micro-batches or using continuous tasks.
>>
>> To address Jungtaek's question, I think that this does work with continuous. 
>> In continuous mode, the query operators keep running and send data to one 
>> another directly. The API still needs a streaming read layer because it may 
>> still produce more than one continuous scan. That would happen when the 
>> underlying source changes and Spark needs to reconfigure. I think the 
>> example here is when partitioning in a Kafka topic changes and Spark needs 
>> to re-map Kafka partitions to continuous tasks.
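
The factory relationship described above can be sketched in a few lines of Scala. This is only an illustration under assumed names: StreamingRead, ContinuousScan, createContinuousScan and the modulo task assignment are hypothetical and are not Spark or Kafka classes. The streaming read keeps the fixed definition of what is read, and a new continuous scan is produced only when the partition layout changes.

```scala
// Hypothetical types for illustration; these are not Spark or Kafka classes.
// A continuous scan is one concrete task set: a mapping from partition to task.
final case class ContinuousScan(partitionToTask: Map[Int, Int])

// The streaming read holds the definition of what is read and acts as a
// factory for scans.
final class StreamingRead(val topic: String, val numTasks: Int) {
  // Assumed assignment policy: spread partitions over tasks by modulo.
  def createContinuousScan(partitions: Seq[Int]): ContinuousScan =
    ContinuousScan(partitions.map(p => p -> (p % numTasks)).toMap)
}

object ContinuousDemo {
  // Walk through the observed partition layouts and create a new scan only
  // when the layout differs from the previous one (a "reconfiguration").
  def scansFor(read: StreamingRead, layouts: Seq[Seq[Int]]): Seq[ContinuousScan] =
    layouts
      .foldLeft(Vector.empty[(Seq[Int], ContinuousScan)]) { (acc, layout) =>
        if (acc.lastOption.exists(_._1 == layout)) acc
        else acc :+ (layout -> read.createContinuousScan(layout))
      }
      .map(_._2)
}
```

With layouts Seq(0, 1), Seq(0, 1), Seq(0, 1, 2), the sketch yields two scans: one for the initial layout and one after partition 2 appears.
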
>>
>> rb
>>
>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>
>>> Hi Ryan,
>>>
>>> Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, 
>>> which is not table/stream/scan, but something between them. The table 
>>> creates a ScanConfigBuilder, and the table creates the stream/scan with the 
>>> ScanConfig. For a streaming source, the stream is the one that takes care of 
>>> the pushdown result. For a batch source, it's the scan.
>>>
>>> It's a little tricky because stream is an abstraction for streaming sources 
>>> only. Better ideas are welcome!
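
One possible reading of this flow, as a minimal Scala sketch. ScanConfig and ScanConfigBuilder are named in the message above, but every signature here is an assumption, and PushdownTable, BatchScan, StreamingRead, pushFilter and pruneColumns are hypothetical helpers. The point is only that pushdown produces a ScanConfig once, and the same config is then handed to either the batch scan or the stream.

```scala
// Hypothetical sketch: the names mirror the email, the signatures are assumed.
final case class ScanConfig(pushedFilters: Seq[String], prunedColumns: Seq[String])

final class ScanConfigBuilder(supportedFilters: Set[String], schema: Seq[String]) {
  private var filters = Vector.empty[String]
  private var columns = schema

  // Records the filter if the source supports it; returns whether it was pushed.
  def pushFilter(filter: String): Boolean = {
    val supported = supportedFilters.contains(filter)
    if (supported) filters :+= filter
    supported
  }

  // Keeps only the required columns, in schema order.
  def pruneColumns(required: Seq[String]): Unit =
    columns = schema.filter(c => required.contains(c))

  def build(): ScanConfig = ScanConfig(filters, columns)
}

// Both consumers simply carry the pushdown result they were created with.
final case class BatchScan(config: ScanConfig)
final case class StreamingRead(config: ScanConfig)

final class PushdownTable(schema: Seq[String]) {
  // The table creates the builder, so pushdown behaves the same for batch
  // and streaming. The supported filter set is a toy assumption.
  def newScanConfigBuilder(): ScanConfigBuilder =
    new ScanConfigBuilder(Set("id > 5"), schema)

  def createScan(config: ScanConfig): BatchScan = BatchScan(config)
  def createStream(config: ScanConfig): StreamingRead = StreamingRead(config)
}
```
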
>>>
>>>
>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>> Thanks, Reynold!
>>>>
>>>> I think your API sketch looks great. I appreciate having the Table level 
>>>> in the abstraction to plug into as well. I think this makes it clear what 
>>>> everything does, particularly having the Stream level that represents a 
>>>> configured (by ScanConfig) streaming read and can act as a factory for 
>>>> individual batch scans or for continuous scans.
>>>>
>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table level. 
>>>> It seems to mean that pushdown is specific to a batch scan or streaming 
>>>> read, which seems to be what you're saying as well. Wouldn't the pushdown 
>>>> happen to create a ScanConfig, which is then used as Reynold suggests? 
>>>> Looking forward to seeing this PR when you get it posted. Thanks for all 
>>>> of your work on this!
>>>>
>>>> rb
>>>>
>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>>>
>>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>>
>>>>> Data source v2 was started with batch only, so we didn't pay much 
>>>>> attention to the abstraction and just followed the v1 API. Now that we 
>>>>> are designing the streaming API and catalog integration, the abstraction 
>>>>> becomes super important.
>>>>>
>>>>> I like this proposed abstraction and have successfully prototyped it to 
>>>>> make sure it works.
>>>>>
>>>>> During prototyping, I had to work around the issue that the current 
>>>>> streaming engine does query optimization/planning for each micro-batch. 
>>>>> With this abstraction, the operator pushdown is applied only once per 
>>>>> query. In my prototype, I do the physical planning up front to get the 
>>>>> pushdown result, add a logical linking node that wraps the resulting 
>>>>> physical plan node for the data source, and then swap that logical 
>>>>> linking node into the logical plan for each batch. In the future we 
>>>>> should just let the streaming engine do query optimization/planning 
>>>>> only once.
>>>>>
>>>>> About pushdown, I think we should do it at the table level. The table 
>>>>> should create a new pushdown handler to apply operator pushdown for each 
>>>>> scan/stream, and create the scan/stream with the pushdown result. The 
>>>>> rationale is that a table should have the same pushdown behavior 
>>>>> regardless of the scan node.
>>>>>
>>>>> Thanks,
>>>>> Wenchen
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>>>>>>
>>>>>> I spent some time last week looking at the current data source v2 apis, 
>>>>>> and I thought we should be a bit more buttoned up in terms of the 
>>>>>> abstractions and the guarantees Spark provides. In particular, I feel we 
>>>>>> need the following levels of "abstractions", to fit the use cases in 
>>>>>> Spark, from batch, to streaming.
>>>>>>
>>>>>> Please don't focus on the naming at this stage. When possible, I draw 
>>>>>> parallels to what similar levels are named in the currently committed 
>>>>>> api:
>>>>>>
>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC. There 
>>>>>> is currently no explicit class at this level.
>>>>>>
>>>>>> 1. Table: This should represent a logical dataset (with schema). This 
>>>>>> could be just a directory on the file system, or a table in the catalog. 
>>>>>> Operations on tables can include batch reads (Scan), streams, writes, 
>>>>>> and potentially other operations such as deletes. The closest to the 
>>>>>> table level abstraction in the current code base is the "Provider" 
>>>>>> class, although Provider isn't quite a Table. This is similar to Ryan's 
>>>>>> proposed design.
>>>>>>
>>>>>> 2. Stream: Specific to streaming. A stream is created out of a Table. 
>>>>>> This logically represents an instance of a StreamingQuery. Pushdowns 
>>>>>> and options are handled at this layer, i.e. Spark guarantees to the 
>>>>>> data source implementation that pushdowns and options don't change 
>>>>>> within a Stream. Each Stream consists of a sequence of scans. There is 
>>>>>> no equivalent concept in the current committed code.
>>>>>>
>>>>>> 3. Scan: A physical scan -- either as part of a streaming query, or a 
>>>>>> batch query. This should contain sufficient information and methods so 
>>>>>> we can run a Spark job over a defined subset of the table. It's 
>>>>>> functionally equivalent to an RDD, except there's no dependency on RDD 
>>>>>> so it is a smaller surface. In the current code, the equivalent class 
>>>>>> would be the ScanConfig, which represents the information needed, but in 
>>>>>> order to execute a job, ReadSupport is needed (various methods in 
>>>>>> ReadSupport take a ScanConfig).
>>>>>>
>>>>>>
>>>>>> To illustrate with pseudocode what the different levels mean, a batch 
>>>>>> query would look like the following:
>>>>>>
>>>>>> val provider = reflection[Format]("parquet")
>>>>>> val table = provider.createTable(options)
>>>>>> // scanConfig includes pushdown and options
>>>>>> val scan = table.createScan(scanConfig)
>>>>>> // run tasks on executors
>>>>>>
>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>
>>>>>> val provider = reflection[Format]("parquet")
>>>>>> val table = provider.createTable(options)
>>>>>> val stream = table.createStream(scanConfig)
>>>>>>
>>>>>> while(true) {
>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>   // run tasks on executors
>>>>>> }
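
The two pseudocode flows above can be turned into a small runnable toy to make the levels concrete. This is a sketch under stated assumptions, not the proposed or committed Spark API: InMemoryFormat, StreamingRead (standing in for the Stream level), the minValue filter, the "rows" and "batchSize" options, and the bounded loop that replaces while(true) are all invented for illustration.

```scala
// Illustrative only: every name below is hypothetical, not Spark's API.
// The config carries the pushdown result (a toy filter) plus options.
final case class ScanConfig(minValue: Int, options: Map[String, String] = Map.empty)

// Level 3: a physical scan over a defined subset of the table.
trait Scan {
  def run(): Seq[Int] // stands in for "run tasks on executors"
}

// Level 2: a configured streaming read; its pushdown and options never change.
trait StreamingRead {
  def createScan(startOffset: Int): Scan
}

// Level 1: a logical dataset that can create batch scans and streams.
trait Table {
  def createScan(config: ScanConfig): Scan
  def createStream(config: ScanConfig): StreamingRead
}

// Level 0: a format that creates tables from options.
trait Format {
  def createTable(options: Map[String, String]): Table
}

// Toy format: the "table" is the integers 1..n, with n taken from the options.
object InMemoryFormat extends Format {
  def createTable(options: Map[String, String]): Table = {
    val rows = (1 to options.getOrElse("rows", "10").toInt).toVector
    new Table {
      def createScan(config: ScanConfig): Scan = new Scan {
        def run(): Seq[Int] = rows.filter(_ >= config.minValue)
      }
      def createStream(config: ScanConfig): StreamingRead = new StreamingRead {
        private val batchSize = config.options.getOrElse("batchSize", "4").toInt
        // Each micro-batch scan reads one slice, with the same filter applied.
        def createScan(startOffset: Int): Scan = new Scan {
          def run(): Seq[Int] =
            rows.slice(startOffset, startOffset + batchSize).filter(_ >= config.minValue)
        }
      }
    }
  }
}

object LevelsDemo {
  // Batch: one table, one scan.
  def batch(): Seq[Int] = {
    val table = InMemoryFormat.createTable(Map("rows" -> "10"))
    table.createScan(ScanConfig(minValue = 8)).run()
  }

  // Micro-batch: one stream, a sequence of scans created from offsets.
  def microBatches(): Seq[Seq[Int]] = {
    val table = InMemoryFormat.createTable(Map("rows" -> "10"))
    val config = ScanConfig(minValue = 3, options = Map("batchSize" -> "4"))
    val stream = table.createStream(config)
    // A bounded loop over offsets 0, 4, 8 stands in for `while (true)`.
    Iterator.iterate(0)(_ + 4).takeWhile(_ < 10)
      .map(offset => stream.createScan(offset).run())
      .toList
  }
}
```

In this toy, the filter is fixed when the stream is created, so every scan it produces sees the same pushdown, which is the guarantee the Stream level is meant to give.
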
>>>>>>
>>>>>>
>>>>>> Vs the current API, the above:
>>>>>>
>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan 
>>>>>> abstraction.
>>>>>>
>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and 
>>>>>> options are handled there, rather than at the individual scan 
>>>>>> (ReadSupport) level. Data source implementations don't need to worry 
>>>>>> about pushdowns or options changing mid-stream. For batch, those happen 
>>>>>> when the scan object is created.
>>>>>>
>>>>>>
>>>>>>
>>>>>> This email is just a high level sketch. I've asked Wenchen to prototype 
>>>>>> this, to see if it is actually feasible and the degree of hacks it 
>>>>>> removes, or creates.
>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix



-- 
Marcelo

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
