[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20922 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r178232592 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with if (loadTestDataBeforeTests) { loadTestData() } +SparkSession.setActiveSession(spark) --- End diff -- (This issue was spun off into #20926.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r17881 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- We should not pass SparkSession just for getting hadoopConf, right? cc @zsxwing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r178031437 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- we can change it. It only needs a hadoop conf. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r178027822 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- One of constructor parameter of `HDFSMetadataLog` is `SparkSession`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r178021180 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- Why does `HDFSMetadataLog` need SparkSession? For table description, we can put it in `DataSourceOptions`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177989371 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- For example if one specific source requires `HDFSMetadataLog` for recovery, then it requires `SparkSession`. Also for my case, I need to get table description from catalog, it also requires `SparkSession`. Though this may not be a typical use case, but I think it might be easy for user if we expose `SparkSession`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177984724 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- Can you give some concrete examples about why we need `SparkSession` in data source implementations? If it's only for config, we should use `DataSourceOptions`. If you wanna access some internal states of Spark, we should improve the interface to exposes these states explicitly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177953871 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- Thanks for the explanation @jose-torres . This seems like a quite common usage scenario, I also see that socket source and console sink require SparkSession, also in my customized hive streaming sink (https://github.com/jerryshao/spark-hive-streaming-sink/blob/7b3afcee280d2e70ffb12dde24184726b618829d/core/src/main/scala/com/hortonworks/spark/hive/HiveSourceProvider.scala#L46). If we add that parameter back, things might be much easier. What's your opinion @cloud-fan ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177943116 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- I agree that there's a mismatch here. The reason it doesn't currently have this parameter is that one of the DataSourceV2 design goals (https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit#heading=h.mi1fbff5f8f9) was to avoid API dependencies on upper level APIs like SparkSession. (IIRC Wenchen and I discussed SparkSession specifically in the design stage.) In this story, SparkSession.get{Active/Default}Session is just a way to keep our existing sources working rather than an encouraged development practice. I agree that there's a mismatch which could be worth some discussion, but I think it's out of scope for this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177933081 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- What do you think @jose-torres @tdas @gatorsmile ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177932994 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader} +import org.apache.spark.sql.types._ + +/** + * A source that generates increment long values with timestamps. Each generated row has two + * columns: a timestamp column for the generated time and an auto increment long column starting + * with 0L. + * + * This source supports the following options: + * - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. + * - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed + *becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer + *seconds. + * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the + *generated rows. The source will try its best to reach `rowsPerSecond`, but the query may + *be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. + */ +class RateStreamProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateStreamProvider._ + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { --- End diff -- Here if `MicrobatchReadSupport` could pass in `SparkSession` parameter like `StreamSourceProvider#createSource` (sqlContext), then it is not required to get session from thread local variable or default variable, also the UT doesn't required to `setDefaultSession`. That's what I thought when I did this refactoring work. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177880353 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with if (loadTestDataBeforeTests) { loadTestData() } +SparkSession.setActiveSession(spark) --- End diff -- Discussed offline. What we should do instead is set the default session when the test spark session is initialized, since that initialization doesn't invoke SparkSession.getOrCreate() which normally sets it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177871475 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with if (loadTestDataBeforeTests) { loadTestData() } +SparkSession.setActiveSession(spark) --- End diff -- The active session is required for instantiating the DataSourceReader, which is done at planning time (spark.readStream.{...}.load()) in order to determine the schema. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20922#discussion_r177863902 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with if (loadTestDataBeforeTests) { loadTestData() } +SparkSession.setActiveSession(spark) --- End diff -- The active session should be set before we execute the plan, right? For example, in QueryExecution for each query. What is the reason we need to do it here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org