Wenchen, can you explain a bit more clearly why this is necessary? The
pseudo-code you used doesn’t clearly demonstrate why. Why couldn’t this be
handled with inheritance from an abstract Factory class? Why define
all of the createXDataReader methods, but make the DataFormat a field in
the factory?
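To make the question concrete, the inheritance-based alternative I have in mind looks roughly like this (a hypothetical sketch with stub types standing in for Spark's; none of these names come from the actual API):

```java
// Stub types standing in for Spark's classes (hypothetical).
interface DataReader<T> { T next(); }
final class ColumnarBatch {}
enum DataFormat { ROW, INTERNAL_ROW, UNSAFE_ROW, COLUMNAR_BATCH }

// Alternative: an abstract factory subclass per format, instead of one
// interface carrying a DataFormat field plus a createXDataReader method
// for every format.
abstract class DataReaderFactory {
  // each concrete subclass fixes the single format it produces
  abstract DataFormat dataFormat();
}

abstract class ColumnarBatchReaderFactory extends DataReaderFactory {
  @Override
  final DataFormat dataFormat() { return DataFormat.COLUMNAR_BATCH; }

  // the only create method this kind of factory needs
  abstract DataReader<ColumnarBatch> createDataReader();
}
```

With this shape, the format and the create method can't disagree, which is the type-safety property I thought the design was after.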

A related issue is that I think there’s a strong case that the v2 sources
should produce only InternalRow and that Row and UnsafeRow shouldn’t be
exposed; see SPARK-23325 <https://issues.apache.org/jira/browse/SPARK-23325>.
The basic arguments are:

   - UnsafeRow is really difficult to produce without using Spark’s
   projection methods. If implementations can produce UnsafeRow, then they
   can still pass them as InternalRow and the projection Spark adds would
   be a no-op. When implementations can’t produce UnsafeRow, then it is
   better for Spark to insert the projection to unsafe. An example of a data
   format that doesn’t produce unsafe is the built-in Parquet source, which
   produces InternalRow and projects before returning the row.
   - For Row, I see no good reason to support it in a new interface when it
   will just introduce an extra transformation. The argument that Row is
   the “public” API doesn’t apply because UnsafeRow is already exposed
   through the v2 API.
   - Standardizing on InternalRow would remove the need for these
   interfaces entirely, simplify what implementers must provide, and reduce
   confusion about what to do.
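The first point is easy to see in code. Here is a minimal sketch with stub row types (hypothetical stand-ins for Spark's InternalRow/UnsafeRow and UnsafeProjection, not the real classes):

```java
// Stub row types (hypothetical stand-ins for Spark's).
interface InternalRow {}
final class UnsafeRow implements InternalRow {}

class UnsafeConversion {
  // Spark-side projection to unsafe: a pass-through no-op when the source
  // already produced UnsafeRow, a real conversion otherwise (e.g. the
  // built-in Parquet reader, which produces non-unsafe InternalRow).
  static UnsafeRow toUnsafe(InternalRow row) {
    if (row instanceof UnsafeRow) {
      return (UnsafeRow) row; // already unsafe: nothing to do
    }
    // stand-in for UnsafeProjection.create(schema).apply(row)
    return new UnsafeRow();
  }
}
```

Either way the source only has to promise InternalRow, and Spark decides whether any work is actually needed.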

Using InternalRow doesn’t cover the case where we want to produce
ColumnarBatch instead, so what you’re proposing might still be a good idea.
I just think that we can simplify either path.
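To illustrate what I mean by simplifying, here is a sketch of the factory with only the two formats that remain once Row and UnsafeRow are dropped (hypothetical names and stub types, not the actual API):

```java
// Stub output types (hypothetical).
interface InternalRow {}
final class ColumnarBatch {}
interface DataReader<T> { T next(); }

enum DataFormat { INTERNAL_ROW, COLUMNAR_BATCH }

// Only two create methods remain; a factory implements the one matching
// its declared dataFormat() and inherits the throwing default for the other.
interface DataReaderFactory {
  DataFormat dataFormat();

  default DataReader<InternalRow> createRowDataReader() {
    throw new IllegalStateException("this factory does not produce rows");
  }

  default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    throw new IllegalStateException("this factory does not produce batches");
  }
}
```

That keeps the hybrid row/columnar case while cutting the interface surface in half.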

On Mon, Apr 16, 2018 at 11:17 PM, Wenchen Fan <cloud0...@gmail.com> wrote:

> Yea, definitely not. The only requirement is that a DataReader/WriterFactory
> must support at least one DataFormat.
>
> >  how are we going to express capability of the given reader of its
> supported format(s), or specific support for each of “real-time data in row
> format, and history data in columnar format”?
>
> When DataSourceReader/Writer create factories, the factory must contain
> enough information to decide the data format. Let's take ORC as an example.
> In OrcReaderFactory, it knows which files to read, and which columns to
> output. Since Spark currently only supports columnar scan for simple types,
> OrcReaderFactory will only output ColumnarBatch if the columns to scan
> are all simple types.
>
> On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung <felixcheun...@hotmail.com>
> wrote:
>
>> Is it required for DataReader to support all known DataFormat?
>>
>> Hopefully not, as assumed by the ‘throw’ in the interface. Then
>> specifically how are we going to express capability of the given reader of
>> its supported format(s), or specific support for each of “real-time data in
>> row format, and history data in columnar format”?
>>
>>
>> ------------------------------
>> *From:* Wenchen Fan <cloud0...@gmail.com>
>> *Sent:* Sunday, April 15, 2018 7:45:01 PM
>> *To:* Spark dev list
>> *Subject:* [discuss][data source v2] remove type parameter in
>> DataReader/WriterFactory
>>
>> Hi all,
>>
>> I'd like to propose an API change to the data source v2.
>>
>> One design goal of data source v2 is API type safety. The FileFormat API
>> is a bad example: it asks the implementation to return InternalRow even
>> when the data is actually a ColumnarBatch. In data source v2 we added a
>> type parameter to DataReader/WriterFactory and DataReader/Writer, so that
>> data sources supporting columnar scan can return ColumnarBatch at the API
>> level.
>>
>> However, we met some problems when migrating streaming and file-based
>> data source to data source v2.
>>
>> For the streaming side, we need a variant of DataReader/WriterFactory to
>> add streaming specific concept like epoch id and offset. For details please
>> see ContinuousDataReaderFactory and
>> https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#
>>
>> But this conflicts with the special format mixin traits like
>> SupportsScanColumnarBatch. We have to make the streaming variant of
>> DataReader/WriterFactory extend the original DataReader/WriterFactory
>> and do type casts at runtime, which is unnecessary and violates type
>> safety.
>>
>> For the file-based data source side, we have a problem with code
>> duplication. Let's take ORC data source as an example. To support both
>> unsafe row and columnar batch scan, we need something like
>>
>> // A lot of parameters to carry to the executor side
>> class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
>>   def createDataReader ...
>> }
>>
>> class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
>>   def createDataReader ...
>> }
>>
>> class OrcDataSourceReader extends DataSourceReader {
>>   // logic to prepare the parameters and create the factories:
>>   def createUnsafeRowFactories = ...
>>
>>   def createColumnarBatchFactories = ...
>> }
>>
>> You can see that we have duplicated logic for preparing parameters and
>> defining the factory.
>>
>> Here I propose to remove all the special format mixin traits and change
>> the factory interface to
>>
>> public enum DataFormat {
>>   ROW,
>>   INTERNAL_ROW,
>>   UNSAFE_ROW,
>>   COLUMNAR_BATCH
>> }
>>
>> interface DataReaderFactory {
>>   DataFormat dataFormat();
>>
>>   default DataReader<Row> createRowDataReader() {
>>     throw new IllegalStateException();
>>   }
>>
>>   default DataReader<UnsafeRow> createUnsafeRowDataReader() {
>>     throw new IllegalStateException();
>>   }
>>
>>   default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
>>     throw new IllegalStateException();
>>   }
>> }
>>
>> Spark will look at the dataFormat and decide which createXDataReader
>> method to call.
>>
>> Now we don't have the problem for the streaming side as these special
>> format mixin traits go away. And the ORC data source can also be simplified
>> to
>>
>> class OrcReaderFactory(...) extends DataReaderFactory {
>>   def createUnsafeRowDataReader ...
>>
>>   def createColumnarBatchDataReader ...
>> }
>>
>> class OrcDataSourceReader extends DataSourceReader {
>>   // logic to prepare the parameters and create the factories:
>>   def createReadFactories = ...
>> }
>>
>> We also get a potential benefit for hybrid storage data sources, which
>> may keep real-time data in row format and historical data in columnar
>> format. They can then make some DataReaderFactory instances output
>> InternalRow and others output ColumnarBatch.
>>
>> Thoughts?
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix
