GitHub user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20922#discussion_r177932994
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala ---
    @@ -0,0 +1,125 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.util.Optional
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2._
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader}
    +import org.apache.spark.sql.types._
    +
    +/**
    + *  A source that generates increment long values with timestamps. Each generated row has two
    + *  columns: a timestamp column for the generated time and an auto increment long column starting
    + *  with 0L.
    + *
    + *  This source supports the following options:
    + *  - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second.
    + *  - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed
    + *    becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer
    + *    seconds.
    + *  - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the
    + *    generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
    + *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
    + */
    +class RateStreamProvider extends DataSourceV2
    +  with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister {
    +  import RateStreamProvider._
    +
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceOptions): MicroBatchReader = {
    --- End diff --
    
    If `MicroBatchReadSupport` could take a `SparkSession` parameter here, the
    way `StreamSourceProvider#createSource` takes `sqlContext`, we would not need
    to get the session from the thread-local or default variable, and the UT
    would not need to call `setDefaultSession`.
    
    That's what I thought when I did this refactoring work.
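    
    A rough sketch of the difference, for illustration only. It uses stand-in
    types so it compiles without Spark; `FakeSession`, `FakeOptions`,
    `FakeReader`, `SessionRegistry` and both traits are hypothetical names, not
    the real Spark classes or the actual `MicroBatchReadSupport` signature. It
    also assumes the session is only needed to resolve the default parallelism
    for `numPartitions`.

```scala
// Stand-in types (hypothetical, NOT Spark classes) so the sketch is self-contained.
final case class FakeSession(defaultParallelism: Int)
final case class FakeOptions(values: Map[String, String])
final case class FakeReader(numPartitions: Int)

// Mimics a process-wide "default session" lookup.
object SessionRegistry {
  private var current: Option[FakeSession] = None
  def setDefaultSession(s: FakeSession): Unit = { current = Some(s) }
  def clearDefaultSession(): Unit = { current = None }
  def getDefaultSession: Option[FakeSession] = current
}

// Shape without a session parameter: the provider has to find a session itself.
trait ReadSupportWithoutSession {
  def createReader(options: FakeOptions): FakeReader
}

// Hypothetical shape with the session passed in by the caller.
trait ReadSupportWithSession {
  def createReader(session: FakeSession, options: FakeOptions): FakeReader
}

object PartitionResolver {
  // Use the `numPartitions` option if present, else the session's default parallelism.
  def resolve(session: FakeSession, options: FakeOptions): Int =
    options.values.get("numPartitions").map(_.toInt).getOrElse(session.defaultParallelism)
}

class GlobalLookupProvider extends ReadSupportWithoutSession {
  override def createReader(options: FakeOptions): FakeReader = {
    // Fails unless some caller (e.g. a test) registered a default session first.
    val session = SessionRegistry.getDefaultSession.getOrElse(
      throw new IllegalStateException("no default session set"))
    FakeReader(PartitionResolver.resolve(session, options))
  }
}

class ExplicitSessionProvider extends ReadSupportWithSession {
  override def createReader(session: FakeSession, options: FakeOptions): FakeReader =
    FakeReader(PartitionResolver.resolve(session, options))
}

object Demo extends App {
  val session = FakeSession(defaultParallelism = 4)

  // Explicit parameter: no global setup needed.
  println(new ExplicitSessionProvider().createReader(session, FakeOptions(Map.empty)).numPartitions) // prints 4

  // Global lookup: the caller must set and later clear the default session.
  SessionRegistry.setDefaultSession(session)
  try {
    val opts = FakeOptions(Map("numPartitions" -> "10"))
    println(new GlobalLookupProvider().createReader(opts).numPartitions) // prints 10
  } finally {
    SessionRegistry.clearDefaultSession()
  }
}
```

    With the explicit parameter, a test only has to construct a session and
    pass it in; with the global lookup, every test has to set up and tear down
    shared state around the call.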


