[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/22547 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r230989559 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousInputStream( --- End diff -- Yea I'll separate this PR into 3 smaller ones, after we have agreed on the high-level design at https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r230973917 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousInputStream( --- End diff -- Makes sense. I really consider this to be a blocker on getting this merged and approved. It's difficult to have confidence in a review over such a large change. Thoughts @cloud-fan @rdblue? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r230528510 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousInputStream( --- End diff -- I'd prefer that the commits themselves compile, but since this is separating the modes I think it could be done incrementally. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r230505785 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousInputStream( --- End diff --
+1 for this. A lot of the changes right now just move code around, especially the streaming code, which makes it harder to isolate the proposed API for review. An alternative is to split this PR into separate commits. The individual commits may not compile because of mismatched signatures, but all the commits taken together would compile, and each commit can be reviewed individually to assess the API and then the implementation. For example, I'd propose 3 PRs:
* Batch reading, with a commit for the interface changes and a separate commit for the implementation changes
* Micro-batch streaming read, with a commit for the interface changes and a separate commit for the implementation changes
* Continuous streaming read, similar to above
Thoughts?
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226812577 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for data source v2. Implementations must have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@InterfaceStability.Evolving +public interface Format extends DataSourceV2 { --- End diff -- the write API has not been migrated yet and still needs `DataSourceV2`
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226798538 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for data source v2. Implementations must have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@InterfaceStability.Evolving +public interface Format extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. path, table name, --- End diff -- Why is it necessary to pass table name and database to Format? Format should only be used in 2 places to create tables. First, in the DataFrameReader (or writer) API when a format is specified directly instead of a catalog/database/table or catalog/path. 
Second, it would be used in catalogs that support pluggable implementations, like the current session catalog, which needs to dynamically instantiate implementations based on the table's provider. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
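The two call sites described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchFormat`, `SketchTable`, `PathFormat`, and `FormatLookupSketch` are hypothetical names invented here, and the `path` option key is just an example; none of this is the API proposed in the PR.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for the interfaces under review (not Spark's API).
interface SketchTable {
  String describe();
}

interface SketchFormat {
  SketchTable getTable(Map<String, String> options);
}

// A format that only needs a path to identify its table.
class PathFormat implements SketchFormat {
  public PathFormat() {}  // public 0-arg constructor so it can be created reflectively

  @Override
  public SketchTable getTable(Map<String, String> options) {
    String path = options.get("path");
    return () -> "table at " + path;
  }
}

final class FormatLookupSketch {
  // Use 1: a reader-style call site where the user names the format directly.
  static SketchTable loadByFormat(SketchFormat format, Map<String, String> options) {
    return format.getTable(options);
  }

  // Use 2: a catalog that stores a provider class name and instantiates it on demand.
  static SketchTable loadByProvider(String providerClass, Map<String, String> options) {
    try {
      SketchFormat format = (SketchFormat) Class.forName(providerClass)
          .getDeclaredConstructor()
          .newInstance();
      return format.getTable(options);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("cannot instantiate " + providerClass, e);
    }
  }
}
```

In both cases the options only need to carry what the format itself requires; in this sketch a table name or database never reaches the format.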
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226798213 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for data source v2. Implementations must have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@InterfaceStability.Evolving +public interface Format extends DataSourceV2 { --- End diff -- Why is there both Format and DataSourceV2? What does DataSourceV2 do? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226796934 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -173,12 +185,17 @@ object DataSourceV2Relation { source: DataSourceV2, options: Map[String, String], tableIdent: Option[TableIdentifier] = None, - userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { -val readSupport = source.createReadSupport(options, userSpecifiedSchema) -val output = readSupport.fullSchema().toAttributes + userSpecifiedSchema: Option[StructType] = None): Option[DataSourceV2Relation] = { --- End diff -- This shouldn't return an option. A relation is not only a read-side structure; it is also used in write-side logical plans as the target of a write. Validation rules like PreprocessTableInsertion validate the write dataframe against the relation's schema. That's why the relation has a newWriteSupport method. Creating a relation from a Table should always work, even if the table isn't readable or isn't writable. Analysis can be done later to validate whether the plan that contains a relation can actually use the table.
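A minimal sketch of the shape argued for here, assuming hypothetical types (`TableSketch`, `ReadableSketch`, `RelationSketch`, `AnalysisSketch` are invented for illustration and are not Spark classes): relation creation never fails, and capability checks run later, only for the plans that need them.

```java
import java.util.List;

// Hypothetical stand-ins; the names are illustrative only.
interface TableSketch {
  List<String> columns();
}

// Marker for tables that can be read.
interface ReadableSketch extends TableSketch {}

final class RelationSketch {
  final TableSketch table;

  private RelationSketch(TableSketch table) {
    this.table = table;
  }

  // Always succeeds, whatever the table supports; no Option/Optional in the signature.
  static RelationSketch create(TableSketch table) {
    return new RelationSketch(table);
  }
}

final class AnalysisSketch {
  // A later check, applied only to plans that read from the relation.
  static void checkReadable(RelationSketch relation) {
    if (!(relation.table instanceof ReadableSketch)) {
      throw new IllegalStateException("table does not support reads");
    }
  }

  // A write-side check that needs the relation's schema even if the table is not readable.
  static boolean schemaMatches(RelationSketch relation, List<String> writeColumns) {
    return relation.table.columns().equals(writeColumns);
  }
}
```

With this split, a write-only table can still back a relation and take part in write-side validation, which an `Option`-returning factory would rule out.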
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226790252 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/InputStream.java --- @@ -17,14 +17,18 @@ package org.apache.spark.sql.sources.v2.reader.streaming; -import org.apache.spark.sql.sources.v2.reader.ReadSupport; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; /** - * A base interface for streaming read support. This is package private and is invisible to data - * sources. Data sources should implement concrete streaming read support interfaces: - * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + * An interface representing a readable data stream in a streaming query. It's responsible to manage + * the offsets of the streaming source in this streaming query. + * + * Data sources should implement concrete input stream interfaces: {@link MicroBatchInputStream} and + * {@link ContinuousInputStream}. */ -interface StreamingReadSupport extends ReadSupport { +@InterfaceStability.Evolving +public interface InputStream extends BaseStreamingSource { --- End diff -- `InputStream` conflicts with a well-known JVM class, [`java.io.InputStream`](https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html). I think this should be renamed to be more specific to a streaming table scan. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
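To make the naming concern concrete, here is a small sketch of what code looks like once a type called `InputStream` is in scope. The nested interface and its `latestOffset` method are hypothetical stand-ins, not the interface from the PR; the point is only that the JDK class must then be written with its fully qualified name.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;

final class NameClashSketch {
  // Hypothetical stand-in for a data source interface that happens to be named InputStream.
  interface InputStream {
    String latestOffset();
  }

  // Inside this class the simple name InputStream means the interface above.
  static String describe(InputStream stream) {
    return "offset=" + stream.latestOffset();
  }

  // The JDK class therefore has to be spelled out in full wherever both are used.
  static int firstByte(byte[] data) {
    try (java.io.InputStream in = new ByteArrayInputStream(data)) {
      return in.read();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

A data source implementation that reads files or sockets is likely to need both types in the same file, which is where a more specific name would avoid the friction.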
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226789748 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -15,37 +15,43 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader; +package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.datasources.v2.NoopScanConfigBuilder; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; +import org.apache.spark.sql.types.StructType; /** - * An interface that defines how to load the data from data source for batch processing. + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, or a table in the catalog, etc. * - * The execution engine will get an instance of this interface from a data source provider - * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch - * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}. - * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in - * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader - * factory to scan data from the data source with a Spark job. + * This interface can mixin the following interfaces to support different operations: + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * {@link SupportsMicroBatchRead}: this table can be read in streaming queries with + * micro-batch trigger. + * {@link SupportsContinuousRead}: this table can be read in streaming queries with + * continuous trigger. 
+ * */ @InterfaceStability.Evolving -public interface BatchReadSupport extends ReadSupport { +public interface Table { + + /** + * Returns the schema of this table. + */ + StructType schema(); /** * Returns a builder of {@link ScanConfig}. Spark will call this method and create a * {@link ScanConfig} for each data scanning job. * * The builder can take some query specific information to do operators pushdown, and keep these * information in the created {@link ScanConfig}. - * - * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs - * to take {@link ScanConfig} as an input. - */ - ScanConfigBuilder newScanConfigBuilder(); - - /** - * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. */ - PartitionReaderFactory createReaderFactory(ScanConfig config); + default ScanConfigBuilder newScanConfigBuilder(DataSourceOptions options) { --- End diff -- I think it should be clear that these are scan-specific options. Maybe add some documentation with an example of something that would be passed to configure a scan, like a target split size for combining. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
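As an illustration of the kind of scan-specific option mentioned above, the sketch below reads a target split size and uses it to plan splits. Everything here is assumed for the example: the `split-size` key, the default value, and the `SplitPlanningSketch` class are invented and are not options defined by Spark or by this PR.

```java
import java.util.Map;

final class SplitPlanningSketch {
  // Assumed default for the example only.
  static final long DEFAULT_SPLIT_BYTES = 128L * 1024 * 1024;

  // Reads the hypothetical "split-size" scan option, falling back to the default.
  static long targetSplitBytes(Map<String, String> scanOptions) {
    String value = scanOptions.get("split-size");
    return value == null ? DEFAULT_SPLIT_BYTES : Long.parseLong(value);
  }

  // Number of splits needed to cover a file of the given size.
  static long splitCount(long fileBytes, Map<String, String> scanOptions) {
    long target = targetSplitBytes(scanOptions);
    return (fileBytes + target - 1) / target;  // ceiling division
  }
}
```

An option like this changes how a single scan is planned without changing which table is being read, which is the distinction the documentation could call out.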
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226789610 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.BatchScan; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to + * provide data reading ability for batch processing. + */ +@InterfaceStability.Evolving +public interface SupportsBatchRead extends Table { + + /** + * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options. + * + * @param config a {@link ScanConfig} which may contains operator pushdown information. + * @param options the user-specified options, which is same as the one used to create the + *{@link ScanConfigBuilder} that built the given {@link ScanConfig}. 
+ */ + BatchScan createBatchScan(ScanConfig config, DataSourceOptions options); --- End diff -- Is there a benefit to having both `ScanConfig` and `BatchScan` objects? Why not have `ScanConfigBuilder` return a `BatchScan` directly by calling `buildBatch`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
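The alternative raised in this comment might look roughly like the sketch below, where the builder receives the options once and produces the scan itself. The names `ScanBuilderSketch`, `BatchScanSketch`, `pushFilter`, and the string-based filters are hypothetical simplifications; only the `buildBatch` name comes from the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the scan object.
interface BatchScanSketch {
  String explain();
}

final class ScanBuilderSketch {
  private final Map<String, String> options;  // received once, at builder creation
  private final List<String> pushedFilters = new ArrayList<>();

  ScanBuilderSketch(Map<String, String> options) {
    this.options = options;
  }

  // Records a pushed-down filter; a string is enough for the illustration.
  ScanBuilderSketch pushFilter(String filter) {
    pushedFilters.add(filter);
    return this;
  }

  // Builds the scan directly, with no intermediate config object and no second options argument.
  BatchScanSketch buildBatch() {
    List<String> filters = new ArrayList<>(pushedFilters);  // snapshot of the pushdown state
    String path = options.get("path");
    return () -> "scan " + path + " where " + filters;
  }
}
```

In this shape the pushdown result and the options both live in the builder, so there is nothing to pass again when the scan is created.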
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226785695 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchScan.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; + +/** + * A {@link Scan} for batch queries. + * + * The execution engine will get an instance of {@link Table} first, then call + * {@link Table#newScanConfigBuilder(DataSourceOptions)} and create an instance of + * {@link ScanConfig}. The {@link ScanConfigBuilder} can apply operator pushdown and keep the + * pushdown result in {@link ScanConfig}. Then + * {@link SupportsBatchRead#createBatchScan(ScanConfig, DataSourceOptions)} will be called to create + * a {@link BatchScan} instance, which will be used to create input partitions and reader factory to + * scan data from the data source with a Spark job. 
+ */ +@InterfaceStability.Evolving +public interface BatchScan extends Scan { + + /** + * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. + */ + PartitionReaderFactory createReaderFactory(); --- End diff -- Why are `BatchScan` and `PartitionReaderFactory` different interfaces? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226784919 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.BatchScan; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to + * provide data reading ability for batch processing. + */ +@InterfaceStability.Evolving +public interface SupportsBatchRead extends Table { + + /** + * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options. + * + * @param config a {@link ScanConfig} which may contains operator pushdown information. + * @param options the user-specified options, which is same as the one used to create the + *{@link ScanConfigBuilder} that built the given {@link ScanConfig}. --- End diff -- I don't think that options should be passed twice. 
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226783272 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister failOnDataLoss(caseInsensitiveParams)) } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read - * batches of Kafka data in a micro-batch streaming query. - */ - override def createMicroBatchReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaMicroBatchReadSupport = { - -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap - -val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) - -val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") - -new KafkaMicroBatchReadSupport( - kafkaOffsetReader, - kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - options, - metadataPath, - startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) + override def getTable(options: DataSourceOptions): KafkaTable.type = { +KafkaTable } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read - * Kafka data in a continuous streaming query. - */ - override def createContinuousReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaContinuousReadSupport = { -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap + object KafkaTable extends Table --- End diff -- Why is `KafkaTable` an object, not a class? This doesn't seem to fit an abstraction. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
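One possible reading of this concern, sketched with hypothetical types (`TopicTableSketch`, `SharedTopicTable`, and `PerTopicTable` are invented here, and the `subscribe` key is used only as an example option): a singleton cannot carry per-table state, while a class instance can represent the specific table the options identify.

```java
import java.util.Map;

// Hypothetical stand-in for the table abstraction.
interface TopicTableSketch {
  String describe();
}

// Singleton style: one shared instance that cannot tell two option sets apart.
enum SharedTopicTable implements TopicTableSketch {
  INSTANCE;

  @Override
  public String describe() {
    return "kafka";
  }
}

// Class style: each instance represents the table identified by its options.
final class PerTopicTable implements TopicTableSketch {
  private final String topic;

  PerTopicTable(Map<String, String> options) {
    this.topic = options.get("subscribe");
  }

  @Override
  public String describe() {
    return "kafka topic " + topic;
  }
}
```

Whether the Kafka source actually needs per-table state is a design question for the PR; the sketch only shows what the class form makes possible.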
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226782371 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousInputStream( --- End diff -- Is it possible to break this change into multiple PRs for batch, microbatch, and continuous? It's really large and it would be nice if we could get the changes in incrementally. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226363445 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -381,7 +390,7 @@ class StreamSuite extends StreamTest { test("insert an extraStrategy") { try { - spark.experimental.extraStrategies = TestStrategy :: Nil + spark.experimental.extraStrategies = CustomStrategy :: Nil --- End diff -- Since we need to do a temporary planning pass for streaming queries, we can't allow a custom strategy to remove streaming leaf nodes.
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226363020 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -154,21 +159,25 @@ class StreamSuite extends StreamTest { } test("SPARK-20432: union one stream with itself") { -val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a") -val unioned = df.union(df) -withTempDir { outputDir => - withTempDir { checkpointDir => -val query = - unioned -.writeStream.format("parquet") -.option("checkpointLocation", checkpointDir.getAbsolutePath) -.start(outputDir.getAbsolutePath) -try { - query.processAllAvailable() - val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] - checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*) -} finally { - query.stop() +val v1Source = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a") +val v2Source = spark.readStream.format(classOf[FakeFormat].getName).load().select("a") + +Seq(v1Source, v2Source).foreach { df => --- End diff -- improve this test to make sure v2 also works. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226361309 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala --- @@ -319,29 +307,18 @@ class RateSourceSuite extends StreamTest { "rate source does not support user-specified schema")) } - test("continuous in registry") { --- End diff -- we don't need this test now. With the new `Format` abstraction, the lookup logic is unified between microbatch and continuous --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226359031 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchInputStream.scala --- @@ -60,6 +59,14 @@ class RateStreamMicroBatchReadSupport(options: DataSourceOptions, checkpointLoca s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.") } + private val numPartitions = { --- End diff -- moved from https://github.com/apache/spark/pull/22547/files#diff-6cd4de793a1c68d3d9415a246823b55eL151 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226355931 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -90,6 +140,8 @@ class ContinuousExecution( do { runContinuous(sparkSessionForStream) } while (state.updateAndGet(stateUpdate) == ACTIVE) + +stopSources() --- End diff -- with the new abstraction, we should only stop sources when the streaming query ends, instead of on each reconfiguration.
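The control flow in the hunk above can be imitated with a small stand-alone sketch. It is a simplified model under assumptions: `ContinuousLoopSketch`, its `State` enum, and the `events` log are invented for illustration, and each "run" is replaced by a value taken from an iterator rather than real query execution.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class ContinuousLoopSketch {
  enum State { ACTIVE, RECONFIGURING, TERMINATED }

  // Records what happened, so the ordering can be inspected.
  final List<String> events = new ArrayList<>();
  private final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);

  // `runResults` supplies the state each run leaves behind (a stand-in for a real run).
  void runStream(Iterator<State> runResults) {
    do {
      events.add("run");
      state.set(runResults.next());
      // A reconfiguration flips the state back to ACTIVE and the loop runs again.
    } while (state.updateAndGet(
        s -> s == State.RECONFIGURING ? State.ACTIVE : s) == State.ACTIVE);

    // Sources are stopped once, after the loop, when the query has really ended.
    events.add("stopSources");
  }
}
```

For a query that reconfigures twice and then terminates, the log contains three runs followed by a single `stopSources` entry.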
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226338580 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.BatchScan; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to + * provide data reading ability for batch processing. + */ +@InterfaceStability.Evolving +public interface SupportsBatchRead extends Table { + + /** + * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options. + * + * @param config a {@link ScanConfig} which may contains operator pushdown information. + * @param options the user-specified options, which is same as the one used to create the + *{@link ScanConfigBuilder} that built the given {@link ScanConfig}. 
--- End diff -- Another option is to let `ScanConfig` carry the options. But `ScanConfig` is an interface, and doing this would put more work on the user side, so I decided to pass the options again here. Feedback is welcome!
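The trade-off in this comment can be sketched with simplified stand-in interfaces. Everything below is an assumption made for illustration: the method name `createBatchScan`, the `Map<String, String>` options type, and `OptionCarryingScanConfig` are hypothetical and do not reproduce the real signatures in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; not the real Spark interfaces.
final class OptionsPassingSketch {

  /** Stand-in for ScanConfig: implementations carry only pushdown state. */
  interface ScanConfig {
    String pushedFilter();
  }

  /** Stand-in for a batch scan; description() exists only for this demo. */
  interface BatchScan {
    String description();
  }

  /** Chosen design: the options are passed again next to the config. */
  interface SupportsBatchRead {
    BatchScan createBatchScan(ScanConfig config, Map<String, String> options);
  }

  /**
   * Alternative design: every user-written ScanConfig would also have to
   * store and expose the options, which is the extra work the comment avoids.
   */
  interface OptionCarryingScanConfig extends ScanConfig {
    Map<String, String> options();
  }

  static final class DemoTable implements SupportsBatchRead {
    @Override
    public BatchScan createBatchScan(ScanConfig config, Map<String, String> options) {
      String path = options.getOrDefault("path", "<none>");
      return () -> "scan path=" + path + " filter=" + config.pushedFilter();
    }
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("path", "/tmp/data");
    ScanConfig config = () -> "id > 10";
    BatchScan scan = new DemoTable().createBatchScan(config, options);
    System.out.println(scan.description());
  }
}
```

With the chosen design a `ScanConfig` can stay as small as a single lambda, at the cost of the caller having to hand over the same options twice.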
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r220275520 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala --- @@ -207,13 +207,13 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { testUtils.createTopic(topic2, partitions = 5) eventually(timeout(streamingTimeout)) { assert( -query.lastExecution.executedPlan.collectFirst { - case scan: DataSourceV2ScanExec -if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => -scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] -}.exists { config => +query.lastExecution.logical.collectFirst { --- End diff -- Now the known partitions are tracked by the `KafkaContinuousInputStream` in the logical plan: https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebR62
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r220275173 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister failOnDataLoss(caseInsensitiveParams)) } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read - * batches of Kafka data in a micro-batch streaming query. - */ - override def createMicroBatchReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaMicroBatchReadSupport = { - -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap - -val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) - -val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") - -new KafkaMicroBatchReadSupport( - kafkaOffsetReader, - kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - options, - metadataPath, - startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) + override def getTable(options: DataSourceOptions): KafkaTable.type = { +KafkaTable } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read - * Kafka data in a continuous streaming query. - */ - override def createContinuousReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaContinuousReadSupport = { -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap + object KafkaTable extends Table +with SupportsMicroBatchRead with SupportsContinuousRead { -val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) +override def schema(): StructType = KafkaOffsetReader.kafkaSchema -val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") +/** + * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchInputStream]] to read + * batches of Kafka data in a micro-batch streaming query. + */ +override def createMicroBatchInputStream( +checkpointLocation: String, +config: ScanConfig, +options: DataSourceOptions): MicroBatchInputStream = { + val parameters = options.asMap().asScala.toMap + validateStreamOptions(parameters) + // Each running query should use its own group id. Otherwise, the query may be only assigned + // partial data since Kafka will assign partitions to multiple consumers having the same group + // id. Hence, we should generate a unique id for each query. + val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${che
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r220274862 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchInputStream.scala --- @@ -294,6 +227,88 @@ private[kafka010] class KafkaMicroBatchReadSupport( } } +private[kafka010] class KafkaMicroBatchScan( +kafkaOffsetReader: KafkaOffsetReader, +rangeCalculator: KafkaOffsetRangeCalculator, +executorKafkaParams: ju.Map[String, Object], +pollTimeoutMs: Long, +failOnDataLoss: Boolean, +reportDataLoss: String => Unit, +start: KafkaSourceOffset, +end: KafkaSourceOffset) extends MicroBatchScan with Logging { + + override def createReaderFactory(): PartitionReaderFactory = { +KafkaMicroBatchReaderFactory + } + + override def planInputPartitions(): Array[InputPartition] = { +val startPartitionOffsets = start.partitionToOffsets --- End diff -- moved from https://github.com/apache/spark/pull/22547/files#diff-314d02b954fc05ec7ae687dd486a8e84L104
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r220275016 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister failOnDataLoss(caseInsensitiveParams)) } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read - * batches of Kafka data in a micro-batch streaming query. - */ - override def createMicroBatchReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaMicroBatchReadSupport = { - -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap - -val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) - -val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") - -new KafkaMicroBatchReadSupport( - kafkaOffsetReader, - kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - options, - metadataPath, - startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) + override def getTable(options: DataSourceOptions): KafkaTable.type = { +KafkaTable } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read - * Kafka data in a continuous streaming query. - */ - override def createContinuousReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaContinuousReadSupport = { -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap + object KafkaTable extends Table +with SupportsMicroBatchRead with SupportsContinuousRead { -val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) +override def schema(): StructType = KafkaOffsetReader.kafkaSchema -val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") +/** + * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchInputStream]] to read + * batches of Kafka data in a micro-batch streaming query. + */ +override def createMicroBatchInputStream( +checkpointLocation: String, +config: ScanConfig, +options: DataSourceOptions): MicroBatchInputStream = { + val parameters = options.asMap().asScala.toMap --- End diff -- moved from https://github.com/apache/spark/pull/22547/files#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05L117 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r220274562 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -67,28 +71,29 @@ class KafkaContinuousReadSupport( offsets } - override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema - - override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = { -new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss) - } - override def deserializeOffset(json: String): Offset = { KafkaSourceOffset(JsonUtils.partitionOffsets(json)) } - override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { -val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets -startOffsets.toSeq.map { - case (topicPartition, start) => -KafkaContinuousInputPartition( - topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss) -}.toArray - } + override def createContinuousScan(start: Offset): ContinuousScan = { +val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start) --- End diff -- moved from https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebL162
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22547

[SPARK-25528][SQL] data source V2 read side API refactoring

## What changes were proposed in this pull request?

Refactor the read side API according to the abstraction proposed on the [dev list](http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html):

```
batch: catalog -> table -> scan
streaming: catalog -> table -> stream -> scan
```

More concretely, this PR:
1. adds a new interface called `Format` that can return a `Table`.
2. renames `ReadSupportProvider` to `Table`, which represents a logical data set with a schema.
3. adds a new interface `InputStream` to represent a streaming source in a streaming query. It can create `Scan`s.
4. renames `ReadSupport` to `Scan`. Each `Scan` triggers one Spark job (like an RDD).

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark new-idea
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22547.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22547
commit 92dfdaf990f2676d49766f5ab094e8b8a9a755b1 Author: Wenchen Fan Date: 2018-08-27T15:20:08Z data source V2 read side API refactoring
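As a rough illustration of the batch and streaming chains in the description, the sketch below models `Format`, `Table`, `InputStream`, and `Scan` as tiny in-memory interfaces. The interface names come from the PR description, but every method signature here (`getTable()`, `createBatchScan()`, `createInputStream()`, `createScan(start, end)`, `read()`) is a hypothetical simplification, and integer offsets stand in for real stream offsets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified stand-ins for the interfaces named above.
final class ReadPathSketch {

  /** One Scan maps to one Spark job in the description; here it returns rows. */
  interface Scan {
    List<String> read();
  }

  /** A streaming source within one query; creates one Scan per offset range. */
  interface InputStream {
    Scan createScan(int start, int end); // streaming: stream -> scan
  }

  /** A logical data set with a schema. */
  interface Table {
    String schema();

    Scan createBatchScan();              // batch: table -> scan

    InputStream createInputStream();     // streaming: table -> stream
  }

  /** Entry point that can return a Table. */
  interface Format {
    Table getTable();
  }

  /** Toy Format backed by an in-memory list of rows. */
  static final class InMemoryFormat implements Format {
    private final List<String> rows;

    InMemoryFormat(List<String> rows) {
      this.rows = rows;
    }

    @Override
    public Table getTable() {
      return new Table() {
        @Override
        public String schema() {
          return "value STRING";
        }

        @Override
        public Scan createBatchScan() {
          return () -> new ArrayList<>(rows);
        }

        @Override
        public InputStream createInputStream() {
          // Each call to createScan covers rows in the range [start, end).
          return (start, end) -> () -> new ArrayList<>(rows.subList(start, end));
        }
      };
    }
  }

  public static void main(String[] args) {
    Table table = new InMemoryFormat(Arrays.asList("a", "b", "c", "d")).getTable();
    System.out.println(table.createBatchScan().read());   // whole table in one scan
    InputStream stream = table.createInputStream();
    System.out.println(stream.createScan(0, 2).read());   // first micro-batch
    System.out.println(stream.createScan(2, 4).read());   // second micro-batch
  }
}
```

In this toy model the stream object outlives the individual scans, which mirrors the description's point that an `InputStream` creates `Scan`s repeatedly while a batch read needs only one.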