[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208594904
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
--- End diff --

`getKey` was basically an UnsafeProjection in statefulOperators, so it didn't 
strictly require an UnsafeRow. I just followed the existing usage to keep it less 
restrictive, but in reality `row` will always be an UnsafeRow, so I'm OK to fix 
it if that provides consistency.
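
For context, a minimal sketch of the kind of key extraction being discussed, mirroring the UnsafeProjection usage in the stateful operators (class and parameter names here are illustrative, not the final API):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection

// Hypothetical helper: projects the key columns out of an input row.
// The generated projection accepts any InternalRow but always returns an
// UnsafeRow, which is why the trait could declare either type for `row`.
class KeyExtractor(keyExpressions: Seq[Attribute], inputAttributes: Seq[Attribute]) {
  @transient private lazy val keyProjector =
    GenerateUnsafeProjection.generate(keyExpressions, inputAttributes)

  def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
}
```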


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208592556
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -871,6 +871,16 @@ object SQLConf {
 .intConf
 .createWithDefault(2)
 
+  val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
+buildConf("spark.sql.streaming.aggregation.stateFormatVersion")
+  .internal()
+  .doc("State format version used by streaming aggregation operations 
in a streaming query. " +
+"State between versions are tend to be incompatible, so state 
format version shouldn't " +
+"be modified after running.")
+  .intConf
+  .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+  .createWithDefault(2)
--- End diff --

Nice suggestion. Will add the test.
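
If it helps, a rough sketch of a test that could cover the `checkValue` guard above (hypothetical test body; assumes a SparkSession `spark` and ScalaTest's `intercept` are available in the suite):

```scala
// Setting an unsupported version should be rejected by checkValue.
val key = "spark.sql.streaming.aggregation.stateFormatVersion"
intercept[IllegalArgumentException] {
  spark.conf.set(key, "3")
}
// Valid versions are accepted.
spark.conf.set(key, "1")
spark.conf.set(key, "2")
```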


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208591232
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
--- End diff --

Maybe I misinterpreted your suggestion before. I thought you were suggesting 
moving it to the state package object. I'll place it in a separate file.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-08 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
My series of patches relies on having two metrics: `size for memory usage of 
latest version` and `size for total memory usage of loaded versions`. SPARK-24717 
(#21700) made it possible to tune the overall state memory usage, but end users 
can't tune it if they only have one of the two metrics.

IMHO, I'm not 100% sure how much confusion this patch would cause for end 
users, but if the intention of `memoryUsedBytes` is to measure the overall state 
in a partition, what about redefining `memoryUsedBytes` as `size for total memory 
usage of loaded versions`, and exposing `size for memory usage of latest 
version` as a custom metric?
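
To make the proposal concrete, a rough sketch of what the custom-metric side could look like on a state store provider (this assumes the existing `StateStoreCustomSizeMetric` hook; the metric name and description here are illustrative):

```scala
import org.apache.spark.sql.execution.streaming.state.{StateStoreCustomMetric, StateStoreCustomSizeMetric}

// memoryUsedBytes itself would then report the total memory of all loaded
// versions, while the latest-version size moves to a custom metric like this:
val metricCurrentVersionSize: StateStoreCustomSizeMetric =
  StateStoreCustomSizeMetric(
    "stateOnCurrentVersionSizeBytes",
    "estimated size of state only on current version")

// A provider would advertise it roughly like:
// override def supportedCustomMetrics: Seq[StateStoreCustomMetric] =
//   metricCurrentVersionSize :: Nil
```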


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
Also added javadoc. Most of the content comes from StateStore, but I didn't 
copy the note into the state store implementation since it would be duplicated. 
Please let me know if we want to add content for the target state store 
parameter as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@tdas 
Done running the perf. test with 4 more test apps:

> BenchmarkMovingAggregationsListenerKeyMuchBigger

rate: 16

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 159877.232 | 149537.817 | 65000 | 133511303 |
| patch (on top of c9914cf) | 160049.118 | 152497.945 | 65000 | 73236351 |

state size: 54.854% (a 45.15% reduction)

> BenchmarkMovingAggregationsListenerManyKeys

rate: 12

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 120266.810 | 107482.042 | 65000 | 38433719 |
| patch (on top of c9914cf) | 119865.855 | 109268.772 | 65000 | 24900343 |

state size: 64.787% (a 35.21% reduction)

> BenchmarkMovingAggregationsListenerManyValues

rate: 25000

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 25009.236 | 21216.126 | 9 | 77161711 (857.352 per row) |
| patch (on top of c9914cf) | 25060.635 | 20774.500 | 99495 | 78230335 (786.274 per row) |

state size: 91.709% (an 8.29% reduction)

> BenchmarkMovingAggregationsListenerValueMuchBigger

rate: 85000

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 85310.774 | 79091.271 | 1000 | 1324255 |
| patch (on top of c9914cf) | 84791.761 | 79755.905 | 1000 | 1282687 |

state size: 96.861% (a 3.14% reduction)

I don't see any notable perf. hit, and the expected state size reduction 
shows up across all the cases.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-08-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
Thanks @zsxwing for merging and thanks all for reviewing!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-08-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
Thanks @HyukjinKwon for merging, and thanks all for reviewing!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@tdas Kind reminder. I'll take the doc step when you say it's OK to go.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@tdas Kind reminder.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-08-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
@HyukjinKwon Could you take this forward, given that the patch is minor and 
the CI tests have passed? Thanks in advance!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-08-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r207881161
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -513,6 +515,125 @@ class StreamSuite extends StreamTest {
 }
   }
 
+  test("explain-continuous") {
+val inputData = ContinuousMemoryStream[Int]
+val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+// Test `df.explain`
+val explain = ExplainCommand(df.queryExecution.logical, extended = 
false)
+val explainString =
+  spark.sessionState
+.executePlan(explain)
+.executedPlan
+.executeCollect()
+.map(_.getString(0))
+.mkString("\n")
+assert(explainString.contains("Filter"))
+assert(explainString.contains("MapElements"))
+assert(!explainString.contains("LocalTableScan"))
+
+// Test StreamingQuery.display
+val q = df.writeStream.queryName("memory_continuous_explain")
+  .outputMode(OutputMode.Update()).format("memory")
+  .trigger(Trigger.Continuous("1 seconds"))
+  .start()
+  .asInstanceOf[StreamingQueryWrapper]
+  .streamingQuery
+try {
+  // in continuous mode, the query will be run even there's no data
+  // sleep a bit to ensure initialization
+  eventually(timeout(2.seconds), interval(100.milliseconds)) {
+assert(q.lastExecution != null)
+  }
+
+  val explainWithoutExtended = q.explainInternal(false)
+
+  // `extended = false` only displays the physical plan.
+  assert("Streaming RelationV2 ContinuousMemoryStream".r
+.findAllMatchIn(explainWithoutExtended).size === 0)
+  assert("ScanV2 ContinuousMemoryStream".r
+.findAllMatchIn(explainWithoutExtended).size === 1)
+
+  val explainWithExtended = q.explainInternal(true)
+  // `extended = true` displays 3 logical plans 
(Parsed/Optimized/Optimized) and 1 physical
+  // plan.
+  assert("Streaming RelationV2 ContinuousMemoryStream".r
+.findAllMatchIn(explainWithExtended).size === 3)
+  assert("ScanV2 ContinuousMemoryStream".r
+.findAllMatchIn(explainWithExtended).size === 1)
+} finally {
+  q.stop()
+}
+  }
+
+  test("codegen-microbatch") {
+import org.apache.spark.sql.execution.debug._
+
+val inputData = MemoryStream[Int]
+val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+// Test StreamingQuery.codegen
+val q = df.writeStream.queryName("memory_microbatch_codegen")
+  .outputMode(OutputMode.Update)
+  .format("memory")
+  .trigger(Trigger.ProcessingTime("1 seconds"))
+  .start()
+
+try {
+  assert("No physical plan. Waiting for data." === codegenString(q))
+  assert(codegenStringSeq(q).isEmpty)
+
+  inputData.addData(1, 2, 3, 4, 5)
+  q.processAllAvailable()
+
+  val codegenStr = codegenString(q)
--- End diff --

Nice finding. Will address.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-08-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-08-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
@zsxwing Addressed review comments. Please take a look again.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-08-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
@zsxwing OK, I also wondered whether `debug` is applicable to streaming, but 
wanted to fill the gap earlier. Will remove `debug` for streaming and update 
shortly. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@tdas 

I found some spare time to run performance tests, though I've only run one 
app for now since I couldn't run the tests concurrently. Please let me know if 
you're not confident with the results from a single app; I'll find more time to 
go through all the test cases. I hope these numbers give enough confidence to 
accept the patch.

> Machine info.

MBP 15-inch Mid 2015

* i7 2.5Ghz (4 core)
* 16GB 1600 Mhz DDR3
* SSD 512G

> Test information

* base commit : c9914cf (latest master branch)
* patch internally rebased with base commit before testing
* spark-submit options: master local[3] --driver-memory 6g
  * I didn't run the perf. test with all cores and memory; I left some spare 
resources for the OS and background apps.

> Performance test code


https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListener.scala

Please note that there are 4 more apps (big key size, big value size, many 
key columns, many value columns) in the same repository.

> Test result

Neither version kept up with the input rate of 20, but since processed rows 
per second was around 188000, I felt I didn't need to tune the rate more 
tightly (like 185000, 19, etc.).

The numbers for input rows per second and processed rows per second are 
calculated by averaging 3 batches (38, 39, 40 respectively). The numbers 
regarding state are taken when total state rows reached 6.

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 200492.065 | 10.316 | 6 | 17,755,895 |
| patch (on top of c9914cf) | 199242.598 | 188160.833 | 6 | 14,687,543 |

So while the two processed-rows-per-second numbers didn't show a notable 
difference (under 1%), the patch reduced the memory usage of state (for the 
latest version) by 17.29%. One thing to note: in this performance test, state 
is saved to the local SSD, so the patch may give a (small? trivial?) additional 
performance benefit when a remote checkpoint directory is used.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r207096556
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Seria

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r207096219
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
--- End diff --

Yeah, agreed. I'm OK with it if the same implications are used in other places.


---

---

[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
Test failure looks unrelated.

Jenkins, retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@tdas 
I've applied your review comments except for documentation. (I'll add WIP to 
the PR's title if that sounds clearer.) There may still be more review comments 
to come, so I'd like to work on documentation once the patch is in a 
"ready to merge" shape.

Meanwhile, I'll try to find the time/resources to run the performance tests 
again, but it might take a couple of days or more. I'll update once I've run 
them and have new numbers. In the meantime, please continue reviewing the code; 
it would help to run the tests against the latest updated patch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206790470
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+class MemoryStateStore extends StateStore() {
+  import scala.collection.JavaConverters._
+  private val map = new ConcurrentHashMap[UnsafeRow, UnsafeRow]
+
+  override def iterator(): Iterator[UnsafeRowPair] = {
+map.entrySet.iterator.asScala.map { case e => new 
UnsafeRowPair(e.getKey, e.getValue) }
+  }
+
+  override def get(key: UnsafeRow): UnsafeRow = map.get(key)
+
+  override def put(key: UnsafeRow, newValue: UnsafeRow): Unit = {
+map.put(key.copy(), newValue.copy())
+  }
+
+  override def remove(key: UnsafeRow): Unit = {
+map.remove(key)
--- End diff --

Yeah, I missed that. Will fix.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206791736
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
+def extractKey(row: InternalRow): UnsafeRow
+def getValueExpressions: Seq[Attribute]
--- End diff --

It would become `getStateValueSchema`, by the way, once we change the return 
type.
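
In other words, roughly this shape on the trait once the return type changes from `Seq[Attribute]` to `StructType` (a sketch, not the final signature):

```scala
import org.apache.spark.sql.types.StructType

trait StreamingAggregationStateManager extends Serializable {
  // was: def getValueExpressions: Seq[Attribute]
  def getStateValueSchema: StructType
}
```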


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206784385
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -201,33 +200,37 @@ object WatermarkSupport {
 case class StateStoreRestoreExec(
 keyExpressions: Seq[Attribute],
 stateInfo: Option[StatefulOperatorStateInfo],
+stateFormatVersion: Int,
 child: SparkPlan)
   extends UnaryExecNode with StateStoreReader {
 
+  private[sql] val stateManager = 
StreamingAggregationStateManager.createStateManager(
+keyExpressions, child.output, stateFormatVersion)
+
   override protected def doExecute(): RDD[InternalRow] = {
 val numOutputRows = longMetric("numOutputRows")
 
 child.execute().mapPartitionsWithStateStore(
   getStateInfo,
   keyExpressions.toStructType,
-  child.output.toStructType,
+  stateManager.getValueExpressions.toStructType,
--- End diff --

Right. Sounds like `StructType` is preferred over `Seq[Attribute]` in this 
case. Will apply.

Maybe a dumb question from a newbie on Spark SQL (still trying to get familiar 
with it): I guess we prefer StructType in this case because it's less restrictive 
and also gets rid of the headache of dealing with field references. Do I 
understand correctly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206791325
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import 
org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class StatefulOperatorsHelperSuite extends StreamTest {
+  import TestMaterial._
+
+  test("StateManager v1 - get, put, iter") {
+val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 
1)
+
+// in V1, input row is stored as value
+testGetPutIterOnStateManager(stateManager, OUTPUT_ATTRIBUTES, 
TEST_ROW, TEST_KEY_ROW, TEST_ROW)
+  }
+
+  //  StateManagerImplV2 

+  test("StateManager v2 - get, put, iter") {
+val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 
2)
+
+// in V2, row for values itself (excluding keys from input row) is 
stored as value
+// so that stored value doesn't have key part, but state manager V2 
will provide same output
+// as V1 when getting row for key
+testGetPutIterOnStateManager(stateManager, VALUES_ATTRIBUTES, 
TEST_ROW, TEST_KEY_ROW,
+  TEST_VALUE_ROW)
+  }
+
+  private def newStateManager(
+  keysAttributes: Seq[Attribute],
+  outputAttributes: Seq[Attribute],
--- End diff --

Yes, and actually, for the StateManager, `input row attributes` and `output 
attributes` are the same given how the StateStore*Exec operators work, so I just 
picked one. I'm happy to rename it if `inputRowAttributes` gives clearer insight 
into which schema should be passed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206786014
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -53,7 +53,35 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
 
   import testImplicits._
 
-  test("simple count, update mode") {
+  def executeFuncWithStateVersionSQLConf(
+  stateVersion: Int,
+  confPairs: Seq[(String, String)],
+  func: => Any): Unit = {
+withSQLConf(confPairs ++
+  Seq(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key -> 
stateVersion.toString): _*) {
+  func
+}
+  }
+
+  def testWithAllStateVersions(name: String, confPairs: (String, String)*)
--- End diff --

Actually, this basically came from wondering how `withSQLConf` works. Does 
`withSQLConf` handle nested `withSQLConf` calls properly? If so, we don't need 
to add the `confPairs` param at all; if not, I guess we might still want to add 
it.
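
For what it's worth, nesting should compose if the helper follows the usual save-set-restore pattern; here is a simplified sketch of that pattern, written as an illustration under that assumption rather than the actual `SQLTestUtils` implementation:

```scala
import org.apache.spark.sql.internal.SQLConf

// Each call remembers the previous values and restores them in a finally
// block, so an inner call that touches the same keys unwinds back to
// whatever the outer call had set.
def withSQLConfSketch(conf: SQLConf)(pairs: (String, String)*)(f: => Unit): Unit = {
  val (keys, values) = pairs.unzip
  val previous = keys.map(k => if (conf.contains(k)) Some(conf.getConfString(k)) else None)
  keys.zip(values).foreach { case (k, v) => conf.setConfString(k, v) }
  try f finally {
    keys.zip(previous).foreach {
      case (k, Some(v)) => conf.setConfString(k, v)
      case (k, None)    => conf.unsetConf(k)
    }
  }
}
```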


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206780521
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
+def extractKey(row: InternalRow): UnsafeRow
+def getValueExpressions: Seq[Attribute]
+def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
--- End diff --

I might have been thinking about this naively: I thought its interface was 
similar to StateStore, so I wondered whether we needed to add docs, but I think 
I was wrong. Will add docs. Thanks for the insightful input!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206779898
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
+def extractKey(row: InternalRow): UnsafeRow
+def getValueExpressions: Seq[Attribute]
+def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow
--- End diff --

> In fact, if there exists a StateManager to manage all the state in the 
store, then ALL operations to add/remove state should go through the manager 
and store should not be accessed directly.

Totally agreed that that would be a better design for the StateManager. I don't 
remember whether I tried it before, so let me try applying your suggestion and 
see if anything blocks it.
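
To make that direction concrete, the trait would grow to cover every store interaction the aggregation operators need, something along these lines (a sketch under the assumption that operators stop calling the store directly; the exact method set is illustrative):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair}
import org.apache.spark.sql.types.StructType

trait StreamingAggregationStateManagerSketch extends Serializable {
  def getKey(row: InternalRow): UnsafeRow
  def getStateValueSchema: StructType
  def get(store: StateStore, key: UnsafeRow): UnsafeRow
  def put(store: StateStore, row: UnsafeRow): Unit
  // With all access routed through the manager, removal, commit and
  // iteration also become manager responsibilities:
  def remove(store: StateStore, key: UnsafeRow): Unit
  def commit(store: StateStore): Long
  def iterator(store: StateStore): Iterator[UnsafeRowPair]
  def keys(store: StateStore): Iterator[UnsafeRow]
  def values(store: StateStore): Iterator[UnsafeRow]
}
```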


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206781209
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
+def extractKey(row: InternalRow): UnsafeRow
+def getValueExpressions: Seq[Attribute]
+def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+def put(store: StateStore, row: UnsafeRow): Unit
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+def createStateManager(
+keyExpressions: Seq[Attribute],
+childOutput: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, childOutput)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, childOutput)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val childOutput: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, childOutput)
+
+def extractKey(row: InternalRow): UnsafeRow = keyProjector(row)
+  }
+
+  class StreamingAggregationStateManagerImplV1(
+  keyExpressions: Seq[Attribute],
+  childOutput: Seq[Attribute])
+extends StreamingAggregationStateManagerBaseImpl(keyExpressions, 
childOutput) {
+
+override def getValueExpressions: Seq[Attribute] = {
+  childOutput
+}
+
+override def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow = {
+  rowPair.value
+}
+
+override def get(store: StateStore, key: UnsafeRow): UnsafeRow = {
+  store.get(key)
+}
+
+override def put(store: StateStore, row: UnsafeRow): Unit = {
+  store.put(extractKey(row), row)
+}
+  }
+
+  class StreamingAggregationStateManagerImplV2(
--- End diff --

Great point. I might have been in a rush to show its shape. Will add docs for 
the state formats in both V1 and V2.
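
For reference while the docs are written, the gist as I understand it from this patch: V1 stores the entire input row as the state value, so the key columns live in both the key and the value; V2 stores only the non-key columns as the value and rebuilds the original row on read by joining the key and value parts. A rough, self-contained sketch of the V2 idea (illustrative names; the real implementation lives in the classes above):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner}
import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair}
import org.apache.spark.sql.types.StructType

// V2 format sketch: the stored value holds only non-key columns, and the
// original row is reconstructed by joining key and value on read.
class StateFormatV2Sketch(keyExpressions: Seq[Attribute], inputRowAttributes: Seq[Attribute]) {
  private val valueExpressions: Seq[Attribute] = inputRowAttributes.diff(keyExpressions)

  @transient private lazy val keyProjector =
    GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
  @transient private lazy val valueProjector =
    GenerateUnsafeProjection.generate(valueExpressions, inputRowAttributes)
  @transient private lazy val joiner = GenerateUnsafeRowJoiner.create(
    StructType.fromAttributes(keyExpressions), StructType.fromAttributes(valueExpressions))

  def put(store: StateStore, row: UnsafeRow): Unit =
    store.put(keyProjector(row), valueProjector(row)) // no key columns in the value

  def restoreOriginalRow(pair: UnsafeRowPair): UnsafeRow =
    // assumes key columns come first in the input row; otherwise an extra
    // projection is needed to restore the original column order
    joiner.join(pair.key, pair.value)
}
```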


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206790358
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+class MemoryStateStore extends StateStore() {
--- End diff --

It was actually just extracted from another place so it can be reused across 
suites, but I agree it's better to document it once it becomes a kind of public 
API for testing. Will add.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206778127
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
--- End diff --

Will fix.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206780754
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
+def extractKey(row: InternalRow): UnsafeRow
+def getValueExpressions: Seq[Attribute]
+def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+def put(store: StateStore, row: UnsafeRow): Unit
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+def createStateManager(
+keyExpressions: Seq[Attribute],
+childOutput: Seq[Attribute],
--- End diff --

Sounds much better, and you're right about the concept of `child`. Will rename.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206778355
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
+def extractKey(row: InternalRow): UnsafeRow
--- End diff --

Renaming sounds better. Will rename, and will also add docs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206790505
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import 
org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class StatefulOperatorsHelperSuite extends StreamTest {
--- End diff --

Will rename.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206788634
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import 
org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class StatefulOperatorsHelperSuite extends StreamTest {
+  import TestMaterial._
+
+  test("StateManager v1 - get, put, iter") {
+val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 
1)
+
+// in V1, input row is stored as value
+testGetPutIterOnStateManager(stateManager, OUTPUT_ATTRIBUTES, 
TEST_ROW, TEST_KEY_ROW, TEST_ROW)
+  }
+
+  //  StateManagerImplV2 

+  test("StateManager v2 - get, put, iter") {
+val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 
2)
+
+// in V2, row for values itself (excluding keys from input row) is 
stored as value
+// so that stored value doesn't have key part, but state manager V2 
will provide same output
+// as V1 when getting row for key
+testGetPutIterOnStateManager(stateManager, VALUES_ATTRIBUTES, 
TEST_ROW, TEST_KEY_ROW,
+  TEST_VALUE_ROW)
+  }
+
+  private def newStateManager(
+  keysAttributes: Seq[Attribute],
+  outputAttributes: Seq[Attribute],
+  version: Int): StreamingAggregationStateManager = {
+StreamingAggregationStateManager.createStateManager(keysAttributes, 
outputAttributes, version)
+  }
+
+  private def testGetPutIterOnStateManager(
+  stateManager: StreamingAggregationStateManager,
+  expectedValueExpressions: Seq[Attribute],
+  inputRow: UnsafeRow,
+  expectedStateKey: UnsafeRow,
+  expectedStateValue: UnsafeRow): Unit = {
+
+assert(stateManager.getValueExpressions === expectedValueExpressions)
+
+val memoryStateStore = new MemoryStateStore()
+stateManager.put(memoryStateStore, inputRow)
+
+assert(memoryStateStore.iterator().size === 1)
+
+val keyRow = stateManager.extractKey(inputRow)
+assert(keyRow === expectedStateKey)
+
+// iterate state store and verify whether expected format of key and 
value are stored
+val pair = memoryStateStore.iterator().next()
+assert(pair.key === keyRow)
+assert(pair.value === expectedStateValue)
+assert(stateManager.restoreOriginRow(pair) === inputRow)
+
+// verify the stored value once again via get
+assert(memoryStateStore.get(keyRow) === expectedStateValue)
+
+// state manager should return row which is same as input row 
regardless of format version
+assert(inputRow === stateManager.get(memoryStateStore, keyRow))
+  }
+
+}
+
+object TestMaterial {
+  val KEYS: Seq[String] = Seq("key1", "key2")
--- End diff --

I intended to use them like `static final` fields, so I treated them as 
constants and followed the style guide for constants - `Constants should be all 
uppercase letters and be put in a companion object.` That's why I extracted 
them into a separate object (though not a companion object, for better naming) 
and named them in uppercase.

But the jumping around is not intentional and is definitely bad if it forces us 
to go back and forth. I'm going to place them at the earliest part of the class for now.
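
A rough sketch of the layout I have in mind (fixture names as in the current diff, body elided):

```scala
import org.apache.spark.sql.streaming.StreamTest

class StatefulOperatorsHelperSuite extends StreamTest {
  // fixtures at the very top of the suite instead of a separate `TestMaterial`
  // object, so the tests below can be read without jumping back and forth
  private val keys: Seq[String] = Seq("key1", "key2")
  // ... key/value attributes and test rows follow here, then the tests
}
```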

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206778971
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
+
+  val supportedVersions = Seq(1, 2)
+  val legacyVersion = 1
+
+  sealed trait StreamingAggregationStateManager extends Serializable {
+def extractKey(row: InternalRow): UnsafeRow
+def getValueExpressions: Seq[Attribute]
--- End diff --

It is to define the schema of the value read from / written to state. For V1 it 
would be the same as the input schema, and for V2 it would be `input schema - key schema`. 
Would `getStateValueExpressions` be OK for us?
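
To illustrate the intent (a rough sketch with made-up helper names, not the exact code in the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// V1 stores the whole input row as the state value; V2 stores only the
// non-key columns. The returned attributes describe the stored value schema.
def stateValueExpressionsV1(
    keyExpressions: Seq[Attribute],
    inputRowAttributes: Seq[Attribute]): Seq[Attribute] =
  inputRowAttributes

def stateValueExpressionsV2(
    keyExpressions: Seq[Attribute],
    inputRowAttributes: Seq[Attribute]): Seq[Attribute] =
  inputRowAttributes.filterNot(keyExpressions.contains)
```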


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206778077
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateUnsafeRowJoiner}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
UnsafeRowPair}
+import org.apache.spark.sql.types.StructType
+
+object StatefulOperatorsHelper {
--- End diff --

Yeah, right. I found your PR useful for getting an idea of how to model the 
classes because it was dealing with a similar requirement, but it didn't indicate 
the reason why you placed it into StatefulOperatorsHelper. I'll move them to the 
state package.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206775357
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -871,6 +871,16 @@ object SQLConf {
 .intConf
 .createWithDefault(2)
 
+  val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
+
buildConf("spark.sql.streaming.streamingAggregation.stateFormatVersion")
--- End diff --

Ah OK. Sounds better. Will fix.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r206776398
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -871,6 +871,16 @@ object SQLConf {
 .intConf
 .createWithDefault(2)
 
+  val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
+
buildConf("spark.sql.streaming.streamingAggregation.stateFormatVersion")
+  .internal()
+  .doc("State format version used by streaming aggregation operations 
triggered " +
+"explicitly or implicitly via agg() in a streaming query. State 
between versions are " +
--- End diff --

I meant to explain that the option only applies to the operators which go 
through StateStoreRestoreExec / StateStoreSaveExec (so max("field1") as well as 
agg("field1" -> "max")), but now I feel it just causes confusion and I don't 
think end users need to understand the details behind the config. Will remove the 
part `explicitly or implicitly via agg()`.
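
Just for context, both forms end up with the same stateful aggregation operators in the plan, which is what I was trying to say (a small sketch; the input columns are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("agg-forms").getOrCreate()

// hypothetical streaming input with an `id` and `value` column
val df = spark.readStream.format("rate").load()
  .selectExpr("value % 10 AS id", "value")

// both queries go through StateStoreRestoreExec / StateStoreSaveExec,
// so the state format version config applies to either of them
val byMax = df.groupBy("id").max("value")
val byAgg = df.groupBy("id").agg("value" -> "max")
```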


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...

2018-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21622#discussion_r206766835
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 ---
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", 
_.durationMs.get("triggerExecution").longValue(), 0L)
 
+  private val timestampFormat = new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+progress => 
convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
+
+  registerGauge("states-rowsTotal", 
_.stateOperators.map(_.numRowsTotal).sum, 0L)
+  registerGauge("states-usedBytes", 
_.stateOperators.map(_.memoryUsedBytes).sum, 0L)
+
--- End diff --

Thanks for the input! I'll keep the patch as it is.

Could you suggest an approach for extending the maintained metrics? I would like 
to expand them further, and new things may come from custom metrics (e.g. from 
sources and sinks), so it might be worth having an extension point.
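
To make the question concrete, what I'd like to avoid is hard-coding another line in MetricsReporter for every new metric. A purely hypothetical sketch of what an extension point could look like (names are made up):

```scala
import org.apache.spark.sql.streaming.StreamingQueryProgress

// hypothetical extension point: extra gauges are contributed from outside
// instead of MetricsReporter hard-coding every metric it knows about
trait MetricsReporterExtension {
  def gauges: Seq[(String, StreamingQueryProgress => Long)]
}

class StateStoreMetricsExtension extends MetricsReporterExtension {
  override def gauges: Seq[(String, StreamingQueryProgress => Long)] = Seq(
    ("states-rowsTotal", (p: StreamingQueryProgress) => p.stateOperators.map(_.numRowsTotal).sum),
    ("states-usedBytes", (p: StreamingQueryProgress) => p.stateOperators.map(_.memoryUsedBytes).sum)
  )
}
```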


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
@zsxwing Kind reminder.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
Pinging @tdas and @zsxwing for review. It's a small one.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@tdas Thanks for the review! Addressed review comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r206755595
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,24 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes)
+new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
-("numRowsTotal" -> JInt(numRowsTotal)) ~
-("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => 
JValue): JValue = {
+  if (map.isEmpty) return JNothing
+  val keys = map.keySet.asScala.toSeq.sorted
+  keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ 
~ _)
+}
+
+val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~
+  ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+  ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+
+if (!customMetrics.isEmpty) {
--- End diff --

Actually, I didn't notice that. Thanks for letting me know! Will simplify.
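
Something along these lines, relying on `~` dropping `JNothing` fields at render time (a sketch of the simplification, assuming the local `safeMapToJValue` helper stays as above):

```scala
private[sql] def jsonValue: JValue = {
  ("numRowsTotal" -> JInt(numRowsTotal)) ~
  ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
  ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
  ("customMetrics" -> safeMapToJValue[JLong](customMetrics, v => JInt(v.toLong)))
}
```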


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r206755538
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,24 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes)
+new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
-("numRowsTotal" -> JInt(numRowsTotal)) ~
-("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => 
JValue): JValue = {
--- End diff --

I first tried to leverage `StreamingQueryProgress.safeMapToJValue` but 
couldn't find a proper place to move it so it could be shared, so I simply copied it. 
Will simplify the code block and inline it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r206754359
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -81,10 +81,10 @@ class SQLMetric(val metricType: String, initValue: Long 
= 0L) extends Accumulato
 }
 
 object SQLMetrics {
-  private val SUM_METRIC = "sum"
-  private val SIZE_METRIC = "size"
-  private val TIMING_METRIC = "timing"
-  private val AVERAGE_METRIC = "average"
+  val SUM_METRIC = "sum"
+  val SIZE_METRIC = "size"
+  val TIMING_METRIC = "timing"
+  val AVERAGE_METRIC = "average"
--- End diff --

It was to handle the exceptional case while aggregating custom metrics, 
especially filtering out averages since they are not aggregated correctly. Since we 
removed the custom average metric, we no longer need to filter them out. Will revert 
the change as well as the relevant logic.
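
For reference, the reason an average metric can't just be summed across tasks (a toy example):

```scala
// two tasks report averages over different numbers of rows
val task1 = Seq(1L, 3L) // per-task average: 2
val task2 = Seq(5L)     // per-task average: 5

// summing the per-task averages gives 7, which is meaningless;
// the true average over all rows is (1 + 3 + 5) / 3 = 3
val sumOfAverages = task1.sum / task1.size + task2.sum / task2.size  // 7
val trueAverage = (task1 ++ task2).sum / (task1.size + task2.size)   // 3
```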


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
@tdas 
The rationale of this patch is to group the functions which deal with 
delta and snapshot files together, so that the difference between the delta file and 
the snapshot file is clearly shown (actually there is no difference other than allowing 
a TOMBSTONE value in the delta file) and so that these files are easy to document. 
It also makes it easier to add tests for delta / snapshot files.

Indeed, my underlying rationale is to make the class easier to understand for 
newcomers (I actually found it helpful to group the functions logically to understand 
the code better), but the file has been getting enough love from various contributors, 
so it may not be worth the effort to make it easier.

I respect the rules of the Spark project, and I'm happy to close this if we don't 
feel it's beneficial to go on. Let's close it and revisit another one that feels 
beneficial. Thanks for providing your voice on this!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStorePr...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR closed the pull request at:

https://github.com/apache/spark/pull/21357


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@tdas 
Thanks for the detailed review! I'll follow up on your comments and update the 
patch.

Btw, if my memory is right, I tried increasing "rate" while benchmarking, but the 
rate source itself became the bottleneck. I'm not sure whether c5.xlarge is simply 
not enough or whether I missed some option(s).

Sadly I can't run the benchmark often because I don't have a dedicated machine. 
I'd rather not run a benchmark on a non-dedicated machine when measuring the 
computational limit, so I pay AWS for a dedicated instance. I'll try increasing 
"rate" once more soon, but please guide me if you have any suggestions for the 
benchmark code or approach.
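
For reference, the main knobs on the rate source are these (a minimal sketch, not the actual benchmark code; the numbers are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rate-bench").getOrCreate()

// rowsPerSecond controls throughput and numPartitions the parallelism of the
// generated input; bumping rowsPerSecond is where the source became the bottleneck
val input = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "8")
  .load()

val query = input
  .selectExpr("value % 1000 AS key", "value")
  .groupBy("key")
  .count()
  .writeStream
  .format("console")
  .outputMode("update")
  .start()
```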


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206392487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

@HyukjinKwon 
Nice find. I missed it while reviewing.

Btw, FYI, in #21469 I'm adding a new field with a default value in 
StateOperatorProgress, like `val customMetrics: ju.Map[String, JLong] = new 
ju.HashMap()`, and MiMa doesn't complain.


https://github.com/apache/spark/pull/21469/files#diff-e09301244e3c6b1a69eda6c4bd2ddb52

@arunmahadevan 
Maybe `ju.Map[String, JLong]` will also work here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206388466
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("continuous data") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.empty())
+val tasks = reader.planRowInputPartitions()
+assert(tasks.size == 2)
+
+val numRecords = 10
+val data = scala.collection.mutable.ListBuffer[Int]()
+val offsets = scala.collection.mutable.ListBuffer[Int]()
+import org.scalatest.time.SpanSugar._
+failAfter(5 seconds) {
+  // inject rows, read and check the data and offsets
--- End diff --

Maybe adding more line comments in the code block would make the test code easier 
to understand, e.g. noting that we intentionally commit in the middle of the range, etc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206386593
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Seria

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206385714
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Seria

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206357959
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
--- End diff --

While the values are fine to place in a companion object, it looks redundant to 
have them in both the micro-batch and continuous readers, so it might be better 
to have a common object to place them in.

We may need to find more spots to deduplicate between micro-batch and 
continuous for the socket source.
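
For example, a shared object along these lines could host the constants (the object name is hypothetical):

```scala
import java.text.SimpleDateFormat
import java.util.Locale

import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

// hypothetical shared home for the schema constants used by both the
// micro-batch and the continuous text socket readers
object TextSocketReader {
  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
  val SCHEMA_TIMESTAMP = StructType(
    StructField("value", StringType) :: StructField("timestamp", TimestampType) :: Nil)
  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
}
```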


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206371107
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
--- End diff --

I'd rather make it safer via either one of two approaches: 

1. assert

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206388213
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("continuous data") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.empty())
+val tasks = reader.planRowInputPartitions()
+assert(tasks.size == 2)
+
+val numRecords = 10
+val data = scala.collection.mutable.ListBuffer[Int]()
+val offsets = scala.collection.mutable.ListBuffer[Int]()
+import org.scalatest.time.SpanSugar._
+failAfter(5 seconds) {
+  // inject rows, read and check the data and offsets
+  for (i <- 0 until numRecords) {
+serverThread.enqueue(i.toString)
+  }
+  tasks.asScala.foreach {
+case t: TextSocketContinuousInputPartition =>
+  val r = 
t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader]
+  for (i <- 0 until numRecords / 2) {
+r.next()
+
offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset)
+data.append(r.get().getString(0).toInt)
+if (i == 2) {
+  commitOffset(t.partitionId, i + 1)
+}
+  }
+  assert(offsets.toSeq == Range.inclusive(1, 5))
+  assert(data.toSeq == Range(t.partitionId, 10, 2))
+  offsets.clear()
+  data.clear()
+case _ => throw new IllegalStateException("Unexpected task type")
+  }
+  assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets 
== List(3, 3))
+  reader.commit(TextSocketOffset(List(5, 5)))
+  assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets 
== List(5, 5))
+}
+
+def commitOffset(partition: Int, offset: Int): Unit = {
+  val offsetsToCommit = 
reader.getStartOffset.asInstanceOf[TextSocketOffset]
+.offsets.updated(partition, offset)
+  reader.commit(TextSocketOffset(offsetsToCommit))
+  assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets 
== offsetsToCommit)
+}
+  }
+
+  test("continuous data - invalid commit") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5
+// ok to commit same offset
+reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5
+assertThrows[IllegalStateException] {
+  reader.commit(TextSocketOffset(List(6, 6)))
+}
+  }
+
+  test("continuous data with timestamp") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"includeTimestamp" -> "true",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.empty())
+val tasks = reader.planRowInputPartitions()
+assert(tasks.size == 2)
+
+val numRecords = 4
+import org.apache.spark.sql.Row
--- End diff --

Looks like an unused import.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206384495
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Seria

[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21721
  
Looks like we would also need to add SourceProgress and SinkProgress 
to the MiMa exclude list.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21199
  
@arunmahadevan Thanks for rebasing. I'll take a look.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206349942
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+
+/**
+ * A mix in interface for {@link DataSourceWriter}. Data source writers 
can implement this
--- End diff --

OK, I got your intention. I think it makes sense. I'm OK with all three 
options and personally prefer 1 or 2 if the intention is a mix-in, but let's 
see the committers' feedback on this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20859: [SPARK-23702][SS] Forbid watermarks on both sides of sta...

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/20859
  
How would we like to handle this patch? I guess we're adding a feature for handling 
multiple watermarks in #21701, so based on that direction this patch might end up 
abandoned. IMHO I'm not 100% sure we have clear use cases for defining multiple 
watermarks on one source.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18410: [SPARK-20971][SS] purge metadata log in FileStreamSource

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/18410
  
Looks like this PR is not needed, since `CompactibleFileStreamLog` also 
takes care of the metadata log.


https://github.com/apache/spark/commits/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21199
  
@arunmahadevan 
Sorry, I forgot to review this until now. Could you fix the merge conflicts? I'd 
like to pull the code locally and review it, since the code diff is not small.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/20675
  
Looks like the patch is outdated, and once continuous queries support 
shuffled stateful operators, implementing task-level retry is not that trivial. 
To get a correct aggregation result, when one of the tasks fails at epoch N, all 
the tasks and states should be restored to epoch N.

I definitely agree that it would be ideal to have stable task-level retry; 
I'm just wondering whether this patch would still work with the follow-up tasks 
for continuous mode.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206010782
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+
+/**
+ * A mix in interface for {@link DataSourceWriter}. Data source writers 
can implement this
--- End diff --

If we intend to create a new interface as a mix-in, we may not need to create 
individual interfaces for DataSourceReader and DataSourceWriter. We could 
have only one interface and let DataSourceReader and DataSourceWriter 
implementations mix it in.
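
Roughly what I mean (a hypothetical sketch in Scala, just to show the shape; the actual interface would live on the Java side like the one above):

```scala
import org.apache.spark.sql.sources.v2.CustomMetrics

// hypothetical single mix-in shared by reader and writer implementations,
// instead of one Supports*CustomMetrics interface per side
trait SupportsCustomMetrics {
  def getCustomMetrics: CustomMetrics
}
```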


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206010370
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +150,50 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => CustomMetrics,
+  onInvalidMetrics: (Exception) => Unit): 
Option[JValue] = {
+  val metrics = getMetrics()
+  if (metrics != null) {
--- End diff --

We could de-indent via an early return: it would be simpler because there's 
nothing to do but return `None` if metrics is null, and the style guide lists such 
a case as one of the exceptional cases where a return statement is preferred.
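
i.e. roughly this shape (a sketch; the catch block stays as it is apart from indentation):

```scala
// extracts custom metrics from readers and writers
def extractMetrics(
    getMetrics: () => CustomMetrics,
    onInvalidMetrics: Exception => Unit): Option[JValue] = {
  val metrics = getMetrics()
  if (metrics == null) return None

  try {
    Some(parse(metrics.json()))
  } catch {
    case ex: Exception =>
      onInvalidMetrics(ex)
      None
  }
}
```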


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206009928
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +150,50 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => CustomMetrics,
+  onInvalidMetrics: (Exception) => Unit): 
Option[JValue] = {
+  val metrics = getMetrics()
+  if (metrics != null) {
+try {
+  Some(parse(metrics.json()))
+} catch {
+  case ex: Exception => onInvalidMetrics(ex)
--- End diff --


https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try

According to the guide, this line needs to be replaced with `case 
NonFatal(e) =>`, and I'd place `onInvalidMetrics` and `None` at the same 
indentation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
Thanks @zsxwing for reviewing! Addressed the review comments. Please take 
another look. Thanks in advance!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r205961782
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
@@ -116,6 +175,30 @@ package object debug {
 }
   }
 
+  implicit class DebugStreamQuery(query: StreamingQuery) extends Logging {
+def debug(): Unit = {
+  unwrapStreamingQueryWrapper(query) match {
+case w: StreamExecution =>
+  if (w.lastExecution == null) {
+debugPrint("No physical plan. Waiting for data.")
+  } else {
+val executedPlan = w.lastExecution.executedPlan
+if 
(executedPlan.find(_.isInstanceOf[WriteToContinuousDataSourceExec]).isDefined) {
+  debugPrint("Debug on continuous mode is not supported.")
--- End diff --

`debug()` will run the query briefly to collect information: for micro-batch that 
means one batch, but in continuous mode it would never finish.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r205961840
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
@@ -88,23 +100,70 @@ package object debug {
 }
   }
 
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan 
into one String
+   *
+   * @param query the streaming query for codegen
+   * @return single String containing all WholeStageCodegen subtrees and 
corresponding codegen
+   */
+  def codegenString(query: StreamingQuery): String = {
+val msg = unwrapStreamingQueryWrapper(query) match {
+  case w: StreamExecution =>
+if (w.lastExecution != null) {
+  codegenString(w.lastExecution.executedPlan)
+} else {
+  "No physical plan. Waiting for data."
+}
+
+  case _ => "Only supported for StreamExecution."
+}
+msg
+  }
+
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan
+   *
+   * @param query the streaming query for codegen
+   * @return Sequence of WholeStageCodegen subtrees and corresponding 
codegen
+   */
+  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
+val planAndCodes = unwrapStreamingQueryWrapper(query) match {
+  case w: StreamExecution if w.lastExecution != null =>
+codegenStringSeq(w.lastExecution.executedPlan)
+
+  case _ => Seq.empty
+}
+planAndCodes
+  }
+
+  /* Helper function to reuse duplicated code block between batch and 
streaming. */
--- End diff --

Will address.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r205961761
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -829,6 +955,18 @@ class StreamSuite extends StreamTest {
   assert(query.exception.isEmpty)
 }
   }
+
+  private def waitForLastExecution(q: StreamExecution): Unit = {
--- End diff --

Thanks for letting me know! Addressed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r205961813
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
@@ -88,23 +100,70 @@ package object debug {
 }
   }
 
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan 
into one String
+   *
+   * @param query the streaming query for codegen
+   * @return single String containing all WholeStageCodegen subtrees and 
corresponding codegen
+   */
+  def codegenString(query: StreamingQuery): String = {
+val msg = unwrapStreamingQueryWrapper(query) match {
+  case w: StreamExecution =>
+if (w.lastExecution != null) {
+  codegenString(w.lastExecution.executedPlan)
+} else {
+  "No physical plan. Waiting for data."
+}
+
+  case _ => "Only supported for StreamExecution."
+}
+msg
+  }
+
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan
+   *
+   * @param query the streaming query for codegen
+   * @return Sequence of WholeStageCodegen subtrees and corresponding 
codegen
+   */
+  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
+val planAndCodes = unwrapStreamingQueryWrapper(query) match {
+  case w: StreamExecution if w.lastExecution != null =>
+codegenStringSeq(w.lastExecution.executedPlan)
+
+  case _ => Seq.empty
+}
+planAndCodes
+  }
+
+  /* Helper function to reuse duplicated code block between batch and 
streaming. */
+  private def debugInternal(plan: SparkPlan): Unit = {
+val visited = new collection.mutable.HashSet[TreeNodeRef]()
+val debugPlan = plan transform {
+  case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
+visited += new TreeNodeRef(s)
+DebugExec(s)
+}
+debugPrint(s"Results returned: ${debugPlan.execute().count()}")
+debugPlan.foreach {
+  case d: DebugExec => d.dumpStats()
+  case _ =>
+}
+  }
+
+  private def unwrapStreamingQueryWrapper(query: StreamingQuery): 
StreamingQuery = {
+query match {
+  case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
+  case _ => query
--- End diff --

OK. I can apply your suggestion, but I'm not 100% sure about the intention: 
propagate the exception to the caller side (user code), or catch the exception 
inside the debug package and handle it there.
I'll apply the latter, but please let me know if you intended the former.
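
A quick sketch of the two options as I read them, using a hypothetical helper 
rather than the real debug package code:

```scala
import scala.util.control.NonFatal

object DebugErrorHandlingSketch {
  // Option 1: let any failure propagate to the caller (user code).
  def debugPropagating(run: () => Unit): Unit = run()

  // Option 2: catch non-fatal failures inside the debug package and report them.
  def debugCatching(run: () => Unit): Unit = {
    try {
      run()
    } catch {
      case NonFatal(e) => println(s"debug() failed: ${e.getMessage}")
    }
  }
}
```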


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r205961764
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -513,6 +514,131 @@ class StreamSuite extends StreamTest {
 }
   }
 
+  test("explain-continuous") {
+val inputData = ContinuousMemoryStream[Int]
+val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+// Test `df.explain`
+val explain = ExplainCommand(df.queryExecution.logical, extended = 
false)
+val explainString =
+  spark.sessionState
+.executePlan(explain)
+.executedPlan
+.executeCollect()
+.map(_.getString(0))
+.mkString("\n")
+assert(explainString.contains("Filter"))
+assert(explainString.contains("MapElements"))
+assert(!explainString.contains("LocalTableScan"))
+
+// Test StreamingQuery.display
+val q = df.writeStream.queryName("memory_continuous_explain")
+  .outputMode(OutputMode.Update()).format("memory")
+  .trigger(Trigger.Continuous("1 seconds"))
+  .start()
+  .asInstanceOf[StreamingQueryWrapper]
+  .streamingQuery
+try {
+  // in continuous mode, the query will be run even there's no data
+  // sleep a bit to ensure initialization
+  waitForLastExecution(q)
+
+  val explainWithoutExtended = q.explainInternal(false)
+
+  print(explainWithoutExtended)
--- End diff --

Will fix.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
Added tests for StatefulOperatorsHelper itself as well. (Sorry for pushing 
commits multiple times, which triggers multiple builds. It would be ideal if 
older test builds were terminated as soon as a newer test build for the same PR 
is launched.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-19 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
Now I'd like to propose changing the default behavior to the new path while 
keeping backward compatibility, so I applied that to the patch. I'm still open 
to shipping it as an advanced option first, as originally proposed, and happy to 
roll back if we decide to go that way.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-19 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
My pleasure. Thanks for taking the time to review this thoroughly and merge it!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
@tdas Addressed review comments. Please take a look again. Thanks in 
advance!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r203577783
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -270,11 +273,43 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
 } else Iterator.empty
   }
 
+  /** This method is intended to be only used for unit test(s). DO NOT 
TOUCH ELEMENTS IN MAP! */
+  private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] 
= synchronized {
--- End diff --

Agreed. Will address.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r203577561
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -64,21 +66,143 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 require(!StateStore.isMaintenanceRunning)
   }
 
+  def updateVersionTo(
+  provider: StateStoreProvider,
+  currentVersion: Int,
+  targetVersion: Int): Int = {
+var newCurrentVersion = currentVersion
+for (i <- newCurrentVersion until targetVersion) {
+  newCurrentVersion = incrementVersion(provider, i)
+}
+require(newCurrentVersion === targetVersion)
+newCurrentVersion
+  }
+
+  def incrementVersion(provider: StateStoreProvider, currentVersion: Int): 
Int = {
+val store = provider.getStore(currentVersion)
+put(store, "a", currentVersion + 1)
+store.commit()
+currentVersion + 1
+  }
+
+  def checkLoadedVersions(
+  loadedMaps: util.SortedMap[Long, ProviderMapType],
+  count: Int,
+  earliestKey: Long,
+  latestKey: Long): Unit = {
+assert(loadedMaps.size() === count)
+assert(loadedMaps.firstKey() === earliestKey)
+assert(loadedMaps.lastKey() === latestKey)
+  }
+
+  def checkVersion(
+  loadedMaps: util.SortedMap[Long, ProviderMapType],
+  version: Long,
+  expectedData: Map[String, Int]): Unit = {
+
+val originValueMap = loadedMaps.get(version).asScala.map { entry =>
+  rowToString(entry._1) -> rowToInt(entry._2)
+}.toMap
+
+assert(originValueMap === expectedData)
+  }
+
+  test("retaining only two latest versions when 
MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2") {
+val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
+  numOfVersToRetainInMemory = 2)
+
+var currentVersion = 0
+
+// commit the ver 1 : cache will have one element
+currentVersion = incrementVersion(provider, currentVersion)
+assert(getData(provider) === Set("a" -> 1))
+var loadedMaps = provider.getClonedLoadedMaps()
+checkLoadedVersions(loadedMaps, 1, 1L, 1L)
--- End diff --

Yeah, I'd add 'L' wherever the literal is meant to be a Long, so that we don't 
rely on auto-widening and the type is explicit, but I have no strong opinion 
about this. I can follow existing Spark preferences.
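
For illustration only, the difference in question:

```scala
object LongLiteralSketch {
  val explicitLong: Long = 1L // the literal itself is a Long; nothing is widened
  val widenedLong: Long = 1   // also compiles, but the Int literal is widened to Long
}
```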


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-17 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
@tdas Thanks for the detailed review! Addressed review comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-17 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r202933053
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -270,11 +273,42 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
 } else Iterator.empty
   }
 
+  /** This method is intended to be only used for unit test(s). DO NOT 
TOUCH ELEMENTS IN MAP! */
+  private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] 
= synchronized {
+// shallow copy as a minimal guard
+loadedMaps.clone().asInstanceOf[util.SortedMap[Long, MapType]]
+  }
+
+  private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit 
= synchronized {
+if (numberOfVersionsToRetainInMemory <= 0) {
+  if (loadedMaps.size() > 0) loadedMaps.clear()
+  return
+}
+
+while (loadedMaps.size() > numberOfVersionsToRetainInMemory) {
+  loadedMaps.remove(loadedMaps.lastKey())
+}
+
+val size = loadedMaps.size()
+if (size == numberOfVersionsToRetainInMemory) {
+  val versionIdForLastKey = loadedMaps.lastKey()
+  if (versionIdForLastKey > newVersion) {
+// this is the only case which put doesn't need
--- End diff --

Will update the comment to clarify a bit more. We just avoid the case where the 
element would be inserted at the last position (its version is older than every 
version we retain) and would have to be evicted right away.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-17 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r202932784
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -64,21 +64,122 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 require(!StateStore.isMaintenanceRunning)
   }
 
+  def updateVersionTo(provider: StateStoreProvider, currentVersion: => Int,
+  targetVersion: Int): Int = {
--- End diff --

Thanks for correcting the style guide point. Will fix.
Regarding `currentVersion: => Int`: at some point I was trying to reuse 
currentVersion and didn't roll the change back. Will fix.
And I agree it would be better to have `incrementVersion` to shorten the code. 
Will address.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-12 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@arunmahadevan @jose-torres 


https://issues.apache.org/jira/browse/SPARK-24763?focusedCommentId=16541367=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16541367

I had a chance to test this patch with more kinds of use cases, and overall, 
enabling the option shows on-par or slightly better performance while reducing 
state size in proportion to the ratio of key size to value size. I'm now feeling 
it would make sense to adopt the new strategy as the default and keep the old 
behavior as a fallback for supporting existing apps, but the numbers are there 
to persuade committers, and I still agree the decision needs to come from 
committer(s).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
@jose-torres Addressed review comment. Please take a look again.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201866930
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -99,43 +102,84 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
 assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1))
 
-updateVersionTo(3)
+// this trigger exceeding cache and 1 will be evicted
+currentVersion = updateVersionTo(provider, currentVersion, 3)
 assert(getData(provider) === Set("a" -> 3))
 loadedMaps = provider.getClonedLoadedMaps()
-assert(loadedMaps.size() === 3)
+assert(loadedMaps.size() === 2)
 assert(loadedMaps.firstKey() === 3L)
-assert(loadedMaps.lastKey() === 1L)
+assert(loadedMaps.lastKey() === 2L)
 assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3))
 assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
+  }
+
+  test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set 
to 1") {
+val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
+  numOfVersToRetainInMemory = 1)
+
+var currentVersion = 0
+
+def restoreOriginValues(map: provider.MapType): Map[String, Int] = {
--- End diff --

I've allowed the redundant function definition because there's no way to use 
`provider.MapType` as a parameter type unless the provider value is already 
defined. If we really want to get rid of the redundant definition, we may have 
to change the parameter type to ConcurrentMap directly.
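
A small self-contained sketch of the path-dependent type constraint, with 
stand-in classes rather than the real state store types:

```scala
object PathDependentTypeSketch {
  // stand-ins for the real provider and its MapType; illustrative only
  class Provider {
    type MapType = scala.collection.mutable.HashMap[String, Int]
    def newMap(): MapType = scala.collection.mutable.HashMap("a" -> 1)
  }

  def main(args: Array[String]): Unit = {
    val provider = new Provider
    // `provider.MapType` is only usable once the stable value `provider` is in scope,
    // which is why the helper ends up re-defined next to each provider in the tests.
    def restoreOriginValues(map: provider.MapType): Map[String, Int] = map.toMap
    println(restoreOriginValues(provider.newMap()))
  }
}
```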


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201866279
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -64,6 +64,63 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 require(!StateStore.isMaintenanceRunning)
   }
 
+  test("retaining only latest configured size of versions in memory") {
--- End diff --

It is fairly easy to check whether we read from the cache or from files with 
https://github.com/apache/spark/pull/21469/commits/c9aada520889b87ace0886805910f0d56d099bd2
 in #21469, since it introduces metrics for cache hits and misses, but it is not 
easy to check in this PR itself.

So I just rely on inspecting the cache to ensure the data is correctly evicted 
and no longer present there, as expected. Hope this is OK.

Btw, I caught a silly bug while adding tests to cover your suggestion. 
Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
I guess we should treat reducing state memory size as worth doing: as described 
in the commit above, we already optimized HDFSBackedStateStoreProvider to reduce 
state store disk size (as well as network transfer) by not storing 4 bytes per 
row (for both key and value). This approach would normally save more than that 
earlier optimization did on the value row, given that the key usually carries 
window information, i.e. two values: start and end.

The main issue with this approach, for me, is the possible perf. impact on 
workloads. The workload I've covered happily shows even a slight perf. 
improvement, but I'm not sure about other workloads yet. I might say we should 
consider changing the default behavior once I have good overall backing numbers, 
but either way, I agree a decision from committer(s) is necessary. Would it be 
better to start a thread on the dev. mailing list?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r201848371
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -825,6 +825,16 @@ object SQLConf {
 .intConf
 .createWithDefault(100)
 
+  val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION =
--- End diff --

And the default value of this is `false`, so end users will be aware of the 
existence of this option and have a chance to read the explanation before 
setting it to `true`.

We might elaborate a bit more in the config doc: the tradeoff between reduced 
memory usage and a possible perf. hit, plus a suggestion to try this in a 
non-production environment before applying it to production. If elaborating 
makes us feel safer, I'm happy to update it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@arunmahadevan 
I'm actually in favor of changing the default behavior; I'm just not 100% sure 
the result would be promising across exhaustive use cases. I might need to 
prepare more kinds of key/value pairs (key larger than value, key smaller than 
value, key equal to value; what else am I missing?), run some tests, and back it 
up with new numbers.

Btw, as you commented, there seem to be two approaches to distinguish the old 
format from the new:

> looking at the fields in the row

Actually I tried that before (by checking the count of fields in the value row, 
since this patch reduces it), and soon realized I can't, because 
HDFSBackedStateStoreProvider relies on the provided keySchema and valueSchema 
when serializing/deserializing rows rather than leveraging UnsafeRow's own 
serialization/deserialization mechanism (writeExternal/readExternal, or 
write/read via Kryo). It just exhibits undefined behavior if the schema doesn't 
match the actual rows, so we can't verify this.

The current approach saves the cost of writing/reading two additional integers 
at the price of losing the ability to verify the rows. If we want to add that 
capability, a state migration would have to happen.

> introducing a row version to differentiate old vs new

We could do this by applying the same approach as in #21739, so it is valid, 
but a query with the old state format would either have to migrate its state 
(not easy, since it would have to be done against multiple versions of state) or 
keep relying on the old format.

@jose-torres Could you please take a look at @arunmahadevan's comment as well 
as this one and share your thoughts? Thanks in advance!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r201829938
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -825,6 +825,16 @@ object SQLConf {
 .intConf
 .createWithDefault(100)
 
+  val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION =
--- End diff --

I'm sorry, but I'm not sure I understand your suggestion correctly. I guess 
defining the configuration in the Spark conf makes it easier to guard against 
modification after the query has started, via the existing approach of adding 
the conf to OffsetSeqMetadata, whereas I'm not sure we could guard against 
modification of a query option. I might be missing something here.
Could you elaborate a bit more? Thanks in advance!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-07-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201562690
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -239,8 +241,9 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
   @volatile private var valueSchema: StructType = _
   @volatile private var storeConf: StateStoreConf = _
   @volatile private var hadoopConf: Configuration = _
+  @volatile private var numberOfVersionsToRetainInMemory: Int = _
 
-  private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
+  private lazy val loadedMaps = new util.TreeMap[Long, 
MapType](Ordering[Long].reverse)
--- End diff --

Just FYI: referring to java.util.TreeMap is unavoidable because Scala doesn't 
provide a mutable SortedMap until Scala 2.12, so using one would require 
additional changes for interop.
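
For reference, a minimal sketch of the reverse-ordered TreeMap behavior, with 
illustrative values only:

```scala
import java.util.TreeMap

object ReverseSortedVersionsSketch {
  def main(args: Array[String]): Unit = {
    // a reversed Long ordering keeps the newest version at firstKey()
    // and the oldest version (the eviction candidate) at lastKey()
    val loadedMaps = new TreeMap[Long, String](Ordering[Long].reverse)
    loadedMaps.put(1L, "version 1")
    loadedMaps.put(2L, "version 2")
    loadedMaps.put(3L, "version 3")
    assert(loadedMaps.firstKey() == 3L)
    assert(loadedMaps.lastKey() == 1L)
  }
}
```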


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201515401
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * This class implements bounded {@link java.util.SortedMap} based on 
{@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, 
and cuts off
+ * smaller elements to retain at most bigger N elements.
+ *
+ * You can provide reversed order of comparator to retain smaller elements 
instead.
+ *
+ * This class is not thread-safe, so synchronization would be needed to 
use this concurrently.
+ *
+ * @param  key type
+ * @param  value type
+ */
+public final class BoundedSortedMap extends TreeMap {
--- End diff --

Moved the logic to HDFSBackedStateStoreProvider and removed BoundedSortedMap as 
well as its test suite.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-07-10 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Now I'm thinking about removing "metricProviderLoaderCountOfVersionsInMap" 
and also removing StateStoreCustomAverageMetric, since the value doesn't look 
correct for stream-stream join, which handles multiple states in one operation.
We may try out SQLMetric to extend AccumulatorV2[(kind of Metric class), 
(kind of Metric class)] which Metric class can handle merging multiple values 
correctly according to the kind of metric, but this is beyond the scope of PR, 
so just removing "metricProviderLoaderCountOfVersionsInMap" would be clearer 
for this patch for now.
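
A very rough, hypothetical sketch of that direction; none of MetricValue, 
SumMetric, or MetricAccumulator exist in Spark, and the real SQLMetric 
integration would need much more than this, so it only shows the shape of an 
accumulator whose value merges itself:

```scala
import org.apache.spark.util.AccumulatorV2

// hypothetical metric value that knows how to merge itself (e.g. sum vs. average)
sealed trait MetricValue { def merge(other: MetricValue): MetricValue }

case class SumMetric(value: Long) extends MetricValue {
  override def merge(other: MetricValue): MetricValue = other match {
    case SumMetric(v) => SumMetric(value + v)
    case _            => this
  }
}

// an accumulator that delegates merging to the metric value itself
class MetricAccumulator(private var current: MetricValue = SumMetric(0L))
    extends AccumulatorV2[MetricValue, MetricValue] {
  override def isZero: Boolean = current == SumMetric(0L)
  override def copy(): AccumulatorV2[MetricValue, MetricValue] = new MetricAccumulator(current)
  override def reset(): Unit = current = SumMetric(0L)
  override def add(v: MetricValue): Unit = current = current.merge(v)
  override def merge(other: AccumulatorV2[MetricValue, MetricValue]): Unit =
    current = current.merge(other.value)
  override def value: MetricValue = current
}
```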

What do you think, @arunmahadevan @jose-torres ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-07-10 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
Thanks for reviewing @arunmahadevan and @jose-torres ! Could we finalize the 
review on #21469 so we have a chance to include "providerLoadedMapSizeBytes" 
here? Or is it OK to handle it in a follow-up issue?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r201483544
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -825,6 +825,16 @@ object SQLConf {
 .intConf
 .createWithDefault(100)
 
+  val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION =
--- End diff --

This is not compatible with the current stateful aggregation (indeed, that is 
the improvement of this patch) and there is no undo. So once end users enable 
the option for a query, it must stay enabled unless they clear out the 
checkpoint. (I've added the new option to OffsetSeqMetadata to remember the 
initial setting, like the partition count.)

I'm seeing on-par or even slightly better performance on a specific workload 
(published via the link in the description), but I can't claim to have tried 
exhaustive workloads. I actually expected a tradeoff between performance and 
state memory usage, so assuming other workloads follow that tradeoff, end users 
may need to try this option on their query in a non-production (for example, 
staging) environment to make sure enabling it doesn't break their performance 
expectations.

That's why I made the change available as an option instead of modifying the 
default behavior. If we applied it to the default behavior, we would need to 
provide state migration.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


