[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r239670323

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)

+  val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable")
+    .doc("Whether use watermark in sqlstreaming.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+    .doc("The output mode used in sqlstreaming")
+    .stringConf
+    .createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

> insert into kafka_sql_out select stream t1.value from (select cast(value as string), timestamp as time1 from kafka_sql_in1) as t1 inner join (select cast(value as string), timestamp as time2 from kafka_sql_in2) as t2 on time1 >= time2 and time1 <= time2 + interval 10 seconds where t1.value == t2.value

No, SQLStreaming does support stream-to-stream joins, as in the query quoted above. The watermark config is put in the table properties. As for the trigger interval: would different sources in a stream-to-stream join scenario need different trigger configs?
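A minimal DataFrame-API sketch of the quoted stream-to-stream join, with a per-source watermark, may help illustrate the semantics under discussion; the broker address and topic names are placeholders, `spark` is an active SparkSession, and this is only an illustration, not code from the PR:

    import org.apache.spark.sql.functions.expr

    // Read the two Kafka topics as streams and attach a watermark to each.
    val in1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")  // placeholder
      .option("subscribe", "topic1")                   // placeholder
      .load()
      .selectExpr("CAST(value AS STRING) AS v1", "timestamp AS time1")
      .withWatermark("time1", "10 seconds")

    val in2 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")  // placeholder
      .option("subscribe", "topic2")                   // placeholder
      .load()
      .selectExpr("CAST(value AS STRING) AS v2", "timestamp AS time2")
      .withWatermark("time2", "10 seconds")

    // Inner join with the same equality and time-range condition as the SQL above.
    val joined = in1.join(in2,
      expr("v1 = v2 AND time1 >= time2 AND time1 <= time2 + interval 10 seconds"))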
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r239500890

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)

+  val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable")
+    .doc("Whether use watermark in sqlstreaming.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+    .doc("The output mode used in sqlstreaming")
+    .stringConf
+    .createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

So stream-stream join is not supported here, right? To elaborate: can I create two stream source tables, then join both and write to a sink? Because if I want to create two streams for two different topics, I may need to provide different configurations for the watermark, window, or trigger interval.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r239113033

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)

+  val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable")
+    .doc("Whether use watermark in sqlstreaming.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+    .doc("The output mode used in sqlstreaming")
+    .stringConf
+    .createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

I don't think there are any problems with this. SQLStreaming uses a Command to run the streaming query, similar to InsertIntoHiveTable, so treating batch SQL and streaming SQL the same way is expected. In addition, an application can currently run only one streaming SQL query.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r239109280

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.Utils
+
+/**
+ * The basic RunnableCommand for SQLStreaming, using Command.run to start a streaming query.
+ *
+ * @param sparkSession
+ * @param extraOptions
+ * @param partitionColumnNames
+ * @param child
+ */
+case class SQLStreamingSink(sparkSession: SparkSession,
+    table: CatalogTable,
+    child: LogicalPlan)
+  extends RunnableCommand {
+
+  private val sqlConf = sparkSession.sqlContext.conf
+
+  /**
+   * The given column name may not be equal to any of the existing column names if we were in
+   * case-insensitive context. Normalize the given column name to the real one so that we don't
+   * need to care about case sensitivity afterwards.
+   */
+  private def normalize(df: DataFrame, columnName: String, columnType: String): String = {
+    val validColumnNames = df.logicalPlan.output.map(_.name)
+    validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, columnName))
+      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
+        s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  /**
+   * Parse spark.sqlstreaming.trigger.seconds to Trigger
+   */
+  private def parseTrigger(): Trigger = {
+    val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
+    Trigger.ProcessingTime(trigger, TimeUnit.MICROSECONDS)
--- End diff --

Yeah, I will change it to milliseconds.
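A minimal sketch of the agreed fix inside SQLStreamingSink, assuming `Utils.timeStringAsMs` returns milliseconds (as its name indicates):

    /**
     * Parse spark.sqlstreaming.trigger to a ProcessingTime trigger.
     */
    private def parseTrigger(): Trigger = {
      // timeStringAsMs already yields milliseconds, so the unit must match.
      val intervalMs = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
      Trigger.ProcessingTime(intervalMs, TimeUnit.MILLISECONDS)
    }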
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r238336135

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.Utils
+
+/**
+ * The basic RunnableCommand for SQLStreaming, using Command.run to start a streaming query.
+ *
+ * @param sparkSession
+ * @param extraOptions
+ * @param partitionColumnNames
+ * @param child
+ */
+case class SQLStreamingSink(sparkSession: SparkSession,
+    table: CatalogTable,
+    child: LogicalPlan)
+  extends RunnableCommand {
+
+  private val sqlConf = sparkSession.sqlContext.conf
+
+  /**
+   * The given column name may not be equal to any of the existing column names if we were in
+   * case-insensitive context. Normalize the given column name to the real one so that we don't
+   * need to care about case sensitivity afterwards.
+   */
+  private def normalize(df: DataFrame, columnName: String, columnType: String): String = {
+    val validColumnNames = df.logicalPlan.output.map(_.name)
+    validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, columnName))
+      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
+        s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  /**
+   * Parse spark.sqlstreaming.trigger.seconds to Trigger
+   */
+  private def parseTrigger(): Trigger = {
+    val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
+    Trigger.ProcessingTime(trigger, TimeUnit.MICROSECONDS)
--- End diff --

Do we require a microseconds unit here? Milliseconds or seconds should do, I guess; the lowest latency supported by Structured Streaming is 100 ms.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r238329995

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)

+  val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable")
+    .doc("Whether use watermark in sqlstreaming.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+    .doc("The output mode used in sqlstreaming")
+    .stringConf
+    .createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

We have so many configurations here. I am thinking of Thrift Server scenarios, where a user can open multiple sessions and run streaming queries with different query contexts; each query will require its own trigger interval, watermarking, and windowing. Can you elaborate a bit on how we address these scenarios?
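One hedged observation: SQL confs are session-scoped in Spark, so each Thrift Server session could in principle carry its own values for the confs proposed here. A sketch, assuming the confs from this diff exist:

    // Each SparkSession created via newSession() has its own SessionState/SQLConf,
    // so these settings do not leak across Thrift Server sessions.
    val session1 = spark.newSession()
    val session2 = spark.newSession()

    session1.sql("SET spark.sqlstreaming.trigger=5 seconds")  // hypothetical per-session value
    session2.sql("SET spark.sqlstreaming.trigger=1 minute")   // hypothetical per-session value
    session1.sql("SET spark.sqlstreaming.outputMode=append")
    session2.sql("SET spark.sqlstreaming.outputMode=update")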
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r237721103

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+    withTempPath { dir =>
+      withTable("t") {
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (
+             |PATH = '${dir.toURI}',
+             |location = '${dir.toURI}',
+             |isStreaming = 'true')
+             |AS SELECT 1 AS a, 2 AS b, 3 AS c
+           """.stripMargin)
--- End diff --

In this place, `child` is a streaming logicalPlan; if it is not, an exception will be thrown.
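A hedged sketch of the guard being described, assuming the command simply checks the logical plan's `isStreaming` flag (the exact check in the PR may differ):

    // Inside the command that starts the streaming query (hypothetical placement):
    if (!child.isStreaming) {
      throw new AnalysisException(
        "SQLStreaming expects a streaming child plan, but got a batch plan")
    }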
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r237372804

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+    withTempPath { dir =>
+      withTable("t") {
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (
+             |PATH = '${dir.toURI}',
+             |location = '${dir.toURI}',
+             |isStreaming = 'true')
+             |AS SELECT 1 AS a, 2 AS b, 3 AS c
+           """.stripMargin)
--- End diff --

At https://github.com/apache/spark/pull/22575/files#diff-fa4547f0c6dd7810576cd4262a2dfb46R78, the `child` logicalPlan is not a streaming logicalPlan, is it?
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r226853809

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)

+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
--- End diff --

Then, could you suggest a more appropriate name? These configurations are meant to be used only in SQLStreaming.
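For comparison, Spark's built-in streaming confs already live under the `spark.sql.streaming.*` prefix, so one hedged alternative is to follow that convention (illustration only, not what the PR settled on):

    // An existing conf under the conventional prefix, for reference:
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/ckpt")

    // Hypothetical renames of the confs in this diff, following the same convention:
    //   spark.sqlstreaming.outputMode -> spark.sql.streaming.sqlQuery.outputMode
    //   spark.sqlstreaming.trigger    -> spark.sql.streaming.sqlQuery.trigger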
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r226853804

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+    withTempPath { dir =>
+      withTable("t") {
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (
+             |PATH = '${dir.toURI}',
+|location = '${dir.toURI}',
--- End diff --

Thanks
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r226853724

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -63,7 +63,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     validateStreamOptions(parameters)
-    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
+    if(schema.isDefined) {
--- End diff --

KafkaSourceProvider is also the entry point for SQLStreaming. When a DataSource is created from a Kafka streaming table, the table's schema is passed in, so KafkaSourceProvider should be compatible with a user-provided schema.
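For context, before this change any user-specified schema made the Kafka source fail the `require` shown in the diff; a minimal sketch that reproduces the old behavior (broker and topic are placeholders):

    import org.apache.spark.sql.types.{StringType, StructType}

    // With the old require(), load() fails with:
    // "Kafka source has a fixed schema and cannot be set with a custom one"
    val df = spark.readStream
      .format("kafka")
      .schema(new StructType().add("value", StringType))  // user-specified schema
      .option("kafka.bootstrap.servers", "host:9092")     // placeholder
      .option("subscribe", "some_topic")                  // placeholder
      .load()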
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r225998489

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+    withTempPath { dir =>
+      withTable("t") {
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (
+             |PATH = '${dir.toURI}',
+|location = '${dir.toURI}',
--- End diff --

nit: indent here.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r225997731

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)

+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
--- End diff --

Not sure `spark.sqlstreaming` is the right naming convention for these configs.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r225992780

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -63,7 +63,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     validateStreamOptions(parameters)
-    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
+    if(schema.isDefined) {
--- End diff --

Why is this change needed?