[ https://issues.apache.org/jira/browse/SPARK-24882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558666#comment-16558666 ]

Ryan Blue edited comment on SPARK-24882 at 7/26/18 6:19 PM:
------------------------------------------------------------

[~cloud_fan], I'm adding some suggestions here because comments on the doc are 
good for discussion, but not really for longer content.

I like the separation of the batch, micro-batch, and streaming classes. That 
works well. I also like the addition of the Metadata class, though I'd use a 
more specific name.

There are a few naming changes I would make to be more specific and to preserve 
existing names or naming conventions:
* Instead of ReaderProvider, I think we should use ReaderFactory because that 
name corresponds to the write path and accurately describes the class.
* I think we should continue to use InputPartition instead of InputSplit, even 
if we introduce a reader factory. (Probably also rename SplitReader to 
PartitionReader; see the sketch just after this list.)
* Metadata isn't specific, so I think we should use ScanConfig instead.
* getSplits should be planSplits because "get" implies a quick operation (like 
returning a field's value) in Java. This is also consistent with the current 
API.
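
To make the renames concrete, here's a rough sketch of how the renamed factory 
and reader could look (the method name createReader and the exact generics are 
just my guesses for illustration, not something from the doc):

{code:lang=java}
// Sketch only: ReaderFactory mirrors DataWriterFactory on the write path and
// keeps the existing InputPartition name; PartitionReader is the renamed
// SplitReader and is the only thing still called a "reader".
public interface ReaderFactory extends Serializable {
  PartitionReader<InternalRow> createReader(InputPartition partition);
}

public interface PartitionReader<T> extends Closeable {
  boolean next() throws IOException;

  T get();
}
{code}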

Next, instead of using mix-ins on Metadata / ScanConfig, I think a builder 
would help clarify the order of operations. If ScanConfig is mutable, then it 
could be passed to the other methods in different states. I'd rather use a 
Builder to make it immutable. That way, implementations know that the Metadata 
/ ScanConfig doesn't change between calls to estimateStatistics and planSplits, 
so results can be cached. To make this work, Spark would provide a Builder 
interface with default methods that do nothing. To implement pushdown, users 
just need to implement the methods. This also allows us to add new pushdown 
methods (like pushLimit) without introducing new interfaces.
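
To illustrate the order of operations, here's roughly how Spark would drive the 
builder (variable names like readSupport, requiredSchema, and filters are 
placeholders, and the method names are the ones from the sketches below):

{code:lang=java}
// Hypothetical planner-side flow: the builder is the only mutable step. Once
// build() returns, the ScanConfig is immutable, so estimateStatistics and
// planSplits see the same pushdown state and implementations can cache results.
ScanConfig config = readSupport.newScanBuilder()
    .pushProjection(requiredSchema)   // no-op unless the source overrides it
    .pushFilters(filters)             // no-op unless the source overrides it
    .build();

Statistics stats = statsSupport.estimateStatistics(config);
InputPartition[] partitions = batchReadSupport.planSplits(config);
ReaderFactory factory = batchReadSupport.readerFactory();
{code}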

I'd also like to see the classes reorganized a little to reduce the overall 
number of interfaces:

Metadata / ScanConfig contains all of the state that the DataSourceReader used 
to hold. If the DataSourceReader has no state, then its methods should be 
provided by a single instance of the source instead. That would change the 
API to get rid of the Reader level and merge it into ReadSupport. Then 
ReadSupport would be used to create a ScanConfig and then BatchReadSupport (or 
similar) would be used to plan splits and get reader factories. I think this is 
easier for implementations.

{code:lang=java}
public interface ReadSupport {
  ScanConfig.Builder newScanBuilder();
}

// optional mix-in for sources that can report statistics for a scan
public interface ReportsStatistics extends ReadSupport {
  Statistics estimateStatistics(ScanConfig config);
}

public interface BatchReadSupport extends ReadSupport {
  InputPartition[] planSplits(ScanConfig config);

  ReaderFactory readerFactory();
}

public interface MicroBatchReadSupport extends ReadSupport {
  InputPartition[] planSplits(ScanConfig config, Offset start, Offset end);

  Offset initialOffset();

  MicroBatchReaderFactory readerFactory();
}

public interface ContinuousReadSupport extends ReadSupport {
  InputPartition[] planSplits(ScanConfig config, Offset start);

  Offset initialOffset();

  ContinuousReaderFactory readerFactory();
}
{code}

Note that this change also cleans up the confusion around the use of Reader: 
the only Reader is a SplitReader that returns rows or row batches.

I would keep the same structure that you have for micro-batch, continuous, and 
batch ReaderFactory and SplitReader.
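
For example, the continuous variant could keep the same factory/reader shape 
and only add per-partition offset reporting, along the lines of the sketch 
below (PartitionReader here is the renamed SplitReader from the earlier sketch, 
and the getOffset method is just my guess at carrying over the current API):

{code:lang=java}
// Sketch: same structure as the batch ReaderFactory/SplitReader pair, with the
// reader additionally reporting how far it has read so the continuous engine
// can track progress per partition.
public interface ContinuousReaderFactory extends Serializable {
  ContinuousPartitionReader<InternalRow> createReader(InputPartition partition);
}

public interface ContinuousPartitionReader<T> extends PartitionReader<T> {
  PartitionOffset getOffset();
}
{code}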

Here's a sketch of the ScanConfig and Builder I mentioned above:

{code:lang=java}
public interface ScanConfig {
  StructType schema();

  Filter[] pushedFilters();

  Expression[] pushedPredicates();

  // by default, the Builder doesn't push anything
  public interface Builder {
    default Builder pushProjection(...) {
      return this;
    }
    default Builder pushFilters(...) {
      return this;
    }
    default Builder pushPredicates(...) {
      return this;
    }
    default Builder pushLimit(...) {
      return this;
    }
    ScanConfig build();
  }
}
{code}
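
As an illustration of what the no-op defaults buy implementations, a source 
that only supports filter pushdown would implement just two methods and inherit 
the defaults for everything else (the Filter[] parameter type and the 
SimpleScanConfig class are made up here, since the sketch above elides them):

{code:lang=java}
// Hypothetical source-side builder: only pushFilters and build are implemented;
// pushProjection, pushPredicates, and pushLimit fall back to the do-nothing
// defaults from ScanConfig.Builder.
public class SimpleScanConfigBuilder implements ScanConfig.Builder {
  private final StructType fullSchema;
  private Filter[] pushedFilters = new Filter[0];

  public SimpleScanConfigBuilder(StructType fullSchema) {
    this.fullSchema = fullSchema;
  }

  @Override
  public ScanConfig.Builder pushFilters(Filter[] filters) {
    this.pushedFilters = filters;  // a real source would keep only the filters it can evaluate
    return this;
  }

  @Override
  public ScanConfig build() {
    return new SimpleScanConfig(fullSchema, pushedFilters);  // immutable result
  }
}
{code}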



> separate responsibilities of the data source v2 read API
> --------------------------------------------------------
>
>                 Key: SPARK-24882
>                 URL: https://issues.apache.org/jira/browse/SPARK-24882
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Major
>
> Data source V2 has been out for a while; see the SPIP 
> [here|https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit?usp=sharing].
>  We have already migrated most of the built-in streaming data sources to the 
> V2 API, and the file source migration is in progress. During the migration, 
> we found several problems and want to address them before we stabilize the V2 
> API.
> To solve these problems, we need to separate responsibilities in the data 
> source v2 read API. For details, please see the attached Google doc: 
> https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing


