[ https://issues.apache.org/jira/browse/SPARK-25390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178052#comment-17178052 ]

Rafael edited comment on SPARK-25390 at 8/15/20, 6:26 PM:
----------------------------------------------------------

Hey guys [~cloud_fan] [~b...@cloudera.com]

I'm trying to migrate a package that uses *import org.apache.spark.sql.sources.v2._* to Spark 3.0.0

and haven't found a good migration guide, so may I ask my questions here?

Here is my migration plan; can you highlight which interfaces I should use now?

1. I cannot find what to use instead of ReadSupport and DataSourceReader.

If we now have to use Scan instead of ReadSupport, then what happened to the createReader method?
{code:java}
class DefaultSource extends ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new GeneratingReader()
}
{code}
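
My current understanding is that in 3.0 the entry point is TableProvider and the reader is created later through a ScanBuilder, so there is no direct createReader anymore. Is a sketch like the following going in the right direction? (The Generating* names are just my placeholders.)
{code:java}
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Spark 3.0: the source implements TableProvider instead of ReadSupport.
class DefaultSource extends TableProvider {
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table =
    new GeneratingTable(schema)
}

// The Table declares BATCH_READ and hands out a ScanBuilder for every read.
class GeneratingTable(tableSchema: StructType) extends Table with SupportsRead {
  override def name(): String = "generating"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  // Roughly where the old createReader logic goes now.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new ScanBuilder {
      override def build(): Scan = new GeneratingScan(tableSchema) // sketched under point 2
    }
}
{code}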
2. 

Here, instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsReportPartitioning}
{code}
I should use
{code:java}
import org.apache.spark.sql.connector.read.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.connector.read.{InputPartition, SupportsReportPartitioning}
{code}
right?
{code:java}
class GeneratingReader() extends DataSourceReader {
  override def readSchema(): StructType = {...}
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    ...
    partitions.add(new GeneratingInputPartition(...))
    partitions
  }
  override def outputPartitioning(): Partitioning = {...}
}
{code}
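
If so, I assume the old DataSourceReader responsibilities are now split between Scan and Batch, roughly like this (Generating* names are my placeholders again):
{code:java}
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.Partitioning
import org.apache.spark.sql.types.StructType

// readSchema() and outputPartitioning() now live on the Scan.
class GeneratingScan(schema: StructType) extends Scan with SupportsReportPartitioning {
  override def readSchema(): StructType = schema
  override def outputPartitioning(): Partitioning = ???
  override def toBatch: Batch = new GeneratingBatch()
}

// planInputPartitions() moves to Batch and returns an Array, not a util.List.
class GeneratingBatch() extends Batch {
  override def planInputPartitions(): Array[InputPartition] =
    Array(new GeneratingInputPartition()) // sketched under point 3
  override def createReaderFactory(): PartitionReaderFactory =
    new GeneratingReaderFactory() // sketched under point 3
}
{code}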
3.

I haven't found what to use instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
{code}
If the new interface is the InputPartition from the read package, it has a totally different contract.
{code:java}
import org.apache.spark.sql.connector.read.InputPartition
{code}
{code:java}
class GeneratingInputPartition() extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new GeneratingInputPartitionReader(...)
}

class GeneratingInputPartitionReader() extends InputPartitionReader[InternalRow] {
  override def next(): Boolean = ...
  override def get(): InternalRow = ...
  override def close(): Unit = ...
}
{code}
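
My assumption is that this pair becomes a plain serializable InputPartition (now just a description of the split) plus a PartitionReaderFactory that creates the actual readers on the executors, while the next/get/close contract stays the same. A sketch of what I mean (placeholder names again):
{code:java}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}

// InputPartition is no longer generic; it only carries the partition metadata.
class GeneratingInputPartition() extends InputPartition

// The factory is serialized to executors and creates one reader per partition.
class GeneratingReaderFactory() extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new GeneratingPartitionReader(partition.asInstanceOf[GeneratingInputPartition])
}

// Same next()/get()/close() contract as the old InputPartitionReader.
class GeneratingPartitionReader(partition: GeneratingInputPartition) extends PartitionReader[InternalRow] {
  override def next(): Boolean = ???
  override def get(): InternalRow = ???
  override def close(): Unit = ()
}
{code}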
And will the usage remain the same?
{code:java}
val df = sparkSession.read
  .format("sources.v2.generating")
  .option(OPT_PARTITIONS, calculateNumberOfPartitions(numberOfRows))
  .option(OPT_DESCRIPTOR, descriptorJson)
  .option(OPT_SOURCE_TYPE, getSqlName(jobExecutionPlanContext.jobDescriptor.jobHeader.sourceConnection))
  .load()
{code}



> Data source V2 API refactoring
> ------------------------------
>
>                 Key: SPARK-25390
>                 URL: https://issues.apache.org/jira/browse/SPARK-25390
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Currently it's not very clear how we should abstract data source v2 API. The 
> abstraction should be unified between batch and streaming, or similar but 
> have a well-defined difference between batch and streaming. And the 
> abstraction should also include catalog/table.
> An example of the abstraction:
> {code}
> batch: catalog -> table -> scan
> streaming: catalog -> table -> stream -> scan
> {code}
> We should refactor the data source v2 API according to the abstraction.


