[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20922


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r178232592
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
 if (loadTestDataBeforeTests) {
   loadTestData()
 }
+SparkSession.setActiveSession(spark)
--- End diff --

(This issue was spun off into #20926.)





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r17881
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.util.Optional
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader}
+import org.apache.spark.sql.types._
+
+/**
+ *  A source that generates increment long values with timestamps. Each generated row has two
+ *  columns: a timestamp column for the generated time and an auto increment long column starting
+ *  with 0L.
+ *
+ *  This source supports the following options:
+ *  - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second.
+ *  - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed
+ *    becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer
+ *    seconds.
+ *  - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the
+ *    generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
+ *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
+ */
+class RateStreamProvider extends DataSourceV2
+  with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister {
+  import RateStreamProvider._
+
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceOptions): MicroBatchReader = {
--- End diff --

We should not pass `SparkSession` just to get the Hadoop configuration, 
right? cc @zsxwing
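
For readers who want a feel for the `rowsPerSecond` and `rampUpTime` options described in the Scaladoc quoted in the diff above, here is a minimal sketch in plain Scala. It assumes a linear ramp, which the quoted Scaladoc does not spell out, and the names `RateSketch`, `rampUpSeconds` and `valueAtSecond` are illustrative rather than taken from the PR.

```scala
import java.util.concurrent.TimeUnit

object RateSketch {
  /** Truncates a duration to whole seconds, mirroring the documented `rampUpTime` behaviour. */
  def rampUpSeconds(duration: Long, unit: TimeUnit): Long = unit.toSeconds(duration)

  /**
   * Total number of rows emitted by the end of `seconds`, ASSUMING the speed grows
   * linearly during the ramp-up and then stays at `rowsPerSecond`.
   */
  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
    // Speed increase per second while ramping up (integer division, so it may round down).
    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
    if (seconds <= rampUpTimeSeconds) {
      // Sum of speedDelta * k for k = 1..seconds.
      speedDeltaPerSecond * seconds * (seconds + 1) / 2
    } else {
      // Rows produced during the ramp-up, plus full-speed rows afterwards.
      val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
      rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
    }
  }
}
```

Under this model, `rowsPerSecond = 10` with a 4-second ramp-up gives cumulative counts of 0, 2, 6, 12 and 20 rows at the end of seconds 0 through 4, and 10 more rows for every second after that. A `rampUpTime` of 5500 ms truncates to 5 seconds.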





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r178031437
  

We can change it. It only needs a Hadoop conf.
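
The point about needing only a Hadoop conf can be sketched in plain Scala. Everything below is a hypothetical stand-in (`FakeHadoopConf`, `FakeSession`, `WideMetadataLog`, `NarrowMetadataLog`); none of it is Spark's actual `HDFSMetadataLog` or `SparkSession`. It only contrasts a constructor that takes the whole session with one that takes just the configuration it reads.

```scala
object NarrowDependencySketch {
  // Hypothetical stand-ins; these are NOT Spark or Hadoop classes.
  final case class FakeHadoopConf(settings: Map[String, String])
  final class FakeSession(val hadoopConf: FakeHadoopConf)

  // Wide dependency: the log takes the whole session although it only reads the conf.
  final class WideMetadataLog(session: FakeSession, val path: String) {
    def defaultFs: String =
      session.hadoopConf.settings.getOrElse("fs.defaultFS", "file:///")
  }

  // Narrow dependency: the log asks only for what it actually uses.
  final class NarrowMetadataLog(conf: FakeHadoopConf, val path: String) {
    def defaultFs: String =
      conf.settings.getOrElse("fs.defaultFS", "file:///")
  }
}
```

With the narrow form, a caller that has no session at hand (for example a data source that only receives options) can still construct the log, as long as it can obtain a configuration from somewhere.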





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-29 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r178027822
  

One of the constructor parameters of `HDFSMetadataLog` is a `SparkSession`.





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r178021180
  

Why does `HDFSMetadataLog` need `SparkSession`?

For the table description, we can put it in `DataSourceOptions`.
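
A rough sketch of what passing the table description through options could look like, in plain Scala. `FakeOptions` is a hypothetical stand-in for a string-to-string options map (modelled here with case-insensitive keys, which is an assumption of this sketch), and the option names `database` and `table` are made up for illustration.

```scala
import java.util.Locale

object OptionsSketch {
  // Hypothetical stand-in for an options map; NOT Spark's DataSourceOptions.
  final class FakeOptions(raw: Map[String, String]) {
    private val normalized: Map[String, String] =
      raw.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

    /** Looks a key up regardless of the caller's capitalisation. */
    def get(key: String): Option[String] = normalized.get(key.toLowerCase(Locale.ROOT))
  }

  final case class TableDescription(database: String, table: String)

  /** Builds the table description from options alone, without touching any session. */
  def tableDescription(options: FakeOptions): Either[String, TableDescription] =
    for {
      db  <- options.get("database").toRight("missing option: database")
      tbl <- options.get("table").toRight("missing option: table")
    } yield TableDescription(db, tbl)
}
```

The trade-off is that the caller has to resolve the description up front and serialise it into strings, instead of the source looking it up from a catalog itself.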





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-29 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177989371
  

For example, if a specific source requires `HDFSMetadataLog` for recovery, 
then it requires `SparkSession`. In my case, I also need to get the table 
description from the catalog, which requires `SparkSession` as well. This may 
not be a typical use case, but I think it would be easier for users if we 
exposed `SparkSession`.





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177984724
  

Can you give some concrete examples of why we need `SparkSession` in 
data source implementations? If it's only for config, we should use 
`DataSourceOptions`. If you want to access some internal state of Spark, we 
should improve the interface to expose that state explicitly.





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177953871
  

Thanks for the explanation @jose-torres. This seems like quite a common 
usage scenario. I also see that the socket source and console sink require 
`SparkSession`, as does my customized Hive streaming sink 
(https://github.com/jerryshao/spark-hive-streaming-sink/blob/7b3afcee280d2e70ffb12dde24184726b618829d/core/src/main/scala/com/hortonworks/spark/hive/HiveSourceProvider.scala#L46).
If we add that parameter back, things might be much easier.

What's your opinion @cloud-fan?





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-28 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177943116
  

I agree that there's a mismatch here.

The reason it doesn't currently have this parameter is that one of the 
DataSourceV2 design goals 
(https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit#heading=h.mi1fbff5f8f9)
was to avoid API dependencies on upper-level APIs like `SparkSession`. (IIRC 
Wenchen and I discussed `SparkSession` specifically in the design stage.) In 
this story, `SparkSession.get{Active/Default}Session` is just a way to keep our 
existing sources working rather than an encouraged development practice.

The mismatch could be worth some discussion, but I think it's out of scope 
for this PR.





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177933081
  

What do you think @jose-torres @tdas @gatorsmile ?





[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177932994
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.util.Optional
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader}
+import org.apache.spark.sql.types._
+
+/**
+ *  A source that generates increment long values with timestamps. Each generated row has two
+ *  columns: a timestamp column for the generated time and an auto increment long column starting
+ *  with 0L.
+ *
+ *  This source supports the following options:
+ *  - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second.
+ *  - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed
+ *    becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer
+ *    seconds.
+ *  - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the
+ *    generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
+ *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
+ */
+class RateStreamProvider extends DataSourceV2
+  with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister {
+  import RateStreamProvider._
+
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceOptions): MicroBatchReader = {
--- End diff --

If `MicroBatchReadSupport` could pass in a `SparkSession` parameter, the way 
`StreamSourceProvider#createSource` passes `sqlContext`, it would not be 
necessary to get the session from a thread-local or default variable, and the 
UT would not need to call `setDefaultSession`.

That's what I thought when I did this refactoring work.
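
A minimal sketch of that idea, written without Spark on the classpath. 
`StubSession`, `StubOptions`, `StubReader`, `SessionAwareReadSupport` and 
`RateLikeProvider` are hypothetical stand-ins for illustration only; none of 
them is part of the DataSourceV2 API in this PR.

```scala
// Hypothetical stand-ins so the sketch compiles without Spark.
final case class StubSession(defaultParallelism: Int)
final case class StubOptions(values: Map[String, String])
final case class StubReader(numPartitions: Int)

// Hypothetical read-support interface that receives the session explicitly,
// instead of the provider looking it up from a thread-local or default variable.
trait SessionAwareReadSupport {
  def createReader(
      session: StubSession,
      checkpointLocation: String,
      options: StubOptions): StubReader
}

object RateLikeProvider extends SessionAwareReadSupport {
  override def createReader(
      session: StubSession,
      checkpointLocation: String,
      options: StubOptions): StubReader = {
    // Fall back to the session's default parallelism when the option is unset,
    // mirroring the documented default of `numPartitions`.
    val numPartitions = options.values
      .get("numPartitions")
      .map(_.toInt)
      .getOrElse(session.defaultParallelism)
    StubReader(numPartitions)
  }
}
```

With this shape a test can construct the reader from any session it holds, 
without first registering that session globally.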


---




[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-28 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177880353
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
 if (loadTestDataBeforeTests) {
   loadTestData()
 }
+SparkSession.setActiveSession(spark)
--- End diff --

Discussed offline. What we should do instead is set the default session 
when the test Spark session is initialized, since that initialization doesn't 
invoke `SparkSession.builder.getOrCreate()`, which normally sets it.
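
To illustrate the distinction, here is a minimal, Spark-free model of an 
"active" (per-thread) session with a fallback to a "default" (process-wide) 
session. `Session` and `SessionRegistry` are hypothetical names, and this is 
a simplification under those assumptions, not Spark's actual implementation.

```scala
// Hypothetical stand-in for a session handle.
final case class Session(name: String)

// Simplified model of the two lookups discussed in this thread.
object SessionRegistry {
  // Per-thread "active" session; empty until a thread sets it.
  private val activeSession = new ThreadLocal[Option[Session]] {
    override def initialValue(): Option[Session] = None
  }

  // Process-wide "default" session, visible to every thread.
  @volatile private var defaultSession: Option[Session] = None

  def setActive(s: Session): Unit = { activeSession.set(Some(s)) }
  def clearActive(): Unit = { activeSession.remove() }
  def setDefault(s: Session): Unit = { defaultSession = Some(s) }
  def clearDefault(): Unit = { defaultSession = None }

  // Lookup order: the calling thread's active session, then the default.
  def current: Option[Session] = activeSession.get().orElse(defaultSession)
}
```

In this model, code that runs before any thread has set an active session 
still finds a session as long as the default was set at initialization, which 
is the property the test setup relies on.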


---




[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-28 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177871475
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
 if (loadTestDataBeforeTests) {
   loadTestData()
 }
+SparkSession.setActiveSession(spark)
--- End diff --

The active session is required for instantiating the DataSourceReader, 
which is done at planning time (spark.readStream.{...}.load()) in order to 
determine the schema.


---




[GitHub] spark pull request #20922: Roll forward "[SPARK-23096][SS] Migrate rate sour...

2018-03-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20922#discussion_r177863902
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
 if (loadTestDataBeforeTests) {
   loadTestData()
 }
+SparkSession.setActiveSession(spark)
--- End diff --

The active session should be set before we execute the plan, right? For 
example, in QueryExecution for each query. What is the reason we need to do it 
here?


---
