[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208594904 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow --- End diff -- `getKey` was basically an UnsafeProjection in statefulOperators, so it didn't strictly require an UnsafeRow. I just followed the existing usage to make it less restrictive, but we know that in reality `row` will always be an UnsafeRow. So I'm OK to fix it if that provides consistency. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
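For readers following along, here is a minimal sketch of what such a key-extracting projection looks like in this context; the schema and the placement of the key columns are made up for illustration and are not the PR's actual code:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection

// Hypothetical input schema (key1, key2, value); the first two columns form the key.
val inputAttributes = Seq('key1.int, 'key2.int, 'value.int)
val keyAttributes = inputAttributes.take(2)

// Code-generated projection that picks only the key columns out of an input row.
val keyProjector = GenerateUnsafeProjection.generate(keyAttributes, inputAttributes)

// Accepts any InternalRow (in practice always an UnsafeRow) and returns an UnsafeRow
// containing only the key columns: the behavior discussed for `getKey` above.
def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
```

Typing the parameter as `InternalRow` keeps the signature no more restrictive than the projection itself requires, which is the trade-off being discussed.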
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208592556 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -871,6 +871,16 @@ object SQLConf { .intConf .createWithDefault(2) + val STREAMING_AGGREGATION_STATE_FORMAT_VERSION = +buildConf("spark.sql.streaming.aggregation.stateFormatVersion") + .internal() + .doc("State format version used by streaming aggregation operations in a streaming query. " + +"State between versions are tend to be incompatible, so state format version shouldn't " + +"be modified after running.") + .intConf + .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") + .createWithDefault(2) --- End diff -- Nice suggestion. Will add the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
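A sketch of the kind of test that could cover the `checkValue` guard; the suite placement (any suite with a `spark` session available) and the assertion wording are assumptions, not the test that was actually added:

```scala
test("invalid streaming aggregation state format version is rejected") {
  // Unsupported versions should fail fast via checkValue when the conf is set.
  val e = intercept[IllegalArgumentException] {
    spark.conf.set(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key, "3")
  }
  assert(e.getMessage.contains("Valid versions are 1 and 2"))

  // The supported versions (1 and 2) should be accepted without error.
  Seq("1", "2").foreach { v =>
    spark.conf.set(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key, v)
  }
}
```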
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208591232 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** --- End diff -- Maybe I misinterpreted your suggestion before. I thought you were suggesting moving it to the state package class. I will place it in a separate file. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 My series of patches builds on two metrics: `size for memory usage of latest version` and `size for total memory usage of loaded versions`. SPARK-24717 (#21700) made it possible to tune overall state memory usage, but end users can't tune it with only one of the two metrics. IMHO, I'm not 100% sure how much confusion this patch creates for end users, but if the intention of `memoryUsedBytes` is to measure the overall state partition, what about redefining `memoryUsedBytes` as `size for total memory usage of loaded versions`, and also exposing `size for memory usage of latest version` as a custom metric? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
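To illustrate how the split might look from the end-user side, here is a sketch of reading both numbers from a query's progress. It assumes `memoryUsedBytes` would carry the total across loaded versions, and that the latest-version size would surface in the state operator's custom metrics; the metric name `stateOnCurrentVersionSizeBytes` is a placeholder for the proposal, not a confirmed API:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Reads both proposed numbers from a query's most recent progress.
def reportStateSizes(query: StreamingQuery): Unit = {
  Option(query.lastProgress).toSeq.flatMap(_.stateOperators).foreach { op =>
    // Proposed meaning: total memory usage across all loaded state versions.
    val totalLoadedVersions = op.memoryUsedBytes
    // Proposed custom metric: memory usage of the latest version only (name is hypothetical).
    val latestVersionOnly =
      Option(op.customMetrics.get("stateOnCurrentVersionSizeBytes")).map(_.longValue).getOrElse(-1L)
    println(s"loaded versions total = $totalLoadedVersions bytes, latest version = $latestVersionOnly bytes")
  }
}
```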
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 Also added javadoc. Most of the content is from StateStore, but I didn't copy the note over to the state store implementation since it would be duplicated. Please let me know if we want to add content for the target state store parameter as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @tdas Done running the perf. test with 4 more tests:

> BenchmarkMovingAggregationsListenerKeyMuchBigger

rate: 16

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 159877.232 | 149537.817 | 65000 | 133511303
patch (on top of c9914cf) | 160049.118 | 152497.945 | 65000 | 73236351

state size: 54.854 % (reduces 45.15 %)

> BenchmarkMovingAggregationsListenerManyKeys

rate: 12

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 120266.810 | 107482.042 | 65000 | 38433719
patch (on top of c9914cf) | 119865.855 | 109268.772 | 65000 | 24900343

state size: 64.787 % (reduces 35.21 %)

> BenchmarkMovingAggregationsListenerManyValues

rate: 25000

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 25009.236 | 21216.126 | 9 | 77161711 (857.352 per row)
patch (on top of c9914cf) | 25060.635 | 20774.500 | 99495 | 78230335 (786.274 per row)

state size: 91.709 % (reduces 8.29 %)

> BenchmarkMovingAggregationsListenerValueMuchBigger

rate: 85000

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 85310.774 | 79091.271 | 1000 | 1324255
patch (on top of c9914cf) | 84791.761 | 79755.905 | 1000 | 1282687

state size: 96.861 % (reduces 3.14 %)

I don't see any outstanding perf. hit, and the expected state size reduction shows up across all the cases.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
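For clarity, the "state size" percentages above are simply the patched size divided by the master size for the same benchmark; a quick sketch of the arithmetic using the first table's numbers:

```scala
// KeyMuchBigger case: used bytes of current state version, master vs. patch.
val masterBytes = 133511303L
val patchBytes = 73236351L

val ratioPercent = patchBytes.toDouble / masterBytes * 100   // ~54.854 (state size vs. master)
val reductionPercent = 100.0 - ratioPercent                  // ~45.15 (reported reduction)
```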
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 Thanks @zsxwing for merging and thanks all for reviewing! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 Thanks @HyukjinKwon for merging, and thanks all for reviewing! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @tdas Kind reminder. I'll take on the documentation step once you say it's OK to go. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @tdas Kind reminder. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 @HyukjinKwon Could you take this forward, given that the patch is minor and the CI tests have passed? Thanks in advance! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r207881161 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -513,6 +515,125 @@ class StreamSuite extends StreamTest { } } + test("explain-continuous") { +val inputData = ContinuousMemoryStream[Int] +val df = inputData.toDS().map(_ * 2).filter(_ > 5) + +// Test `df.explain` +val explain = ExplainCommand(df.queryExecution.logical, extended = false) +val explainString = + spark.sessionState +.executePlan(explain) +.executedPlan +.executeCollect() +.map(_.getString(0)) +.mkString("\n") +assert(explainString.contains("Filter")) +assert(explainString.contains("MapElements")) +assert(!explainString.contains("LocalTableScan")) + +// Test StreamingQuery.display +val q = df.writeStream.queryName("memory_continuous_explain") + .outputMode(OutputMode.Update()).format("memory") + .trigger(Trigger.Continuous("1 seconds")) + .start() + .asInstanceOf[StreamingQueryWrapper] + .streamingQuery +try { + // in continuous mode, the query will be run even there's no data + // sleep a bit to ensure initialization + eventually(timeout(2.seconds), interval(100.milliseconds)) { +assert(q.lastExecution != null) + } + + val explainWithoutExtended = q.explainInternal(false) + + // `extended = false` only displays the physical plan. + assert("Streaming RelationV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithoutExtended).size === 0) + assert("ScanV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithoutExtended).size === 1) + + val explainWithExtended = q.explainInternal(true) + // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical + // plan. + assert("Streaming RelationV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithExtended).size === 3) + assert("ScanV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithExtended).size === 1) +} finally { + q.stop() +} + } + + test("codegen-microbatch") { +import org.apache.spark.sql.execution.debug._ + +val inputData = MemoryStream[Int] +val df = inputData.toDS().map(_ * 2).filter(_ > 5) + +// Test StreamingQuery.codegen +val q = df.writeStream.queryName("memory_microbatch_codegen") + .outputMode(OutputMode.Update) + .format("memory") + .trigger(Trigger.ProcessingTime("1 seconds")) + .start() + +try { + assert("No physical plan. Waiting for data." === codegenString(q)) + assert(codegenStringSeq(q).isEmpty) + + inputData.addData(1, 2, 3, 4, 5) + q.processAllAvailable() + + val codegenStr = codegenString(q) --- End diff -- Nice finding. Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 @zsxwing Addressed review comments. Please take a look again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 @zsxwing OK, I also wondered whether `debug` is applicable to streaming, but I wanted to fill the gap earlier. Will remove `debug` for streaming. Will update shortly. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @tdas I found some spare time to run performance tests, though I've only run one app for now... I couldn't run the tests concurrently. Please let me know if you are not confident in results from a single app: I'll find more time to go through all the test cases. Hope these numbers give enough confidence to accept the patch.

> Machine info.

MBP 15-inch Mid 2015
* i7 2.5GHz (4 cores)
* 16GB 1600 MHz DDR3
* SSD 512G

> Test information

* base commit: c9914cf (latest master branch)
* patch internally rebased on the base commit before testing
* spark-submit options: master local[3] --driver-memory 6g
* I didn't run the perf. test with all cores and memory: I left some spare resources for the OS and background apps.

> Performance test code

https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListener.scala

Please note that there are 4 more apps (big key size, big value size, many key columns, many value columns) in the same repository.

> Test result

Neither version caught up with the configured rate per second (20), but since processed rows per second were around 188000 I felt I didn't need to tune the rate per second more tightly (like 185000, 19, etc...). The numbers for input rows per second and processed rows per second are calculated by averaging 3 batches (38, 39, 40 respectively). The numbers regarding state are taken at the point when total state rows reached 6.

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 200492.065 | 10.316 | 6 | 17,755,895
patch (on top of c9914cf) | 199242.598 | 188160.833 | 6 | 14,687,543

So while the two processed-rows-per-second numbers didn't show an outstanding difference (under 1%), the patch reduced the memory usage of state (for the latest version) by 17.29 %. One thing to note: in this performance test, state is saved to the local SSD. The patch may give a (small? trivial?) additional performance benefit when a remote checkpoint directory is used.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
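The actual benchmark code is at the linked repository; as a rough, self-contained sketch of the same shape of workload (a rate source feeding a streaming aggregation, with a listener reporting the metrics quoted above), something like the following could be used. The column names, key modulus, rates, and trigger interval here are all made up:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}

object MovingAggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("moving-agg-sketch").getOrCreate()
    import spark.implicits._

    // Listener printing the same metrics quoted in the table:
    // processed rows per second, total state rows, memory used by state.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(e: StreamingQueryListener.QueryStartedEvent): Unit = {}
      override def onQueryTerminated(e: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(e: StreamingQueryListener.QueryProgressEvent): Unit = {
        val p = e.progress
        p.stateOperators.foreach { s =>
          println(s"processedRowsPerSecond=${p.processedRowsPerSecond} " +
            s"numRowsTotal=${s.numRowsTotal} memoryUsedBytes=${s.memoryUsedBytes}")
        }
      }
    })

    // Rate source -> keyed count aggregation -> console sink in update mode.
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "200000")
      .load()
      .selectExpr("value % 60000 AS key", "value")
      .groupBy($"key")
      .count()
      .writeStream
      .outputMode("update")
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
```

With something like this, the per-batch numbers in the tables can be read straight off the listener output while the query runs.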
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r207096556 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Seria
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r207096219 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) --- End diff -- Yeah, agreed. I'm OK if same implications are used in other places. --- ---
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 Test failure looks unrelated. Jenkins, retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @tdas I've applied your review comments except for documentation. (Will add WIP to the PR's title if that sounds clearer.) There may be more review comments you want to add, so I'd like to work on documentation once the patch is in a "ready to merge" shape. In any case, I'll try to find the time/resources to run the performance tests again, but it might take a couple of days or more. Will update once I've run them and have new numbers. While waiting, please continue reviewing the code; it would help to run the tests against the latest updated patch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206790470 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +class MemoryStateStore extends StateStore() { + import scala.collection.JavaConverters._ + private val map = new ConcurrentHashMap[UnsafeRow, UnsafeRow] + + override def iterator(): Iterator[UnsafeRowPair] = { +map.entrySet.iterator.asScala.map { case e => new UnsafeRowPair(e.getKey, e.getValue) } + } + + override def get(key: UnsafeRow): UnsafeRow = map.get(key) + + override def put(key: UnsafeRow, newValue: UnsafeRow): Unit = { +map.put(key.copy(), newValue.copy()) + } + + override def remove(key: UnsafeRow): Unit = { +map.remove(key) --- End diff -- Yeah missed that. Will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206791736 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] --- End diff -- It would be going to be `getStateValueSchema` btw, once we change return type. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206784385 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -201,33 +200,37 @@ object WatermarkSupport { case class StateStoreRestoreExec( keyExpressions: Seq[Attribute], stateInfo: Option[StatefulOperatorStateInfo], +stateFormatVersion: Int, child: SparkPlan) extends UnaryExecNode with StateStoreReader { + private[sql] val stateManager = StreamingAggregationStateManager.createStateManager( +keyExpressions, child.output, stateFormatVersion) + override protected def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithStateStore( getStateInfo, keyExpressions.toStructType, - child.output.toStructType, + stateManager.getValueExpressions.toStructType, --- End diff -- Right. Sounds like `StructType` is preferred than `Seq[Attribute]` in this case. Will apply. Maybe dumb question from newbie on Spark SQL (still trying to get familiar with) : I guess we prefer StructType in this case cause it's less restrictive and also get rid of headache of dealing with fields reference. Do I understand correctly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206791325 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StatefulOperatorsHelperSuite extends StreamTest { + import TestMaterial._ + + test("StateManager v1 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 1) + +// in V1, input row is stored as value +testGetPutIterOnStateManager(stateManager, OUTPUT_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, TEST_ROW) + } + + // StateManagerImplV2 + test("StateManager v2 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 2) + +// in V2, row for values itself (excluding keys from input row) is stored as value +// so that stored value doesn't have key part, but state manager V2 will provide same output +// as V1 when getting row for key +testGetPutIterOnStateManager(stateManager, VALUES_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, + TEST_VALUE_ROW) + } + + private def newStateManager( + keysAttributes: Seq[Attribute], + outputAttributes: Seq[Attribute], --- End diff -- Yes, and actually, for StateManager, `input row attributes` and `output attributes` are same according to how StateStore*Exec work, so I picked either one. I'm happy to rename if `inputRowAttributes` is clearer to give insight which schema should be passed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206786014 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala --- @@ -53,7 +53,35 @@ class StreamingAggregationSuite extends StateStoreMetricsTest import testImplicits._ - test("simple count, update mode") { + def executeFuncWithStateVersionSQLConf( + stateVersion: Int, + confPairs: Seq[(String, String)], + func: => Any): Unit = { +withSQLConf(confPairs ++ + Seq(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key -> stateVersion.toString): _*) { + func +} + } + + def testWithAllStateVersions(name: String, confPairs: (String, String)*) --- End diff -- Actually it's basically from wondering of how `withSQLConf` works. Does `withSQLConf` handle nested `withSQLConf` properly? If then we don't need to add `confPairs` param at all, and if not I guess we might still want to add this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
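For illustration, a sketch of what the helper could look like without the extra `confPairs` parameter, on the assumption that `withSQLConf` restores the previous values on exit and therefore composes when nested (the version numbers are hard-coded here rather than read from a shared constant):

```scala
// Runs the given test body once per supported state format version.
def testWithAllStateVersions(name: String)(func: => Any): Unit = {
  for (version <- Seq(1, 2)) {
    test(s"$name - state format version $version") {
      withSQLConf(
        SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key -> version.toString) {
        func
      }
    }
  }
}

// Usage: any extra confs can be set by a nested withSQLConf inside the body.
testWithAllStateVersions("simple count, update mode") {
  // build the MemoryStream, run testStream(...), assert results, etc.
}
```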
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206780521 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow +def get(store: StateStore, key: UnsafeRow): UnsafeRow --- End diff -- I might think naively about this: I thought its interface is similar to StateStore so wondered we need to add docs, but I think I was wrong. Will add docs. Thanks for the insightful input! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206779898 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow --- End diff -- > In fact, if there exists a StateManager to manage all the state in the store, then ALL operations to add/remove state should go through the manager and store should not be accessed directly. Totally agreed that it should be better design of StateManager. I don't remember I tried to do before, so let me try applying your suggestion and see there's anything blocks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206781209 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow +def get(store: StateStore, key: UnsafeRow): UnsafeRow +def put(store: StateStore, row: UnsafeRow): Unit + } + + object StreamingAggregationStateManager extends Logging { +def createStateManager( +keyExpressions: Seq[Attribute], +childOutput: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, childOutput) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, childOutput) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val childOutput: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, childOutput) + +def extractKey(row: InternalRow): UnsafeRow = keyProjector(row) + } + + class StreamingAggregationStateManagerImplV1( + keyExpressions: Seq[Attribute], + childOutput: Seq[Attribute]) +extends StreamingAggregationStateManagerBaseImpl(keyExpressions, childOutput) { + +override def getValueExpressions: Seq[Attribute] = { + childOutput +} + +override def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow = { + rowPair.value +} + +override def get(store: StateStore, key: UnsafeRow): UnsafeRow = { + store.get(key) +} + +override def put(store: StateStore, row: UnsafeRow): Unit = { + store.put(extractKey(row), row) +} + } + + class StreamingAggregationStateManagerImplV2( --- End diff -- Great point. I might be in a rush to show its shape. 
Will add doc for state formats in both V1 and V2. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
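To make the difference between the two formats concrete before the docs land, here is a rough sketch of what the V2 manager could look like: it stores only the non-key columns as the state value and joins the key back in on reads. This is an illustration of the idea, not the exact code in the PR, and it assumes the same imports already present in StatefulOperatorsHelper.scala as shown in the diff above:

```scala
class StreamingAggregationStateManagerImplV2(
    keyExpressions: Seq[Attribute],
    inputRowAttributes: Seq[Attribute])
  extends StreamingAggregationStateManagerBaseImpl(keyExpressions, inputRowAttributes) {

  // Value schema = input row attributes minus the key attributes.
  private val valueExpressions: Seq[Attribute] =
    inputRowAttributes.filterNot(keyExpressions.contains)

  // Projects an input row down to only its non-key columns before writing to the store.
  @transient private lazy val valueProjector =
    GenerateUnsafeProjection.generate(valueExpressions, inputRowAttributes)

  // Joins a stored (key, value) pair back into a row with the original input schema,
  // assuming the key columns come first in the input row.
  @transient private lazy val restoreRowJoiner = GenerateUnsafeRowJoiner.create(
    keyExpressions.toStructType, valueExpressions.toStructType)

  override def getValueExpressions: Seq[Attribute] = valueExpressions

  override def put(store: StateStore, row: UnsafeRow): Unit =
    store.put(extractKey(row), valueProjector(row))

  override def get(store: StateStore, key: UnsafeRow): UnsafeRow = {
    val savedValue = store.get(key)
    if (savedValue == null) null else restoreRowJoiner.join(key, savedValue)
  }

  override def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow =
    restoreRowJoiner.join(rowPair.key, rowPair.value)
}
```

The V1 manager shown in the diff keeps storing the whole input row as the value, which is why the two formats are incompatible on disk and the format version has to stay fixed for a running query.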
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206790358 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +class MemoryStateStore extends StateStore() { --- End diff -- It was actually just extracted from other place to reuse among the places, but I agree it's better to document once it is kind of public API for testing. Will add. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778127 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { --- End diff -- Will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206780754 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow +def get(store: StateStore, key: UnsafeRow): UnsafeRow +def put(store: StateStore, row: UnsafeRow): Unit + } + + object StreamingAggregationStateManager extends Logging { +def createStateManager( +keyExpressions: Seq[Attribute], +childOutput: Seq[Attribute], --- End diff -- Sounds much better and you're right about concept of `child`. Will rename. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778355 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow --- End diff -- Renaming sounds better. Will rename, and will also add docs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206790505 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StatefulOperatorsHelperSuite extends StreamTest { --- End diff -- Will rename. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206788634 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StatefulOperatorsHelperSuite extends StreamTest { + import TestMaterial._ + + test("StateManager v1 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 1) + +// in V1, input row is stored as value +testGetPutIterOnStateManager(stateManager, OUTPUT_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, TEST_ROW) + } + + // StateManagerImplV2 + test("StateManager v2 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 2) + +// in V2, row for values itself (excluding keys from input row) is stored as value +// so that stored value doesn't have key part, but state manager V2 will provide same output +// as V1 when getting row for key +testGetPutIterOnStateManager(stateManager, VALUES_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, + TEST_VALUE_ROW) + } + + private def newStateManager( + keysAttributes: Seq[Attribute], + outputAttributes: Seq[Attribute], + version: Int): StreamingAggregationStateManager = { +StreamingAggregationStateManager.createStateManager(keysAttributes, outputAttributes, version) + } + + private def testGetPutIterOnStateManager( + stateManager: StreamingAggregationStateManager, + expectedValueExpressions: Seq[Attribute], + inputRow: UnsafeRow, + expectedStateKey: UnsafeRow, + expectedStateValue: UnsafeRow): Unit = { + +assert(stateManager.getValueExpressions === expectedValueExpressions) + +val memoryStateStore = new MemoryStateStore() +stateManager.put(memoryStateStore, inputRow) + +assert(memoryStateStore.iterator().size === 1) + +val keyRow = stateManager.extractKey(inputRow) +assert(keyRow === expectedStateKey) + +// iterate state store and verify whether expected format of key and value are stored +val pair = memoryStateStore.iterator().next() +assert(pair.key === keyRow) +assert(pair.value === expectedStateValue) +assert(stateManager.restoreOriginRow(pair) === inputRow) + +// verify the stored value once again via get +assert(memoryStateStore.get(keyRow) === 
expectedStateValue) + +// state manager should return row which is same as input row regardless of format version +assert(inputRow === stateManager.get(memoryStateStore, keyRow)) + } + +} + +object TestMaterial { + val KEYS: Seq[String] = Seq("key1", "key2") --- End diff -- I intended to use them like `static final` fields: I treated them as constants and followed the style guide for constants - `Constants should be all uppercase letters and be put in a companion object.` That's why I extracted them into a separate object (though not a companion object, for the sake of better naming) and named them in uppercase. But the indirection itself is not intentional, and it is definitely bad if it forces readers to jump back and forth. I'm going to place them in the earliest part of the class for now.
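For illustration, a minimal sketch of the two placements being weighed here; the names are made up and not taken from the actual test suite:

```scala
// Option A: constants in a (companion) object, following the style guide
object StatefulOperatorsHelperSuite {
  val TEST_KEY_COLUMN = "key1"   // uppercase, shared by every test
}

// Option B: constants declared at the top of the test class itself, so a reader
// can follow a test without jumping to another object
class StatefulOperatorsHelperSuite {
  private val TEST_KEY_COLUMN = "key1"
}
```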
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778971 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] --- End diff -- It is to define the schema of value from / to state. For V1 it would be same to input schema and for V2 it would be `input schema - key schema`. Would `getStateValueExpressions` be OK for us? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
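The value-schema distinction described in that reply can be sketched roughly as follows; this is an illustration of the idea, not the actual state manager implementation, and attribute comparison is simplified:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// V1 keeps the whole input row as the state value; V2 drops the key columns from it.
def stateValueExpressions(
    keyAttributes: Seq[Attribute],
    inputAttributes: Seq[Attribute],
    version: Int): Seq[Attribute] = version match {
  case 1 => inputAttributes                                    // value schema == input schema
  case 2 => inputAttributes.filterNot(keyAttributes.contains)  // value schema == input - key
  case _ => throw new IllegalArgumentException(s"Unsupported state format version: $version")
}
```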
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778077 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { --- End diff -- Yeah right. I found your PR useful to get an idea of how to model the classes because it was dealing with similar requirement, but didn't indicate the reason why you place it into StatefulOperatorsHelper. I'll move them to the state package. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206775357 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -871,6 +871,16 @@ object SQLConf { .intConf .createWithDefault(2) + val STREAMING_AGGREGATION_STATE_FORMAT_VERSION = + buildConf("spark.sql.streaming.streamingAggregation.stateFormatVersion") --- End diff -- Ah OK. Sounds better. Will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206776398 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -871,6 +871,16 @@ object SQLConf { .intConf .createWithDefault(2) + val STREAMING_AGGREGATION_STATE_FORMAT_VERSION = + buildConf("spark.sql.streaming.streamingAggregation.stateFormatVersion") + .internal() + .doc("State format version used by streaming aggregation operations triggered " + +"explicitly or implicitly via agg() in a streaming query. State between versions are " + --- End diff -- I meant to explain that the option only applies to the operators which go through StateStoreRestoreExec / StateStoreSaveExec (so max("field1") as well as agg("field1" -> "max")), but now I feel it just causes confusion, and I don't think end users need to understand the details behind the config. Will remove the part `explicitly or implicitly via agg()`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
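A minimal sketch of the two aggregation styles mentioned above; `events` is assumed to be a streaming DataFrame with `key` and `field1` columns:

```scala
import spark.implicits._

// Both forms are planned with StateStoreRestoreExec / StateStoreSaveExec, so both
// are governed by the state format version config.
val byMax = events.groupBy($"key").max("field1")            // "implicit" aggregation API
val byAgg = events.groupBy($"key").agg("field1" -> "max")   // explicit agg(...) API
```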
[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21622#discussion_r206766835 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala --- @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", +progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L) + + registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L) + registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L) + --- End diff -- Thanks for the input! I'll keep the patch as it is. Could you suggest an approach for extending the maintained metrics? I would like to expand them further, and newer things might be coming from custom metrics (like from source and sink), so it might be worth having an extension point. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
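The gauge above references a `convertStringDateToMillis` helper whose body is not shown in this diff; a plausible sketch, assuming it simply parses the ISO8601 watermark string with the `timestampFormat` declared above, would be:

```scala
// hypothetical body; only the method name and its call site appear in the diff
private def convertStringDateToMillis(isoUtcDateStr: String): Long = {
  if (isoUtcDateStr == null) 0L
  else timestampFormat.parse(isoUtcDateStr).getTime
}
```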
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 @zsxwing A kind reminder. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 Pinging @tdas and @zsxwing for review. It's a small one. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @tdas Thanks for the review! Addressed review comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r206755595 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,24 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics) private[sql] def jsonValue: JValue = { -("numRowsTotal" -> JInt(numRowsTotal)) ~ -("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { + if (map.isEmpty) return JNothing + val keys = map.keySet.asScala.toSeq.sorted + keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _) +} + +val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~ + ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ + ("memoryUsedBytes" -> JInt(memoryUsedBytes)) + +if (!customMetrics.isEmpty) { --- End diff -- Actually didn't notice that. Thanks for letting me know! Will simplify. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
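The simplification being agreed to can be sketched as follows; it assumes, as in StreamingQueryProgress, that `safeMapToJValue` returns JNothing for an empty map and that json4s drops JNothing fields when rendering, so the explicit emptiness check becomes unnecessary (a sketch, not the final code):

```scala
private[sql] def jsonValue: JValue = {
  ("numRowsTotal" -> JInt(numRowsTotal)) ~
  ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
  ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
  ("customMetrics" -> safeMapToJValue[JLong](customMetrics, v => JInt(v.toLong)))
}
```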
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r206755538 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,24 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics) private[sql] def jsonValue: JValue = { -("numRowsTotal" -> JInt(numRowsTotal)) ~ -("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { --- End diff -- I first tried to leverage `StreamingQueryProgress.safeMapToJValue`, but couldn't find a proper place to move it so it could be shared, so I simply copied it. Will simplify the code block and inline it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r206754359 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -81,10 +81,10 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato } object SQLMetrics { - private val SUM_METRIC = "sum" - private val SIZE_METRIC = "size" - private val TIMING_METRIC = "timing" - private val AVERAGE_METRIC = "average" + val SUM_METRIC = "sum" + val SIZE_METRIC = "size" + val TIMING_METRIC = "timing" + val AVERAGE_METRIC = "average" --- End diff -- It was to handle an exceptional case while aggregating custom metrics, especially filtering out averages since they are not aggregated correctly. Since we removed the custom average metric, we no longer need to filter them out. Will revert the change as well as the relevant logic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
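For context on why averages are the problematic case, a tiny illustration with made-up numbers:

```scala
// Per-partition averages cannot be combined by summing or averaging them again.
val partitionA = Seq(1.0, 1.0)                        // local average = 1.0
val partitionB = Seq(10.0)                            // local average = 10.0
val naive      = (1.0 + 10.0) / 2                     // 5.5 -- not the real average
val correct    = (partitionA ++ partitionB).sum / 3   // 4.0 -- needs sums and counts instead
```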
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21357 @tdas The rationale of this patch is to group the functions which deal with delta and snapshot files into one place, so that the difference between delta files and snapshot files is shown clearly (actually there is no difference other than allowing the TOMBSTONE value in delta files), and so that these files are easier to document. It also becomes easier to add tests for delta / snapshot files. My underlying rationale is really to make the class easier to understand for newcomers (I found it helpful to group things logically while learning the code myself), but the file has been getting enough love from various contributors, so it may not be worth the effort to make it easier. I respect the rules of the Spark project, and I'm happy to close this if we don't feel it is beneficial to go on. Let's close it and revisit another change that feels beneficial. Thanks for providing your voice on this! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStorePr...
Github user HeartSaVioR closed the pull request at: https://github.com/apache/spark/pull/21357 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @tdas Thanks for the detailed review! I'll follow up on your comments and update the patch. Btw, if my memory is right, I tried increasing the "rate" while benchmarking, but the rate source itself became the bottleneck. I'm not sure whether c5.xlarge is simply not enough or whether I missed some option(s). Sadly I can't run the benchmark often because I don't have a dedicated machine; I would avoid running a benchmark on a non-dedicated machine when trying to find the computational limit, so I pay AWS for a dedicated instance. I'll try increasing the "rate" once more soon, but please guide me if you have any suggestions on the benchmark code or approach. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
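For reference, the throughput of the built-in rate source is usually tuned through its options; a minimal sketch (values are illustrative, `spark` is an active SparkSession, and this is not the exact benchmark code):

```scala
val input = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")   // raise until the stateful operator, not the source, saturates
  .option("numPartitions", "8")        // parallelize row generation
  .load()
```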
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206392487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- @HyukjinKwon Nice finding. I missed it while reviewing. Btw, FYI, in #21469 I'm adding new field with default value in StateOperatorProgress, like `val customMetrics: ju.Map[String, JLong] = new ju.HashMap()` and MiMa doesn't complain. https://github.com/apache/spark/pull/21469/files#diff-e09301244e3c6b1a69eda6c4bd2ddb52 @arunmahadevan Maybe `ju.Map[String, JLong]` will also work here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206388466 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("continuous data") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.empty()) +val tasks = reader.planRowInputPartitions() +assert(tasks.size == 2) + +val numRecords = 10 +val data = scala.collection.mutable.ListBuffer[Int]() +val offsets = scala.collection.mutable.ListBuffer[Int]() +import org.scalatest.time.SpanSugar._ +failAfter(5 seconds) { + // inject rows, read and check the data and offsets --- End diff -- Maybe adding more line comments in code block would help understanding the test code easier, like intentionally committing in the middle of range, etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206386593 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Seria
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206385714 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Seria
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206357959 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { --- End diff -- While the values are good to be placed with companion object, it looks like redundant to have them in both micro-batch and continuous, so might be better to have common object to place this. We may need to find more spots to deduplicate between micro-batch and continuous for socket. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
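A rough sketch of the shared object suggested above; the object name is hypothetical, and the constants are simply those duplicated between the micro-batch and continuous readers:

```scala
import java.text.SimpleDateFormat
import java.util.Locale

import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object TextSocketSourceCommon {
  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
  val SCHEMA_TIMESTAMP = StructType(
    StructField("value", StringType) :: StructField("timestamp", TimestampType) :: Nil)
  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
}
```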
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206371107 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) --- End diff -- I'd rather make it safer via either one of two approaches: 1. assert
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206388213 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("continuous data") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.empty()) +val tasks = reader.planRowInputPartitions() +assert(tasks.size == 2) + +val numRecords = 10 +val data = scala.collection.mutable.ListBuffer[Int]() +val offsets = scala.collection.mutable.ListBuffer[Int]() +import org.scalatest.time.SpanSugar._ +failAfter(5 seconds) { + // inject rows, read and check the data and offsets + for (i <- 0 until numRecords) { +serverThread.enqueue(i.toString) + } + tasks.asScala.foreach { +case t: TextSocketContinuousInputPartition => + val r = t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader] + for (i <- 0 until numRecords / 2) { +r.next() + offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset) +data.append(r.get().getString(0).toInt) +if (i == 2) { + commitOffset(t.partitionId, i + 1) +} + } + assert(offsets.toSeq == Range.inclusive(1, 5)) + assert(data.toSeq == Range(t.partitionId, 10, 2)) + offsets.clear() + data.clear() +case _ => throw new IllegalStateException("Unexpected task type") + } + assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(3, 3)) + reader.commit(TextSocketOffset(List(5, 5))) + assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(5, 5)) +} + +def commitOffset(partition: Int, offset: Int): Unit = { + val offsetsToCommit = reader.getStartOffset.asInstanceOf[TextSocketOffset] +.offsets.updated(partition, offset) + reader.commit(TextSocketOffset(offsetsToCommit)) + assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == offsetsToCommit) +} + } + + test("continuous data - invalid commit") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5 +// ok to commit same offset +reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5 +assertThrows[IllegalStateException] { + reader.commit(TextSocketOffset(List(6, 6))) +} + } + + test("continuous data with timestamp") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"includeTimestamp" -> "true", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.empty()) +val tasks = reader.planRowInputPartitions() +assert(tasks.size == 2) + +val numRecords = 4 +import org.apache.spark.sql.Row --- End diff -- Looks like unused import --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206384495 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Seria
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21721 Looks like we would also need to add SourceProgress and SinkProgress to the MiMa exclude list. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21199 @arunmahadevan Thanks for rebasing. I'll take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206349942 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; + +/** + * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this --- End diff -- OK got your intention. I think it makes sense. I'm OK with all three options and personally prefer 1 or 2 if the intention is to mix-in, but let's see committers' feedback on this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20859: [SPARK-23702][SS] Forbid watermarks on both sides of sta...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/20859 How would we like to handle this patch? I guess we are adding the feature for handling multiple watermarks in #21701, so depending on that direction this patch may end up abandoned. IMHO I'm not 100% sure we have clear use cases for defining multiple watermarks on one source. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18410: [SPARK-20971][SS] purge metadata log in FileStreamSource
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/18410 Looks like this PR is not needed, since `CompactibleFileStreamLog` also takes care of the metadata log. https://github.com/apache/spark/commits/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21199 @arunmahadevan Sorry, I forgot to review this until now. Could you fix the merge conflicts? I'd like to pull the code locally and review it, since the code diff is not small. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/20675 Looks like the patch is outdated, and once continuous queries support shuffled stateful operators, implementing task-level retry is not that trivial. To get a correct aggregation result, when one of the tasks fails at epoch N, all the tasks and states should be restored to epoch N. I definitely agree that it would be ideal to have stable task-level retry; I'm just wondering whether this patch would still work with the follow-up tasks for continuous mode. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206010782 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; + +/** + * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this --- End diff -- If we intend creating a new interface as mix-in, we may not need to create individual interfaces for each DataSourceReader and DataSourceWriter. We could have only one interface and let DataSourceReader and DataSourceWriter add such mix-in interface. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
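The "single mix-in" alternative floated above could look roughly like this in Scala; it is a hypothetical sketch, not the interface actually proposed in the PR (which is split per reader/writer and written in Java):

```scala
import org.apache.spark.sql.sources.v2.CustomMetrics

trait SupportsCustomMetrics {
  /** Returns the current custom metrics, or null if none are available yet. */
  def getCustomMetrics: CustomMetrics
}

// A reader or writer would then opt in by mixing it in, e.g.
// class MyWriter extends DataSourceWriter with SupportsCustomMetrics { ... }
```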
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206010370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +150,50 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => CustomMetrics, + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { + val metrics = getMetrics() + if (metrics != null) { --- End diff -- We could de-indent via an early `return`: it would be simpler because there's nothing to do but return `None` when metrics is null, and the style guide lists such a case as one of the exceptional cases where a return statement is preferred. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206009928 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +150,50 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => CustomMetrics, + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { + val metrics = getMetrics() + if (metrics != null) { +try { + Some(parse(metrics.json())) +} catch { + case ex: Exception => onInvalidMetrics(ex) --- End diff -- https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try According to the guide, this line needs to be replaced with `case NonFatal(e) =>`, and I'd place `onInvalidMetrics` and `None` to same indentation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
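Putting the last two comments together, the method could be restructured roughly as follows (a sketch under those suggestions, not the final code; the NonFatal guard is written so that the `(Exception) => Unit` callback signature from the diff still applies):

```scala
import scala.util.control.NonFatal

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.sql.sources.v2.CustomMetrics

def extractMetrics(
    getMetrics: () => CustomMetrics,
    onInvalidMetrics: Exception => Unit): Option[JValue] = {
  val metrics = getMetrics()
  if (metrics == null) return None   // early return instead of nesting everything in an if

  try {
    Some(parse(metrics.json()))
  } catch {
    case e: Exception if NonFatal(e) =>
      onInvalidMetrics(e)
      None
  }
}
```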
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 Thanks @zsxwing for reviewing! Addressed the review comments. Please take another look. Thanks in advance! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r205961782 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala --- @@ -116,6 +175,30 @@ package object debug { } } + implicit class DebugStreamQuery(query: StreamingQuery) extends Logging { +def debug(): Unit = { + unwrapStreamingQueryWrapper(query) match { +case w: StreamExecution => + if (w.lastExecution == null) { +debugPrint("No physical plan. Waiting for data.") + } else { +val executedPlan = w.lastExecution.executedPlan +if (executedPlan.find(_.isInstanceOf[WriteToContinuousDataSourceExec]).isDefined) { + debugPrint("Debug on continuous mode is not supported.") --- End diff -- `debug()` will run the query briefly to collect information: for micro-batch mode it runs one batch, but in continuous mode it would never finish. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
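A usage sketch of the implicit helper shown in the diff above; `df` is assumed to be a streaming DataFrame and the query runs in micro-batch mode, since continuous mode is rejected as noted:

```scala
import org.apache.spark.sql.execution.debug._

val query = df.writeStream
  .format("memory")
  .queryName("debug_target")
  .start()

query.debug()   // runs the query briefly (roughly one batch) and dumps per-operator stats
```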
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r205961840 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala --- @@ -88,23 +100,70 @@ package object debug { } } + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String + * + * @param query the streaming query for codegen + * @return single String containing all WholeStageCodegen subtrees and corresponding codegen + */ + def codegenString(query: StreamingQuery): String = { +val msg = unwrapStreamingQueryWrapper(query) match { + case w: StreamExecution => +if (w.lastExecution != null) { + codegenString(w.lastExecution.executedPlan) +} else { + "No physical plan. Waiting for data." +} + + case _ => "Only supported for StreamExecution." +} +msg + } + + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan + * + * @param query the streaming query for codegen + * @return Sequence of WholeStageCodegen subtrees and corresponding codegen + */ + def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = { +val planAndCodes = unwrapStreamingQueryWrapper(query) match { + case w: StreamExecution if w.lastExecution != null => +codegenStringSeq(w.lastExecution.executedPlan) + + case _ => Seq.empty +} +planAndCodes + } + + /* Helper function to reuse duplicated code block between batch and streaming. */ --- End diff -- Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r205961761 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -829,6 +955,18 @@ class StreamSuite extends StreamTest { assert(query.exception.isEmpty) } } + + private def waitForLastExecution(q: StreamExecution): Unit = { --- End diff -- Thanks for letting me know! Addressed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r205961813 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala --- @@ -88,23 +100,70 @@ package object debug { } } + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String + * + * @param query the streaming query for codegen + * @return single String containing all WholeStageCodegen subtrees and corresponding codegen + */ + def codegenString(query: StreamingQuery): String = { +val msg = unwrapStreamingQueryWrapper(query) match { + case w: StreamExecution => +if (w.lastExecution != null) { + codegenString(w.lastExecution.executedPlan) +} else { + "No physical plan. Waiting for data." +} + + case _ => "Only supported for StreamExecution." +} +msg + } + + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan + * + * @param query the streaming query for codegen + * @return Sequence of WholeStageCodegen subtrees and corresponding codegen + */ + def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = { +val planAndCodes = unwrapStreamingQueryWrapper(query) match { + case w: StreamExecution if w.lastExecution != null => +codegenStringSeq(w.lastExecution.executedPlan) + + case _ => Seq.empty +} +planAndCodes + } + + /* Helper function to reuse duplicated code block between batch and streaming. */ + private def debugInternal(plan: SparkPlan): Unit = { +val visited = new collection.mutable.HashSet[TreeNodeRef]() +val debugPlan = plan transform { + case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) => +visited += new TreeNodeRef(s) +DebugExec(s) +} +debugPrint(s"Results returned: ${debugPlan.execute().count()}") +debugPlan.foreach { + case d: DebugExec => d.dumpStats() + case _ => +} + } + + private def unwrapStreamingQueryWrapper(query: StreamingQuery): StreamingQuery = { +query match { + case wrapper: StreamingQueryWrapper => wrapper.streamingQuery + case _ => query --- End diff -- OK. I can apply your suggestion but not 100% sure about intention: propagate exception to the caller side (user code), or catch exception inside debug package and handle it. I'll apply latter case but please let me know when you intended former case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r205961764 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -513,6 +514,131 @@ class StreamSuite extends StreamTest { } } + test("explain-continuous") { +val inputData = ContinuousMemoryStream[Int] +val df = inputData.toDS().map(_ * 2).filter(_ > 5) + +// Test `df.explain` +val explain = ExplainCommand(df.queryExecution.logical, extended = false) +val explainString = + spark.sessionState +.executePlan(explain) +.executedPlan +.executeCollect() +.map(_.getString(0)) +.mkString("\n") +assert(explainString.contains("Filter")) +assert(explainString.contains("MapElements")) +assert(!explainString.contains("LocalTableScan")) + +// Test StreamingQuery.display +val q = df.writeStream.queryName("memory_continuous_explain") + .outputMode(OutputMode.Update()).format("memory") + .trigger(Trigger.Continuous("1 seconds")) + .start() + .asInstanceOf[StreamingQueryWrapper] + .streamingQuery +try { + // in continuous mode, the query will be run even there's no data + // sleep a bit to ensure initialization + waitForLastExecution(q) + + val explainWithoutExtended = q.explainInternal(false) + + print(explainWithoutExtended) --- End diff -- Will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 Added tests for StatefulOperatorsHelper itself as well. (Sorry for pushing commits multiple times, which triggers multiple builds. It would be ideal if older test builds were terminated once a newer test build for a given PR is launched.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 Now I'd like to propose changing the default behavior to apply the new path while keeping backward compatibility, so I've applied that to the patch. I'm still open to the decision of shipping it as an advanced option as a first step, and happy to roll back if we decide to go that way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
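For reference, a rough sketch of what "new path as default, old behavior kept for existing checkpoints" could look like, dispatching on a persisted state format version. All names here are hypothetical, not necessarily the ones used in the patch:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Illustrative sketch only. The format version would be recorded with the checkpoint for
// existing queries and default to the new version for new queries, so old state keeps
// working without migration.
sealed trait AggregationStateManager
class AggregationStateManagerV1(keyAttrs: Seq[Attribute], inputAttrs: Seq[Attribute])
  extends AggregationStateManager   // old layout: value row repeats the key columns
class AggregationStateManagerV2(keyAttrs: Seq[Attribute], inputAttrs: Seq[Attribute])
  extends AggregationStateManager   // new layout: value row drops the key columns

object AggregationStateManager {
  val supportedVersions: Seq[Int] = Seq(1, 2)

  def apply(
      keyAttrs: Seq[Attribute],
      inputAttrs: Seq[Attribute],
      formatVersion: Int): AggregationStateManager = formatVersion match {
    case 1 => new AggregationStateManagerV1(keyAttrs, inputAttrs)
    case 2 => new AggregationStateManagerV2(keyAttrs, inputAttrs)
    case _ => throw new IllegalArgumentException(s"Unsupported state format version: $formatVersion")
  }
}
```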
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 My pleasure. Thanks for taking the time to review this thoughtfully and merge it! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 @tdas Addressed review comments. Please take a look again. Thanks in advance! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r203577783 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -270,11 +273,43 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } else Iterator.empty } + /** This method is intended to be only used for unit test(s). DO NOT TOUCH ELEMENTS IN MAP! */ + private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] = synchronized { --- End diff -- Agreed. Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r203577561 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -64,21 +66,143 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] require(!StateStore.isMaintenanceRunning) } + def updateVersionTo( + provider: StateStoreProvider, + currentVersion: Int, + targetVersion: Int): Int = { +var newCurrentVersion = currentVersion +for (i <- newCurrentVersion until targetVersion) { + newCurrentVersion = incrementVersion(provider, i) +} +require(newCurrentVersion === targetVersion) +newCurrentVersion + } + + def incrementVersion(provider: StateStoreProvider, currentVersion: Int): Int = { +val store = provider.getStore(currentVersion) +put(store, "a", currentVersion + 1) +store.commit() +currentVersion + 1 + } + + def checkLoadedVersions( + loadedMaps: util.SortedMap[Long, ProviderMapType], + count: Int, + earliestKey: Long, + latestKey: Long): Unit = { +assert(loadedMaps.size() === count) +assert(loadedMaps.firstKey() === earliestKey) +assert(loadedMaps.lastKey() === latestKey) + } + + def checkVersion( + loadedMaps: util.SortedMap[Long, ProviderMapType], + version: Long, + expectedData: Map[String, Int]): Unit = { + +val originValueMap = loadedMaps.get(version).asScala.map { entry => + rowToString(entry._1) -> rowToInt(entry._2) +}.toMap + +assert(originValueMap === expectedData) + } + + test("retaining only two latest versions when MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2") { +val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 2) + +var currentVersion = 0 + +// commit the ver 1 : cache will have one element +currentVersion = incrementVersion(provider, currentVersion) +assert(getData(provider) === Set("a" -> 1)) +var loadedMaps = provider.getClonedLoadedMaps() +checkLoadedVersions(loadedMaps, 1, 1L, 1L) --- End diff -- Yeah I'd add 'L' everywhere if the type of literal number is long so that we don't rely on autocasting and be sure about the type explicitly, but no strong opinion about this. I can follow existing Spark preferences. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 @tdas Thanks for the detailed review! Addressed review comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r202933053 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -270,11 +273,42 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } else Iterator.empty } + /** This method is intended to be only used for unit test(s). DO NOT TOUCH ELEMENTS IN MAP! */ + private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] = synchronized { +// shallow copy as a minimal guard +loadedMaps.clone().asInstanceOf[util.SortedMap[Long, MapType]] + } + + private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized { +if (numberOfVersionsToRetainInMemory <= 0) { + if (loadedMaps.size() > 0) loadedMaps.clear() + return +} + +while (loadedMaps.size() > numberOfVersionsToRetainInMemory) { + loadedMaps.remove(loadedMaps.lastKey()) +} + +val size = loadedMaps.size() +if (size == numberOfVersionsToRetainInMemory) { + val versionIdForLastKey = loadedMaps.lastKey() + if (versionIdForLastKey > newVersion) { +// this is the only case which put doesn't need --- End diff -- Will update the comment to clarify a bit more. We just avoid the case when the element is being added to the last and required to be evicted right away. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
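Since the comment is going to be reworded anyway, here is roughly the intent as a fragment of putStateIntoStateCache; the condition mirrors the diff above and only the explanatory comments are new:

```scala
// loadedMaps is sorted with the newest version first, so lastKey() is the oldest retained
// version. When the cache is already at capacity and the incoming version is older than every
// retained version, putting it would insert it at the tail only for it to be evicted right
// away by the size bound, so we skip the put entirely.
val size = loadedMaps.size()
if (size == numberOfVersionsToRetainInMemory) {
  val versionIdForLastKey = loadedMaps.lastKey()
  if (versionIdForLastKey > newVersion) {
    // the new version is older than everything we retain: adding it would be a no-op overall
    return
  }
}
```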
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r202932784 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -64,21 +64,122 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] require(!StateStore.isMaintenanceRunning) } + def updateVersionTo(provider: StateStoreProvider, currentVersion: => Int, + targetVersion: Int): Int = { --- End diff -- Thanks for pointing out the style guide. Will fix. Regarding `currentVersion: => Int`: somehow I was trying to reuse currentVersion and didn't roll it back. Will fix. And I agree it would be better to have `incrementVersion` to shorten the code. Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21357 retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @arunmahadevan @jose-torres https://issues.apache.org/jira/browse/SPARK-24763?focusedCommentId=16541367=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16541367 I had a chance to test this patch with more kinds of use cases, and overall, enabling the option shows on-par or slightly better performance while reducing state size according to the ratio of key size to value size in each pair. I'm now feeling that it would make sense to adopt the new strategy as the default and keep the old behavior as a fallback for supporting old apps, but the numbers are there to persuade committers, and I still agree a decision from committer(s) is necessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 @jose-torres Addressed review comment. Please take a look again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201866930 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -99,43 +102,84 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) -updateVersionTo(3) +// this trigger exceeding cache and 1 will be evicted +currentVersion = updateVersionTo(provider, currentVersion, 3) assert(getData(provider) === Set("a" -> 3)) loadedMaps = provider.getClonedLoadedMaps() -assert(loadedMaps.size() === 3) +assert(loadedMaps.size() === 2) assert(loadedMaps.firstKey() === 3L) -assert(loadedMaps.lastKey() === 1L) +assert(loadedMaps.lastKey() === 2L) assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3)) assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + } + + test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") { +val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 1) + +var currentVersion = 0 + +def restoreOriginValues(map: provider.MapType): Map[String, Int] = { --- End diff -- I've just allowed the redundant function definition because there's no way to use `provider.MapType` as a parameter type unless the provider is already defined. If we really want to get rid of the redundant function definition, we may have to change it to use ConcurrentMap directly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
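To spell out the path-dependent type issue for other reviewers, a small sketch, assuming `MapType` aliases the provider's underlying ConcurrentHashMap of UnsafeRows and reusing the suite's existing `rowToString`/`rowToInt` helpers:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// `MapType` is a type member of HDFSBackedStateStoreProvider, so outside that class it can only
// be written as a path-dependent type, e.g. `provider.MapType`, which requires a concrete
// `provider` value to already be in scope. That is why the helper currently lives inside each
// test. Sharing one helper would mean depending on the concrete map type instead:
def restoreOriginValues(map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Map[String, Int] = {
  map.asScala.map { case (key, value) => rowToString(key) -> rowToInt(value) }.toMap
}
```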
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201866279 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -64,6 +64,63 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] require(!StateStore.isMaintenanceRunning) } + test("retaining only latest configured size of versions in memory") { --- End diff -- It is fairly easy to check whether reading from cache or reading from file with https://github.com/apache/spark/pull/21469/commits/c9aada520889b87ace0886805910f0d56d099bd2 in #21469 since it introduces metrics for cache hit and cache miss, but not easy to check in this PR itself. So I just rely on checking cache to ensure the data is correctly evicted and not available in cache as expected. Hope this is OK. Btw, I caught a silly bug while adding tests to cover your suggestion. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 I guess we would have to treat reducing state memory size as worth doing: as described in the commit above, we already optimized HDFSBackedStateStoreProvider to reduce state store disk size (as well as network transfer) by not storing 4 bytes per row (for both key and value). This approach would normally save more than that earlier optimization on the value row, given the key usually carries window information, which contains two values: start and end. The main issue with this approach, for me, is the possible perf. impact on workloads. The workload I've covered fortunately shows even a slight perf. improvement, but I'm not sure about other workloads yet. I might say we need to consider changing the default behavior once I have good backing numbers overall, but in any case, I agree a decision from committer(s) is necessary. Would it be better to start a thread on the dev mailing list? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
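To make the size argument concrete, an illustrative pair of schemas for a windowed aggregation; the field names are made up and only the shape matters:

```scala
import org.apache.spark.sql.types._

// Illustrative only: the grouping key of a windowed count carries the window bounds plus the
// grouping column, and in the current format the value row stores all of those key columns again.
val keySchema = new StructType()
  .add("windowStart", LongType)
  .add("windowEnd", LongType)
  .add("word", StringType)

val valueSchemaOld = keySchema.add("count", LongType)          // key columns repeated + aggregate
val valueSchemaNew = new StructType().add("count", LongType)   // aggregate only; key is kept once as the store key

// The saving per entry grows with the relative size of the key, which is why this is expected to
// save more than the earlier 4-bytes-per-row optimization mentioned above.
```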
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r201848371 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -825,6 +825,16 @@ object SQLConf { .intConf .createWithDefault(100) + val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION = --- End diff -- And the default value of this is `false`, so end users will be aware of the existence of this option and have a chance to read the explanation before setting it to `true`. We might elaborate a bit more in the config doc: the tradeoff between reduced memory usage and a possible perf. hit, plus a suggestion to try this in non-production before applying it to production. If that would make us feel safer, I'm happy to update it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @arunmahadevan I'm actually in favor of changing the default behavior, just not 100% sure the result would be promising across exhaustive use cases. I might need to prepare more kinds of key/value pairs (key bigger than value, key smaller than value, key equal to value; what else am I missing here?), run some tests, and back it up with new numbers. Btw, as you commented, there seem to be two approaches to identify the old and new formats: > looking at the fields in the row Actually I tried this before (by checking the count of fields in the value row, since this patch reduces that count), and soon realized I can't, because HDFSBackedStateStoreProvider relies on the provided keySchema and valueSchema when serializing / deserializing rows rather than leveraging UnsafeRow's own serialization/deserialization mechanism (writeExternal/readExternal, or write/read via Kryo), so it will just show undefined behavior if the schema doesn't match the actual rows, and we can't verify this. The current approach saves the cost of writing/reading two additional integers at the price of giving up a way to verify the rows. If we wanted that ability, state migration would have to happen. > introducing a row version to differentiate old vs new We could do this by applying the same approach as #21739, so this is valid, but a query with the old state format would either have to go through state migration (not easy, since it would have to be done against multiple versions of state) or continue relying on the old state format. @jose-torres Could you please take a look at @arunmahadevan 's comment as well as this one and share your thoughts? Thanks in advance! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
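For reviewers who haven't looked at the provider, a simplified sketch of why the field count can't simply be sniffed from stored rows. This paraphrases (not verbatim) the delta-file read path of HDFSBackedStateStoreProvider, where `input` is assumed to be a DataInputStream over the delta file:

```scala
import com.google.common.io.ByteStreams
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Simplified paraphrase: only the raw UnsafeRow bytes are stored, so the number of fields is
// taken from the provided valueSchema, never from the data itself. If the schema doesn't match
// the bytes, the behavior is simply undefined; there is nothing to validate against.
val valueSize = input.readInt()
val valueRowBuffer = new Array[Byte](valueSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)

val valueRow = new UnsafeRow(valueSchema.fields.length)  // field count comes from the schema
valueRow.pointTo(valueRowBuffer, valueSize)
```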
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r201829938 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -825,6 +825,16 @@ object SQLConf { .intConf .createWithDefault(100) + val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION = --- End diff -- I'm sorry, but I'm not sure I understand your suggestion correctly. I guess defining the configuration as a Spark conf makes it easier to guard against modification after starting the query, via the existing approach of adding the conf to OffsetSeqMetadata, whereas I'm not sure we could guard against modification of a query option. I might be missing something here. Could you elaborate a bit more? Thanks in advance! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
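For concreteness, the "existing approach" mentioned above is roughly the following; this is a simplified sketch, and whether the new conf actually joins that list is exactly what the patch proposes:

```scala
// Simplified sketch. OffsetSeqMetadata records a small set of SQL confs in the offset log when a
// query first starts; on restart, the recorded values win over whatever is currently set, which
// is what guards the setting against later modification.
private val relevantSQLConfs = Seq(
  SHUFFLE_PARTITIONS,
  STATE_STORE_PROVIDER_CLASS,
  ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION  // the new conf from this patch
)
```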
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201562690 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -239,8 +241,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit @volatile private var valueSchema: StructType = _ @volatile private var storeConf: StateStoreConf = _ @volatile private var hadoopConf: Configuration = _ + @volatile private var numberOfVersionsToRetainInMemory: Int = _ - private lazy val loadedMaps = new mutable.HashMap[Long, MapType] + private lazy val loadedMaps = new util.TreeMap[Long, MapType](Ordering[Long].reverse) --- End diff -- Just FYI: referring to java.util.TreeMap is unavoidable because Scala doesn't provide a mutable SortedMap until Scala 2.12, and using it would hence need additional changes for interop. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
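For anyone skimming, the construction from the diff above with the reasoning spelled out (this declaration lives inside the provider class, where `MapType` is defined):

```scala
import java.util

// Descending key order keeps the most recent version at firstKey() and the oldest retained
// version at lastKey(). A Scala mutable SortedMap would be the natural choice, but a mutable
// implementation only arrives in Scala 2.12, hence java.util.TreeMap plus the reversed Ordering
// (Scala's Ordering[Long] is also a java.util.Comparator, so it can be passed directly).
private lazy val loadedMaps = new util.TreeMap[Long, MapType](Ordering[Long].reverse)
```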
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201515401 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming.state; + +import java.util.Comparator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. + * + * As TreeMap does, this implementation sorts elements in natural order, and cuts off + * smaller elements to retain at most bigger N elements. + * + * You can provide reversed order of comparator to retain smaller elements instead. + * + * This class is not thread-safe, so synchronization would be needed to use this concurrently. + * + * @param <K> key type + * @param <V> value type + */ +public final class BoundedSortedMap<K, V> extends TreeMap<K, V> { --- End diff -- Moved the logic to HDFSBackedStateStoreProvider and removed BoundedSortedMap as well as its test suite. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Now I'm thinking about removing "metricProviderLoaderCountOfVersionsInMap" and also removing StateStoreCustomAverageMetric, since the value doesn't look correct for stream-stream joins, which handle multiple states in one operation. We may try having SQLMetric extend AccumulatorV2[(kind of Metric class), (kind of Metric class)], where the Metric class can merge multiple values correctly according to the kind of metric, but that is beyond the scope of this PR, so just removing "metricProviderLoaderCountOfVersionsInMap" seems clearer for this patch for now. What do you think, @arunmahadevan @jose-torres ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
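A rough sketch of the accumulator idea mentioned above, just to illustrate why carrying (sum, count) merges averages correctly across tasks; the class is hypothetical and not something proposed for this patch:

```scala
import org.apache.spark.util.AccumulatorV2

// Sketch only: an average metric that merges correctly because it carries (sum, count) instead of
// a pre-computed average. Merging pre-computed averages from different partitions would be wrong.
class AverageMetricAccumulator extends AccumulatorV2[Long, Double] {
  private var sum = 0L
  private var count = 0L

  override def isZero: Boolean = count == 0

  override def copy(): AverageMetricAccumulator = {
    val copied = new AverageMetricAccumulator
    copied.sum = sum
    copied.count = count
    copied
  }

  override def reset(): Unit = { sum = 0L; count = 0L }

  override def add(v: Long): Unit = { sum += v; count += 1 }

  override def merge(other: AccumulatorV2[Long, Double]): Unit = other match {
    case o: AverageMetricAccumulator =>
      sum += o.sum
      count += o.count
    case _ =>
      throw new UnsupportedOperationException(s"Cannot merge with ${other.getClass.getName}")
  }

  override def value: Double = if (count == 0) 0.0 else sum.toDouble / count
}
```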
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 Thanks for reviewing @arunmahadevan and @jose-torres ! Could we finalize the review on #21469, so there's a chance to include "providerLoadedMapSizeBytes" here? Or is it OK to handle it in a follow-up issue? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r201483544 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -825,6 +825,16 @@ object SQLConf { .intConf .createWithDefault(100) + val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION = --- End diff -- This is not compatible with the current stateful aggregation state (that incompatibility is exactly the improvement of this patch) and there is no undo. So once end users enable the option for a query, the option must stay enabled unless they clear out the checkpoint. (I've added the new option to OffsetSeqMetadata to remember the initial setting, like the partition count.) I'm seeing performance on par or even slightly better for a specific workload (published in the description link), but I cannot try out exhaustive workloads. I actually expected a tradeoff between performance and state memory usage, so if other workloads follow that tradeoff, end users may need to try this option in a non-production environment (for example, staging) to make sure enabling it doesn't break their performance expectations. That's why I made the change available as an option instead of modifying the default behavior. If we apply this as the default behavior, we need to provide state migration. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org