Hi Ryan,

You are right that `LogicalWrite` mirrors the read-side API. I just
don't have a good name for it yet, and the write-side changes will go in a
separate PR.
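
To make the comparison concrete, here is a rough Java sketch of the variant
without WriteConfig. Only `Table`, `LogicalWrite`, `newAppendWrite`,
`newDeleteWrite`, and the commit/abort methods come from this thread;
`WriteApiSketch`, `RecordingTable`, and passing delete expressions as plain
strings are assumptions made purely for illustration, not the actual API.

```java
import java.util.ArrayList;
import java.util.List;

public class WriteApiSketch {
    /** A write created directly by the table; it carries commit and abort itself. */
    interface LogicalWrite {
        void commit();
        void abort();
    }

    /** The variant without WriteConfig: one factory method per kind of write. */
    interface Table {
        LogicalWrite newAppendWrite();
        // Delete expressions are plain strings here, a stand-in for a real expression type.
        LogicalWrite newDeleteWrite(List<String> deleteExprs);
    }

    /** Hypothetical toy table that only records which writes were committed or aborted. */
    static final class RecordingTable implements Table {
        final List<String> log = new ArrayList<>();

        @Override
        public LogicalWrite newAppendWrite() {
            return write("append");
        }

        @Override
        public LogicalWrite newDeleteWrite(List<String> deleteExprs) {
            return write("delete" + deleteExprs);
        }

        // Builds a write whose commit/abort append an entry to the table's log.
        private LogicalWrite write(String name) {
            return new LogicalWrite() {
                @Override public void commit() { log.add("commit " + name); }
                @Override public void abort() { log.add("abort " + name); }
            };
        }
    }

    public static void main(String[] args) {
        RecordingTable table = new RecordingTable();
        table.newAppendWrite().commit();
        table.newDeleteWrite(List.of("id = 1")).abort();
        System.out.println(table.log);
    }
}
```

The point of the sketch is only that the caller goes from the table straight
to a write and then commits or aborts it, with no intermediate config object.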


Hi Hyukjin,

That's my expectation; otherwise we will keep rebasing the refactor PR and
never get it done.

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:

> BTW, just for clarification: are we holding Data Source V2 related PRs for
> now, until this refactoring is finished?
>
> On Fri, Sep 7, 2018 at 12:52 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Wenchen,
>>
>> I'm not really sure what you're proposing here. What is a `LogicalWrite`?
>> Is it something that mirrors the read side in your PR?
>>
>> I think that I agree that if we have a Write independent of the Table
>> that carries the commit and abort methods, then we can create it directly
>> without a WriteConfig. So I tentatively agree with what you propose,
>> assuming that I understand it correctly.
>>
>> rb
>>
>> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> I'm switching to another Gmail account of mine; let's see if it still
>>> gets dropped this time.
>>>
>>> Hi Ryan,
>>>
>>> I'm thinking about the write path and feel the abstraction should be the
>>> same.
>>>
>>> We still have logical and physical writes, and the table can create
>>> different logical writes depending on how to write, e.g. append, delete,
>>> replaceWhere, etc.
>>>
>>> One thing I'm not sure about is the WriteConfig. With the WriteConfig,
>>> the API would look like
>>> trait Table {
>>>   WriteConfig newAppendWriteConfig();
>>>
>>>   WriteConfig newDeleteWriteConfig(deleteExprs);
>>>
>>>   LogicalWrite newLogicalWrite(writeConfig);
>>> }
>>>
>>> Without WriteConfig, the API looks like
>>> trait Table {
>>>   LogicalWrite newAppendWrite();
>>>
>>>   LogicalWrite newDeleteWrite(deleteExprs);
>>> }
>>>
>>>
>>> It looks to me that the API is simpler without WriteConfig. What do you
>>> think?
>>>
>>> Thanks,
>>> Wenchen
>>>
>>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid>
>>> wrote:
>>>
>>>> Latest from Wenchen in case it was dropped.
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Wenchen Fan <wenc...@databricks.com>
>>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>>> Subject: Re: data source api v2 refactoring
>>>> To: <mri...@gmail.com>
>>>> Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <
>>>> dev@spark.apache.org>
>>>>
>>>>
>>>> Hi Mridul,
>>>>
>>>> I'm not sure what's going on; my email was CC'ed to the dev list.
>>>>
>>>>
>>>> Hi Ryan,
>>>>
>>>> The logical and physical scan idea sounds good. To add more color
>>>> to Jungtaek's question, both micro-batch and continuous mode have
>>>> the logical and physical scan, but there is a difference: for micro-batch
>>>> mode, a physical scan outputs data for one epoch, but it's not true for
>>>> continuous mode.
>>>>
>>>> I'm not sure if it's necessary to include streaming epoch in the API
>>>> abstraction, for features like metrics reporting.
>>>>
>>>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Is it only me, or are all the others getting Wenchen's mails? (Obviously
>>>>> Ryan did :-) )
>>>>> I did not see it in the mail thread I received or in the archives ... [1]
>>>>> Wondering which other senders were getting dropped (if any).
>>>>>
>>>>> Regards
>>>>> Mridul
>>>>>
>>>>> [1]
>>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>>>>
>>>>>
>>>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>>>
>>>>>> As for the abstraction, here's the way that I think about it: there
>>>>>> are two important parts of a scan: the definition of what will be read,
>>>>>> and task sets that actually perform the read. In batch, there's one
>>>>>> definition of the scan and one task set, so it makes sense that there's
>>>>>> one scan object that encapsulates both of these concepts. For streaming,
>>>>>> we need to separate the two into the definition of what will be read
>>>>>> (the stream or streaming read) and the task sets that are run (scans).
>>>>>> That way, the streaming read behaves like a factory for scans, producing
>>>>>> scans that handle the data either in micro-batches or using continuous
>>>>>> tasks.
>>>>>>
>>>>>> To address Jungtaek's question, I think that this does work with
>>>>>> continuous. In continuous mode, the query operators keep running and send
>>>>>> data to one another directly. The API still needs a streaming read layer
>>>>>> because it may still produce more than one continuous scan. That would
>>>>>> happen when the underlying source changes and Spark needs to
>>>>>> reconfigure. I think the example here is when partitioning in a Kafka
>>>>>> topic changes and Spark needs to re-map Kafka partitions to continuous
>>>>>> tasks.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Ryan,
>>>>>>>
>>>>>>> Sorry, I may have used the wrong wording. The pushdown is done with
>>>>>>> ScanConfig, which is not table/stream/scan, but something between them.
>>>>>>> The table creates the ScanConfigBuilder, and the table creates the
>>>>>>> stream/scan with the ScanConfig. For a streaming source, the stream is
>>>>>>> the one that takes care of the pushdown result. For a batch source,
>>>>>>> it's the scan.
>>>>>>>
>>>>>>> It's a little tricky because the stream is an abstraction for streaming
>>>>>>> sources only. Better ideas are welcome!
>>>>>>>
>>>>>>
>>>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Reynold!
>>>>>>>>
>>>>>>>> I think your API sketch looks great. I appreciate having the Table
>>>>>>>> level in the abstraction to plug into as well. I think this makes it
>>>>>>>> clear what everything does, particularly having the Stream level that
>>>>>>>> represents a configured (by ScanConfig) streaming read and can act as a
>>>>>>>> factory for individual batch scans or for continuous scans.
>>>>>>>>
>>>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>>>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>>>>>> streaming read, which seems to be what you're saying as well. Wouldn't
>>>>>>>> the pushdown happen to create a ScanConfig, which is then used as Reynold
>>>>>>>> suggests? Looking forward to seeing this PR when you get it posted.
>>>>>>>> Thanks for all of your work on this!
>>>>>>>>
>>>>>>>> rb
>>>>>>>>
>>>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>>>>>>
>>>>>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>>>>>> attention to the abstraction and just followed the v1 API. Now that we
>>>>>>>>> are designing the streaming API and catalog integration, the
>>>>>>>>> abstraction becomes super important.
>>>>>>>>>
>>>>>>>>> I like this proposed abstraction and have successfully prototyped
>>>>>>>>> it to make sure it works.
>>>>>>>>>
>>>>>>>>> During prototyping, I had to work around the issue that the
>>>>>>>>> current streaming engine does query optimization/planning for each
>>>>>>>>> micro-batch. With this abstraction, the operator pushdown is only
>>>>>>>>> applied once per query. In my prototype, I do the physical planning up
>>>>>>>>> front to get the pushdown result, add a logical linking node that wraps
>>>>>>>>> the resulting physical plan node for the data source, and then swap
>>>>>>>>> that logical linking node into the logical plan for each batch. In the
>>>>>>>>> future we should just let the streaming engine do query
>>>>>>>>> optimization/planning only once.
>>>>>>>>>
>>>>>>>>> About pushdown, I think we should do it at the table level. The
>>>>>>>>> table should create a new pushdown handler to apply operator pushdown
>>>>>>>>> for each scan/stream, and create the scan/stream with the pushdown
>>>>>>>>> result. The rationale is that a table should have the same pushdown
>>>>>>>>> behavior regardless of the scan node.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Wenchen
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I spent some time last week looking at the current data source v2
>>>>>>>>>> apis, and I thought we should be a bit more buttoned up in terms of
>>>>>>>>>> the abstractions and the guarantees Spark provides. In particular, I
>>>>>>>>>> feel we need the following levels of "abstractions", to fit the use
>>>>>>>>>> cases in Spark, from batch, to streaming.
>>>>>>>>>>
>>>>>>>>>> Please don't focus on the naming at this stage. When possible, I
>>>>>>>>>> draw parallels to what similar levels are named in the currently
>>>>>>>>>> committed api:
>>>>>>>>>>
>>>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC.
>>>>>>>>>> There is currently no explicit class at this level.
>>>>>>>>>>
>>>>>>>>>> 1. Table: This should represent a logical dataset (with schema).
>>>>>>>>>> This could be just a directory on the file system, or a table in the
>>>>>>>>>> catalog. Operations on tables can include batch reads (Scan),
>>>>>>>>>> streams, writes, and potentially other operations such as deletes.
>>>>>>>>>> The closest to the table level abstraction in the current code base
>>>>>>>>>> is the "Provider" class, although Provider isn't quite a Table. This
>>>>>>>>>> is similar to Ryan's proposed design.
>>>>>>>>>>
>>>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a
>>>>>>>>>> Table. This logically represents an instance of a StreamingQuery.
>>>>>>>>>> Pushdowns and options are handled at this layer, i.e. Spark
>>>>>>>>>> guarantees to the data source implementation that pushdowns and
>>>>>>>>>> options don't change within a Stream. Each Stream consists of a
>>>>>>>>>> sequence of scans. There is no equivalent concept in the current
>>>>>>>>>> committed code.
>>>>>>>>>>
>>>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query,
>>>>>>>>>> or a batch query. This should contain sufficient information and
>>>>>>>>>> methods so we can run a Spark job over a defined subset of the table.
>>>>>>>>>> It's functionally equivalent to an RDD, except there's no dependency on
>>>>>>>>>> RDD so it is a smaller surface. In the current code, the equivalent
>>>>>>>>>> class would be the ScanConfig, which represents the information needed,
>>>>>>>>>> but in order to execute a job, ReadSupport is needed (various methods
>>>>>>>>>> in ReadSupport take a ScanConfig).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> To illustrate with pseudocode what the different levels mean, a
>>>>>>>>>> batch query would look like the following:
>>>>>>>>>>
>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>> // scanConfig includes pushdown and options
>>>>>>>>>> val scan = table.createScan(scanConfig)
>>>>>>>>>> // run tasks on executors
>>>>>>>>>>
>>>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>>>
>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>>>
>>>>>>>>>> while(true) {
>>>>>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>>>>>   // run tasks on executors
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Vs the current API, the above:
>>>>>>>>>>
>>>>>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>>>>>>>>> abstraction.
>>>>>>>>>>
>>>>>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and
>>>>>>>>>> options are handled there, rather than at the individual scan
>>>>>>>>>> (ReadSupport) level. Data source implementations don't need to worry
>>>>>>>>>> about pushdowns or options changing mid-stream. For batch, those happen
>>>>>>>>>> when the scan object is created.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>>>>>>> prototype this, to see if it is actually feasible and the degree of
>>>>>>>>>> hacks it removes, or creates.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
