Nice suggestion, Reynold, and great news to see that Wenchen succeeded in
prototyping it!

One thing I would like to make sure of is how continuous mode works with such
an abstraction. Would continuous mode also be abstracted with Stream, with
createScan providing an unbounded Scan?
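
To make the question concrete, here is roughly what I am imagining, in the
same pseudocode style as Reynold's sketch below (createContinuousScan is just
a made-up name for illustration, not part of the proposal):

val provider = reflection[Format]("kafka")
val table = provider.createTable(options)
val stream = table.createStream(scanConfig)
// continuous mode: a single unbounded scan for the lifetime of the query,
// instead of a sequence of micro-batch scans?
val scan = stream.createContinuousScan(startOffset)
// run long-running tasks on executors until the query is stopped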

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sat, Sep 1, 2018 at 8:26 AM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Thanks, Reynold!
>
> I think your API sketch looks great. I appreciate having the Table level
> in the abstraction to plug into as well. I think this makes it clear what
> everything does, particularly having the Stream level that represents a
> configured (by ScanConfig) streaming read and can act as a factory for
> individual batch scans or for continuous scans.
>
> Wenchen, I'm not sure what you mean by doing pushdown at the table level.
> It sounds like pushdown is still specific to each batch scan or streaming
> read, which seems to be what you're saying as well. Wouldn't the pushdown
> happen in order to create a ScanConfig, which is then used as Reynold suggests?
> Looking forward to seeing this PR when you get it posted. Thanks for all of
> your work on this!
>
> rb
>
> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com>
> wrote:
>
>> Thanks, Reynold, for writing this and starting the discussion!
>>
>> Data source v2 started with batch only, so we didn't pay much
>> attention to the abstraction and just followed the v1 API. Now that we are
>> designing the streaming API and catalog integration, the abstraction
>> becomes super important.
>>
>> I like this proposed abstraction and have successfully prototyped it to
>> make sure it works.
>>
>> During prototyping, I had to work around the issue that the current
>> streaming engine does query optimization/planning for each micro-batch,
>> while with this abstraction the operator pushdown is only applied once
>> per query. In my prototype, I do the physical planning up front to get the
>> pushdown result, add a logical linking node that wraps the resulting
>> physical plan node for the data source, and then swap that logical linking
>> node into the logical plan for each batch. In the future we should just let
>> the streaming engine do query optimization/planning only once.
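>>
>> As a very rough illustration of the workaround (a Scala sketch; the class
>> and helper names are made up, not the actual Catalyst nodes):
>>
>> import org.apache.spark.sql.catalyst.expressions.Attribute
>> import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
>> import org.apache.spark.sql.execution.SparkPlan
>>
>> // Logical node that just carries the physical scan planned up front, so
>> // its pushdown result survives the re-planning of every micro-batch.
>> case class ScanLink(plannedScan: SparkPlan) extends LeafNode {
>>   override def output: Seq[Attribute] = plannedScan.output
>> }
>>
>> // Per micro-batch: swap the link node in wherever the source relation sits.
>> def injectPlannedScan(
>>     batchPlan: LogicalPlan,
>>     isSourceRelation: LogicalPlan => Boolean,
>>     plannedScan: SparkPlan): LogicalPlan =
>>   batchPlan transform {
>>     case r if isSourceRelation(r) => ScanLink(plannedScan)
>>   }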
>>
>> About pushdown, I think we should do it at the table level. The table
>> should create a new pushdown handler to apply operator pushdown for each
>> scan/stream, and create the scan/stream with the pushdown result. The
>> rationale is that a table should have the same pushdown behavior regardless
>> of the scan node.
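>>
>> In interface terms, the handler might look roughly like this (a sketch with
>> made-up names, only to show where the pushdown result would be created):
>>
>> import org.apache.spark.sql.sources.Filter
>> import org.apache.spark.sql.types.StructType
>>
>> trait Scan; trait Stream; trait PushdownResult  // placeholders
>>
>> trait PushdownHandler {
>>   // returns the filters the source cannot handle
>>   def pushFilters(filters: Seq[Filter]): Seq[Filter]
>>   def pruneColumns(requiredSchema: StructType): Unit
>>   def result(): PushdownResult
>> }
>>
>> trait Table {
>>   // one fresh handler per scan/stream, so pushdown behavior is a property
>>   // of the table rather than of any particular scan node
>>   def newPushdownHandler(): PushdownHandler
>>   def createScan(pushed: PushdownResult): Scan
>>   def createStream(pushed: PushdownResult): Stream
>> }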
>>
>> Thanks,
>> Wenchen
>>
>>
>>
>>
>>
>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>>
>>> I spent some time last week looking at the current data source v2 APIs,
>>> and I thought we should be a bit more buttoned up in terms of the
>>> abstractions and the guarantees Spark provides. In particular, I feel we
>>> need the following levels of "abstraction" to fit the use cases in Spark,
>>> from batch to streaming.
>>>
>>> Please don't focus on the naming at this stage. When possible, I draw
>>> parallels to what similar levels are named in the currently committed API:
>>>
>>> 0. Format: This represents a specific format, e.g. Parquet, ORC. There
>>> is currently no explicit class at this level.
>>>
>>> 1. Table: This should represent a logical dataset (with schema). This
>>> could be just a directory on the file system, or a table in the catalog.
>>> Operations on tables can include batch reads (Scan), streams, writes, and
>>> potentially other operations such as deletes. The closest to the table
>>> level abstraction in the current code base is the "Provider" class,
>>> although Provider isn't quite a Table. This is similar to Ryan's proposed
>>> design.
>>>
>>> 2. Stream: Specific to streaming. A stream is created out of a Table.
>>> It logically represents an instance of a StreamingQuery. Pushdowns and
>>> options are handled at this layer, i.e. Spark guarantees to the data source
>>> implementation that pushdowns and options don't change within a Stream.
>>> Each Stream consists of a sequence of scans. There is no equivalent concept
>>> in the current committed code.
>>>
>>> 3. Scan: A physical scan -- either as part of a streaming query or a
>>> batch query. This should contain sufficient information and methods so we
>>> can run a Spark job over a defined subset of the table. It's functionally
>>> equivalent to an RDD, except there's no dependency on RDD, so it is a
>>> smaller surface. In the current code, the closest equivalent is ScanConfig,
>>> which represents the information needed, but in order to execute a job,
>>> ReadSupport is needed as well (various methods in ReadSupport take a
>>> ScanConfig).
>>>
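>>> The same four levels in rough interface form (names and signatures are
>>> placeholders only, just to make the layering concrete):
>>>
>>> import org.apache.spark.sql.catalyst.InternalRow
>>> import org.apache.spark.sql.types.StructType
>>>
>>> trait ScanConfig; trait Offset; trait Split  // placeholders
>>>
>>> // 0. Format: the entry point, looked up by reflection from a short name
>>> trait Format { def createTable(options: Map[String, String]): Table }
>>>
>>> // 1. Table: a logical dataset with a schema; factory for reads
>>> trait Table {
>>>   def schema: StructType
>>>   def createScan(config: ScanConfig): Scan      // batch read
>>>   def createStream(config: ScanConfig): Stream  // streaming read
>>> }
>>>
>>> // 2. Stream: one streaming query's read; pushdown and options are fixed
>>> // for the lifetime of the stream
>>> trait Stream { def createScan(start: Offset): Scan }
>>>
>>> // 3. Scan: enough to run one Spark job over a defined subset of the table
>>> trait Scan {
>>>   def splits(): Seq[Split]
>>>   def createReader(split: Split): Iterator[InternalRow]
>>> }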
>>>
>>> To illustrate with pseudocode what the different levels mean, a batch
>>> query would look like the following:
>>>
>>> val provider = reflection[Format]("parquet")
>>> val table = provider.createTable(options)
>>> val scan = table.createScan(scanConfig)
>>> // scanConfig includes pushdown and options
>>> // run tasks on executors
>>>
>>> A streaming micro-batch scan would look like the following:
>>>
>>> val provider = reflection[Format]("parquet")
>>> val table = provider.createTable(options)
>>> val stream = table.createStream(scanConfig)
>>>
>>> while(true) {
>>>   val scan = stream.createScan(startOffset)
>>>   // run tasks on executors
>>> }
>>>
>>>
>>> Compared with the current API, the above:
>>>
>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>> abstraction.
>>>
>>> 2. Has an explicit Stream level and makes it clear that pushdowns and
>>> options are handled there, rather than at the individual scan (ReadSupport)
>>> level. Data source implementations don't need to worry about pushdowns or
>>> options changing mid-stream. For batch, those happen when the scan object
>>> is created.
>>>
>>>
>>>
>>> This email is just a high-level sketch. I've asked Wenchen to prototype
>>> this, to see if it is actually feasible and the degree of hacks it removes
>>> or creates.
>>>
>>>
>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
