[ https://issues.apache.org/jira/browse/SPARK-25390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178052#comment-17178052 ]
Rafael edited comment on SPARK-25390 at 8/15/20, 6:26 PM:
----------------------------------------------------------

Hey guys [~cloud_fan] [~b...@cloudera.com],

I'm trying to migrate a package that used *import org.apache.spark.sql.sources.v2._* to *Spark 3.0.0* and haven't found a good migration guide, so may I ask my questions here? Below is my migration plan; could you highlight which interfaces I should use now?

1. I cannot find what I should use instead of ReadSupport and DataSourceReader. If we now have to use Scan instead of ReadSupport, what happened to the createReader method?
{code:java}
class DefaultSource extends ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new GeneratingReader()
}
{code}

2. Here, instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsReportPartitioning}
{code}
I should use
{code:java}
import org.apache.spark.sql.connector.read.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.connector.read.{InputPartition, SupportsReportPartitioning}
{code}
right?
{code:java}
class GeneratingReader() extends DataSourceReader {
  override def readSchema(): StructType = {...}

  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    ...
    partitions.add(new GeneratingInputPartition(...))
    partitions
  }

  override def outputPartitioning(): Partitioning = {...}
}
{code}

3. I haven't found what I should use instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
{code}
If the new interface is the one from the connector read package
{code:java}
import org.apache.spark.sql.connector.read.InputPartition
{code}
then it has a completely different contract from my existing 2.4 code:
{code:java}
class GeneratingInputPartition() extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new GeneratingInputPartitionReader(...)
}

class GeneratingInputPartitionReader() extends InputPartitionReader[InternalRow] {
  override def next(): Boolean = ...
  override def get(): InternalRow = ...
  override def close(): Unit = ...
}
{code}

And will the usage remain the same?
{code:java}
val df = sparkSession.read
  .format("sources.v2.generating")
  .option(OPT_PARTITIONS, calculateNumberOfPartitions(numberOfRows))
  .option(OPT_DESCRIPTOR, descriptorJson)
  .option(OPT_SOURCE_TYPE, getSqlName(jobExecutionPlanContext.jobDescriptor.jobHeader.sourceConnection))
  .load()
{code}
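For what it's worth, here is the mapping I've pieced together so far for question 1 from the 3.0 {{org.apache.spark.sql.connector}} javadocs. The {{Generating*}} names are just placeholders modelled on my existing classes and the bodies are stubs, so please treat this as a sketch of my current understanding rather than a confirmed answer. The single {{ReadSupport.createReader}} entry point seems to be split across {{TableProvider}}, {{Table}} and {{ScanBuilder}}:
{code:java}
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Spark 3.0 entry point: TableProvider appears to replace ReadSupport.
class DefaultSource extends TableProvider {
  // Roughly the old readSchema(): the schema used when the user does not supply one.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???

  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table =
    new GeneratingTable(schema)
}

class GeneratingTable(tableSchema: StructType) extends Table with SupportsRead {
  override def name(): String = "generating"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  // DataSourceOptions is gone; the .option(...) values arrive here as a CaseInsensitiveStringMap.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
    override def build(): Scan = new GeneratingScan(tableSchema)  // GeneratingScan is sketched below
  }
}
{code}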
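And for questions 2 and 3, my guess at the read path (again a stubbed sketch; {{GeneratingScan}} is what the {{ScanBuilder}} above builds): {{Scan}} plus {{Batch}} seem to take over the old {{DataSourceReader}} role, {{SupportsReportPartitioning}} is now a mix-in on {{Scan}}, and the old {{InputPartitionReader}} contract appears to have moved to a {{PartitionReader}} created by a separate {{PartitionReaderFactory}}:
{code:java}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.partitioning.Partitioning
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, SupportsReportPartitioning}
import org.apache.spark.sql.types.StructType

// Scan + Batch together cover what DataSourceReader did in 2.4.
class GeneratingScan(tableSchema: StructType) extends Scan with Batch with SupportsReportPartitioning {
  override def readSchema(): StructType = tableSchema
  override def toBatch(): Batch = this

  // planInputPartitions now returns an Array, and the partitions carry only metadata.
  override def planInputPartitions(): Array[InputPartition] = Array(new GeneratingInputPartition)
  override def createReaderFactory(): PartitionReaderFactory = new GeneratingReaderFactory
  override def outputPartitioning(): Partitioning = ???  // same idea as before, stubbed here
}

// InputPartition no longer creates its own reader; it is just serializable partition metadata.
class GeneratingInputPartition extends InputPartition

// The old InputPartition.createPartitionReader() moved to a separate factory...
class GeneratingReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new GeneratingPartitionReader(partition.asInstanceOf[GeneratingInputPartition])
}

// ...and the next/get/close contract of InputPartitionReader moved to PartitionReader.
class GeneratingPartitionReader(partition: GeneratingInputPartition) extends PartitionReader[InternalRow] {
  override def next(): Boolean = ???
  override def get(): InternalRow = ???
  override def close(): Unit = ???
}
{code}
On the usage question: as far as I can tell the DataFrame side ({{sparkSession.read.format(...).option(...).load()}}) stays exactly the same; the difference is only that the options now reach the source as a {{CaseInsensitiveStringMap}} instead of {{DataSourceOptions}}. Please correct me if I've misread any of this.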
> Data source V2 API refactoring
> ------------------------------
>
>                 Key: SPARK-25390
>                 URL: https://issues.apache.org/jira/browse/SPARK-25390
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Major
>             Fix For: 3.0.0
>
> Currently it's not very clear how we should abstract the data source v2 API. The abstraction should be unified between batch and streaming, or similar but with a well-defined difference between batch and streaming. The abstraction should also include catalog/table.
> An example of the abstraction:
> {code}
> batch: catalog -> table -> scan
> streaming: catalog -> table -> stream -> scan
> {code}
> We should refactor the data source v2 API according to this abstraction.