[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201477277
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, and cuts off
+ * smaller elements to retain at most bigger N elements.
+ *
+ * You can provide reversed order of comparator to retain smaller elements instead.
+ *
+ * This class is not thread-safe, so synchronization would be needed to use this concurrently.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public final class BoundedSortedMap<K, V> extends TreeMap<K, V> {
--- End diff --

I originally handled it in HDFSBackedStateStoreProvider and refactored it out
afterwards, because that makes the HDFSBackedStateStoreProvider code clearer (I
feel HDFSBackedStateStoreProvider is not well structured, and I have a patch,
#21357, to refactor it a bit). But I agree with you that this might only ever be
used by HDFSBackedStateStoreProvider. I'll handle it in
HDFSBackedStateStoreProvider. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
retest this, please


---




[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-09 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r200934841
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -53,7 +54,30 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
 
   import testImplicits._
 
-  test("simple count, update mode") {
+  val confAndTestNamePostfixMatrix = List(
--- End diff --

OK. I'd like to wait for other reviewers' opinions and suggestions on this.
Let me keep it as it is until then.


---




[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-09 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r200923851
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -53,7 +54,30 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
 
   import testImplicits._
 
-  test("simple count, update mode") {
+  val confAndTestNamePostfixMatrix = List(
--- End diff --

`withSQLConf` appears to be widely used across the SQL unit tests, and it does
additional work (SparkSession.setActiveSession), so I'm not sure it would behave
the same technically. Moreover, we need to run the same test multiple times,
each time with a different configuration.

Could you propose your code if you don't mind? Thanks in advance!


---




[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon


---




[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-09 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21733

[SPARK-24763][SS] Remove redundant key data from value in streaming aggregation

* add option to configure enabling new feature: remove redundant key data from value
* modify code to respect new option (turning on/off feature)
* modify tests to run tests with both on/off
* Add guard in OffsetSeqMetadata to prevent modifying option after executing query

## What changes were proposed in this pull request?

This patch proposes a new option for stateful aggregation: remove redundant key
data from value.
With the option enabled, the query behaves the same as it does today but uses
less memory for state; the saving depends on the key/value fields of the state
operator.

Please refer to the link below for the detailed perf. test result:

https://issues.apache.org/jira/browse/SPARK-24763?focusedCommentId=16536539&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16536539

Since state written with the option enabled is not compatible with state
written with it disabled, the option is disabled by default (to ensure backward
compatibility), and OffsetSeqMetadata prevents modifying the option once the
query has been executed.
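To illustrate the idea of dropping redundant key data, here is a minimal sketch
in plain Java. It is not the patch's implementation: rows are modelled as
`List<Object>`, the key columns are assumed to be the leading columns of the
full row, and the names `KeyStrippedValueSketch`, `stripKey` and `restore` are
hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class KeyStrippedValueSketch {
  // Drops the leading key columns from a full row before it is stored as the state value.
  // Assumption: the key columns come first in the row.
  static List<Object> stripKey(List<Object> fullRow, int numKeyFields) {
    return new ArrayList<>(fullRow.subList(numKeyFields, fullRow.size()));
  }

  // Rebuilds the full row on read by joining the key with the stored value.
  static List<Object> restore(List<Object> key, List<Object> storedValue) {
    List<Object> full = new ArrayList<>(key);
    full.addAll(storedValue);
    return full;
  }

  public static void main(String[] args) {
    List<Object> key = Arrays.asList("truck-7", "2018-07-08");
    List<Object> fullRow = Arrays.asList("truck-7", "2018-07-08", 42L, 61.5);
    List<Object> stored = stripKey(fullRow, key.size());
    System.out.println("stored value: " + stored);
    System.out.println("restored equals original: " + restore(key, stored).equals(fullRow));
  }
}
```

The two layouts store different bytes for the same logical state, which is one
way to see why a checkpoint written with one setting cannot be read with the
other.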

## How was this patch tested?

Modified unit tests to cover both the disabled and the enabled option.
Also ran manual tests to check whether the proposed patch improves state memory
usage.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24763

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21733


commit 2a9cc496bb7f832b75b0090ef9a612f4fbc0f206
Author: Jungtaek Lim 
Date:   2018-07-08T09:37:12Z

[SPARK-24763][SS] Remove redundant key data from value in streaming aggregation

* add option to configure enabling new feature: remove redundant key data from value
* modify code to respect new option (turning on/off feature)
* modify tests to run tests with both on/off
* Add guard in OffsetSeqMetadata to prevent modifying option after executing query




---




[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21622#discussion_r200554917
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 ---
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)
 
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+    progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
+
+  registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L)
+  registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L)
+
--- End diff --

We can add more metrics such as "providerLoadedMapSizeBytes" after adopting
SPARK-24441, so that the actual memory usage of the state store provider can be
tracked as a time series.


---




[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
@tedyu Thanks for the suggestion. Published the result to the mail thread.


https://lists.apache.org/thread.html/323ab22fea87c14a2f92e58e7a810aa37cbdf00b9ab81448ee967976@%3Cdev.spark.apache.org%3E

I've only written a short summary of the result (since mail is not as good a
format as markdown for presenting detailed numbers) and spent more time
explaining the rationale behind my recent issues so that all of them are
covered together. I'll wait a couple more days, and try to post detailed
numbers if reviewing hasn't started by then.


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21721
  
Though I haven't taken a look yet, I wanted to see this feature (mentioned in
https://github.com/apache/spark/pull/21622#issuecomment-399677099) and I'm happy
to see it being implemented!

While I love the feature, I agree with @jose-torres that it is going to be a
new public API (part of Datasource V2), so it is worth discussing the API
itself before settling on a specific implementation.


---




[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
I would like to add numbers to show how much this patch helps end users of
Apache Spark.

I crafted and published a project which implements some stateful use cases
using an IoT Trucking example.

https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming

Running the apps, I can see that the cache (loadedMaps) in
HDFSBackedStateStoreProvider consumes much more memory than one version of
state. It's not 10~30% more, but more than 1500% and, in a specific case, even
more than 8000%, depending on the update ratio of state.
(Capturing the overall map size of the provider requires applying patch #21469.)

The tables below are the result of publishing query status to a Kafka topic and
querying that data via Spark SQL.
https://gist.github.com/HeartSaVioR/9d53b39052d4779a4c77e71ff7e989a3

> Before applying the patch (`spark.sql.streaming.minBatchesToRetain` set 
to default value 100)

* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
319 | 765456 | 2632 | 185499903 | 3307747279 | 17.8315310439811928

* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
142 | 184 | 138 | 72103 | 6214927 | 86.1951236425668835

* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
634 | 598 | 0 | 136279 | 6587839 | 48.3408228707284321

> After applying this patch 
(`spark.sql.streaming.maxBatchesToRetainInMemory` set to default value 2)

* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
127 | 298452 | 4170 | 71023679 | 84454399 | 1.1891020035726395

* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
132 | 184 | 138 | 72319 | 162647 | 2.2490216955433565

* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
133 | 598 | 0 | 136079 | 227863 | 1.6744905532815497
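
Judging from the numbers in these tables, the last column appears to be the
plain ratio providerLoadedMapSize / memoryUsedBytes (so 17.83 means roughly
17.8x, not 17.83%). The following small Java sketch recomputes it for the
stream-stream join rows; the class and method names are hypothetical and the
formula is inferred from the tables, not taken from the query in the gist.

```java
final class StateOverheadSketch {
  // Ratio of everything cached by the provider to the size of a single state version.
  static double overheadRatio(long providerLoadedMapSize, long memoryUsedBytes) {
    return (double) providerLoadedMapSize / memoryUsedBytes;
  }

  public static void main(String[] args) {
    // Stream-stream join, before and after the patch (numbers copied from the tables).
    System.out.printf("before: %.2fx%n", overheadRatio(3307747279L, 185499903L));
    System.out.printf("after:  %.2fx%n", overheadRatio(84454399L, 71023679L));
  }
}
```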


---




[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21718
  
I'm aware of this issue and have it in my backlog, but for now it doesn't look
easy to address in an efficient way. I'll propose an approach for rescaling
state when I have one.


---




[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21718
  
It was fairly easy to rescale partitions before stateful operators came into
play. For structured streaming it is no longer trivial, because rescaling
partitions must also rescale the state, which is stored on disk. Rescaling
state may require reading the whole state, redistributing it via a hash
function, and saving it to disk again. That's why SS stores the previous conf.
and forces using it.
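
As a rough sketch of why this is expensive, the Java snippet below rescales
keyed state held in in-memory maps: every entry of every old partition has to
be read and rewritten under the new partition count. It is only an
illustration; `StateRescaleSketch`, `partitionFor` and `rescale` are
hypothetical names, and `hashCode` modulo the partition count stands in for
whatever hash partitioning Spark actually applies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StateRescaleSketch {
  // Assigns a key to a partition; floorMod keeps the result non-negative for negative hash codes.
  static int partitionFor(Object key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  // Reads every entry of every old partition and rewrites it into the new layout.
  static <K, V> List<Map<K, V>> rescale(List<Map<K, V>> oldPartitions, int newNumPartitions) {
    List<Map<K, V>> rescaled = new ArrayList<>();
    for (int i = 0; i < newNumPartitions; i++) {
      rescaled.add(new HashMap<>());
    }
    for (Map<K, V> partition : oldPartitions) {
      for (Map.Entry<K, V> entry : partition.entrySet()) {
        int target = partitionFor(entry.getKey(), newNumPartitions);
        rescaled.get(target).put(entry.getKey(), entry.getValue());
      }
    }
    return rescaled;
  }

  public static void main(String[] args) {
    List<Map<String, Long>> old = new ArrayList<>();
    old.add(new HashMap<>());
    old.add(new HashMap<>());
    for (String key : new String[] {"a", "b", "c", "d", "e"}) {
      old.get(partitionFor(key, 2)).put(key, 1L);
    }
    System.out.println("2 partitions: " + old);
    System.out.println("3 partitions: " + rescale(old, 3));
  }
}
```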


---




[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
@tedyu Thanks for the detailed review comments. Addressed.


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r200249847
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -240,7 +244,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   @volatile private var storeConf: StateStoreConf = _
   @volatile private var hadoopConf: Configuration = _
 
-  private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
+  // taking default value first: this will be updated by init method with configuration
+  @volatile private var numberOfVersionsRetainInMemory: Int = 2
--- End diff --

Will fix.


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r200249732
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, and cuts off
+ * smaller elements to retain at most bigger N elements.
+ *
+ * You can provide reversed order of comparator to retain smaller elements instead.
+ *
+ * This class is not thread-safe, so synchronization would be needed to use this concurrently.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class BoundedSortedMap<K, V> extends TreeMap<K, V> {
+
+  private final int limit;
+
+  /**
+   * Constructor
+   *
+   * @param comparator comparator instance to compare between keys
+   * @param limit      bounded size
+   */
+  public BoundedSortedMap(Comparator<K> comparator, int limit) {
+    super(comparator);
+    this.limit = limit;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map) {
+    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
--- End diff --

Thanks for the great suggestion. While we can't assume that the map's type is
SortedMap, it looks like we could check the type of the map at runtime and
apply your suggestion. Will apply it.


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r200249261
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, and cuts off
+ * smaller elements to retain at most bigger N elements.
+ *
+ * You can provide reversed order of comparator to retain smaller elements instead.
+ *
+ * This class is not thread-safe, so synchronization would be needed to use this concurrently.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class BoundedSortedMap<K, V> extends TreeMap<K, V> {
+
+  private final int limit;
+
+  /**
+   * Constructor
+   *
+   * @param comparator comparator instance to compare between keys
+   * @param limit      bounded size
+   */
+  public BoundedSortedMap(Comparator<K> comparator, int limit) {
+    super(comparator);
+    this.limit = limit;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map) {
--- End diff --

Unfortunately this is inherited from the Map interface, so we can't modify its
signature.
And assuming that `put` is implemented correctly, this guarantees the size of
BoundedSortedMap: it defers to the `put` method to restrict the map's size.
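
A minimal Java sketch of that arrangement follows. It is not the code from the
PR: `BoundedMap` is a hypothetical stand-in that keeps the first `limit` keys
in comparator order, enforces the bound in `put`, and routes `putAll` through
`put`. The override of `putAll` matters because `TreeMap.putAll` can bulk-load
an empty map from a `SortedMap` with an equal comparator without calling `put`.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

final class BoundedSortedMapSketch {
  // Hypothetical bounded map: retains at most `limit` entries, the first ones in comparator order.
  static final class BoundedMap<K, V> extends TreeMap<K, V> {
    private final int limit;

    BoundedMap(Comparator<? super K> comparator, int limit) {
      super(comparator);
      this.limit = limit;
    }

    @Override
    public V put(K key, V value) {
      V previous = super.put(key, value);
      // Evict from the tail until the bound holds again.
      while (size() > limit) {
        pollLastEntry();
      }
      return previous;
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
      // Route every entry through put so the bound is enforced in one place.
      for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
        put(entry.getKey(), entry.getValue());
      }
    }
  }

  public static void main(String[] args) {
    // Reversed order keeps the largest keys, for example the newest versions.
    BoundedMap<Long, String> versions = new BoundedMap<>(Comparator.<Long>reverseOrder(), 2);
    versions.put(1L, "v1");
    versions.put(2L, "v2");
    versions.put(3L, "v3");
    System.out.println(versions.keySet()); // [3, 2]
  }
}
```

Other mutators such as `putIfAbsent` or `merge` are not covered by this sketch
and may bypass the bound, depending on the JDK version.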


---




[GitHub] spark issue #21673: SPARK-24697: Fix the reported start offsets in streaming...

2018-07-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21673
  
@arunmahadevan We'd better respect the style guide for pull requests: please
change the title so that the JIRA issue number is wrapped in `[]`, and also add
`[SS]`.

http://spark.apache.org/contributing.html

> The PR title should be of the form [SPARK-xxxx][COMPONENT] Title, where
SPARK-xxxx is the relevant JIRA number, COMPONENT is one of the PR categories
shown at spark-prs.appspot.com and Title may be the JIRA’s title or a more
specific title describing the PR itself.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-07-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this, please


---




[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...

2018-07-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
The two new Java files were missing a newline at EOF. Just addressed.
Jenkins, retest this please.


---




[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...

2018-07-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon


---




[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...

2018-07-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
retest this, please


---




[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...

2018-07-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21700
  
Pasting the JIRA issue description to explain why this patch is needed:

The default value of "spark.sql.streaming.minBatchesToRetain" is set high
(100). This doesn't strictly require 100x memory, but I'm seeing 10x ~ 80x
memory consumption for various workloads. In addition, in some cases even 2x
memory is unacceptable, so we should split out the configuration for memory and
let users trade off memory usage against cache misses (building state from
files).

In the normal case, the default value '2' would cover both cases, success and
restoring after failure, with around 2x memory usage or less, and '1' would
cover only the success case but no longer require more than 1x memory. In the
extreme case, users can set the value to '0' to completely disable the map
cache and maximize executor memory (covers #21500).
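
The trade-off can be sketched with a small version cache in Java. This is an
assumption-laden illustration rather than HDFSBackedStateStoreProvider's
logic: `VersionCacheSketch`, its fields and the `loadFromFiles` callback are
hypothetical, and the sketch caches a version again after a miss, which the
real provider may or may not do.

```java
import java.util.Comparator;
import java.util.TreeMap;
import java.util.function.LongFunction;

final class VersionCacheSketch<S> {
  private final int maxVersionsInMemory;
  private final LongFunction<S> loadFromFiles;
  // Newest version first, so the tail of the map is always the oldest cached version.
  private final TreeMap<Long, S> loaded = new TreeMap<>(Comparator.<Long>reverseOrder());
  long hits = 0;
  long misses = 0;

  VersionCacheSketch(int maxVersionsInMemory, LongFunction<S> loadFromFiles) {
    this.maxVersionsInMemory = maxVersionsInMemory;
    this.loadFromFiles = loadFromFiles;
  }

  // Returns the state for a version, rebuilding it from files on a cache miss.
  S get(long version) {
    S cached = loaded.get(version);
    if (cached != null) {
      hits++;
      return cached;
    }
    misses++;
    S state = loadFromFiles.apply(version);
    put(version, state);
    return state;
  }

  // Caches a version and evicts the oldest ones beyond the limit; a limit of 0 disables caching.
  void put(long version, S state) {
    if (maxVersionsInMemory <= 0) {
      return;
    }
    loaded.put(version, state);
    while (loaded.size() > maxVersionsInMemory) {
      loaded.pollLastEntry();
    }
  }

  public static void main(String[] args) {
    VersionCacheSketch<String> cache = new VersionCacheSketch<>(2, v -> "rebuilt-" + v);
    cache.put(1, "s1");
    cache.put(2, "s2");
    cache.put(3, "s3");
    cache.get(3); // still cached
    cache.get(1); // evicted earlier, so it is rebuilt from files
    System.out.println("hits=" + cache.hits + " misses=" + cache.misses);
  }
}
```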


---




[GitHub] spark pull request #21700: SPARK-24717 Split out min retain version of state...

2018-07-02 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21700

SPARK-24717 Split out min retain version of state for memory in 
HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch proposes splitting the configuration for the number of batches of
state to retain into two pieces: files and in memory (cache). While this patch
reuses the existing configuration for files, it introduces a new configuration,
"spark.sql.streaming.maxBatchesToRetainInMemory", to configure the maximum
number of batches to retain in memory.

This patch also introduces BoundedSortedMap, which retains at most the first N
elements (sorted by key) and can be leveraged for loadedMaps in
HDFSBackedStateStoreProvider.

## How was this patch tested?

Applied this patch on top of SPARK-24441
(https://github.com/apache/spark/pull/21469) and manually tested to ensure the
overall size of state is around 2x or less, instead of 10x ~ 80x, across
various workloads.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24717

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21700.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21700


commit 22f0e220f661b5457584ef83b1ecddc18212fa73
Author: Jungtaek Lim 
Date:   2018-07-02T22:04:49Z

SPARK-24717 Split out min retain version of state for memory in 
HDFSBackedStateStoreProvider

* introduce BoundedSortedMap which implements bounded size of sorted map
  * only first N elements will be retained
* replace loadedMaps to BoundedSortedMap to retain only N versions of states
  * no need to cleanup in maintenance phase
* introduce new configuration: spark.sql.streaming.minBatchesToRetainInMemory




---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-07-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Rebased to fix a conflict, and added a new commit (the last one: c9aada5) to
expose cache hit / miss counts in the HDFS state provider. This is actually
helpful for SPARK-24717 to determine a proper value for the configuration, but
with this commit SPARK-24717 would have to sit on top of this PR, so I just
added it here to avoid rebase hell.


---




[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...

2018-06-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21622#discussion_r198300792
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 ---
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)
 
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+    s => convertStringDateToMillis(s.eventTime.get("watermark")), 0L)
--- End diff --

1. Will address.
2. We don't know whether the map will be empty when calling `registerGauge`,
and once the metric is registered, `getValue` in Gauge is called from
Dropwizard, so I'm not sure we can control whether the value is reported or
not.
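
One way to picture the second point is a gauge that looks the watermark up
every time it is polled and falls back to a default while the entry is absent.
The Java sketch below is hypothetical: it uses `java.util.function.Supplier`
in place of Dropwizard's `Gauge`, a plain `Map<String, String>` in place of the
progress object, and `Instant.parse` in place of the `SimpleDateFormat` used in
the diff.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class WatermarkGaugeSketch {
  // Parses an ISO-8601 UTC timestamp such as 2018-06-26T10:15:30.000Z into epoch milliseconds.
  static long convertStringDateToMillis(String isoTimestamp) {
    return Instant.parse(isoTimestamp).toEpochMilli();
  }

  // Builds a gauge-like supplier that reads the map on every poll and
  // returns the default while no watermark has been reported.
  static Supplier<Long> watermarkGauge(Map<String, String> eventTime, long defaultValue) {
    return () -> {
      String watermark = eventTime.get("watermark");
      return watermark == null ? defaultValue : convertStringDateToMillis(watermark);
    };
  }

  public static void main(String[] args) {
    Map<String, String> eventTime = new HashMap<>();
    Supplier<Long> gauge = watermarkGauge(eventTime, 0L);
    System.out.println("before any watermark: " + gauge.get());
    eventTime.put("watermark", "2018-06-26T10:15:30.000Z");
    System.out.println("after first watermark: " + gauge.get());
  }
}
```

With this shape the gauge always reports something, so a consumer cannot
distinguish "no watermark yet" from a watermark at the default value.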


---




[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...

2018-06-25 Thread HeartSaVioR
Github user HeartSaVioR closed the pull request at:

https://github.com/apache/spark/pull/21617


---




[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...

2018-06-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21617
  
Abandoning the patch. While I think the JIRA issue is still valid, it looks
like we should address the watermark issue first to get a correct number of
late events. Thanks for reviewing, @jose-torres @arunmahadevan.


---




[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...

2018-06-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21617#discussion_r197986093
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)
 
   private[sql] def jsonValue: JValue = {
 ("numRowsTotal" -> JInt(numRowsTotal)) ~
 ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+("numLateInputRows" -> JInt(numLateInputRows))
--- End diff --

@arunmahadevan Ah yes, got it. If we want an accurate number, we need to filter
out late events at the very start of the query anyway. I guess we may need to
defer addressing this until we change that behavior.


---




[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...

2018-06-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21617#discussion_r197981651
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes)
+new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes, numLateInputRows)
 
   private[sql] def jsonValue: JValue = {
 ("numRowsTotal" -> JInt(numRowsTotal)) ~
 ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+("numLateInputRows" -> JInt(numLateInputRows))
--- End diff --

@arunmahadevan 

> Here you are measuring the number of "keys" filtered out of the state 
store since they have crossed the late threshold correct ?

No, it is based on "input" rows which are filtered out due to the watermark 
threshold. Note that the meaning of "input" is relative, because it doesn't 
represent the input rows of the overall query, but the input rows of the 
state operator.

> Its better if we could rather expose the actual number of events that 
were late.

I guess the comment is based on a missing piece, but I think it would be 
correct to filter out late events in the first phase of the query (not in the 
state operator) so that we can get a correct count of late events. For now, 
filters affect the count.
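
To make the point about filters concrete, here is a minimal sketch in plain Python (not Spark APIs). The watermark value, the sample rows, and the upstream filter are all assumptions for illustration, and "late" is assumed to mean an event time below the watermark.

```python
# Hypothetical illustration only: plain Python, no Spark involved.
WATERMARK = 100  # assumed watermark, in arbitrary event-time units

# (event_time, value) rows as they arrive from the source
source_rows = [(90, "a"), (95, "b"), (105, "c"), (80, "d"), (110, "e")]


def is_late(row):
    """A row is treated as late when its event time is below the watermark."""
    return row[0] < WATERMARK


# Counting late rows right after the source sees every late event.
late_at_source = sum(1 for r in source_rows if is_late(r))

# An assumed upstream filter drops value "d" before the state operator,
# so the state operator never sees that late row.
operator_input = [r for r in source_rows if r[1] != "d"]
late_at_operator = sum(1 for r in operator_input if is_late(r))

print(late_at_source, late_at_operator)  # 3 2
```

The two counts differ only because of the filter that runs between the source and the state operator, which is why counting in the state operator cannot give the number of late events in the original input.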


---




[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...

2018-06-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21617
  
@jose-torres 
Yes, you're right. They would be the rows to which other transformations 
and filtering have been applied, not the original input rows. I just haven't 
found a better alternative term than "input row", since from the state 
operator's point of view, they're input rows.

Btw, as I described in the JIRA, my final goal is pushing late events to a 
side-output (as Beam and Flink provide), but I'm stuck on a couple of 
concerns (please correct me anytime if I'm missing something here):

1. Which events to push?

A query can have a couple of transformations before rows reach the stateful 
operator and are filtered out due to the watermark. This is not ideal, and I 
guess that's what you meant by "aren't necessarily the input rows".  

Ideally we would provide the original input rows rather than transformed 
ones, but then we would have to put a major restriction on the watermark: 
`Filter with watermark` should be applied in the data reader (or in a filter 
just after the data reader), which means the input rows themselves must have a 
timestamp field. 

We couldn't apply transformation(s) to populate/manipulate the timestamp 
field, and the timestamp field **must not** be modified during transformations. 
For example, Flink provides a timestamp assigner to extract the timestamp value 
from the input stream, and the reserved field name `rowtime` is used for the 
timestamp field.

2. Does the nature of RDD support multiple outputs?

I have been struggling with this, but as far as I understand, RDD itself 
doesn't support multiple outputs by nature. To me, this looks like a major 
difference between the pull model and the push model: in the push model, which 
other streaming frameworks use, defining another output stream is really 
straightforward, just like adding a remote listener, whereas I'm not sure how 
it can be cleanly defined in the pull model. I also googled for multiple 
outputs on RDD (as someone could have struggled with this before) but had no 
luck.

The alternative approaches I can imagine are all workarounds: RPC, listener 
bus, callback function. None of them can define another stream within the 
current DAG, and I'm also not sure that we can create a DataFrame based on the 
data and let end users compose another query. 

It would be really helpful if you could think about better alternatives and 
share them.
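
As a rough sketch of the push vs pull difference described above, the following plain Python (not Spark or Flink APIs) contrasts an operator that is handed two downstream callbacks with one that exposes a single iterator. The function names, the watermark value, and the sample rows are all hypothetical.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]  # (event_time, value)
WATERMARK = 100  # assumed watermark for illustration


def push_filter(
    rows: Iterable[Row],
    on_time: Callable[[Row], None],
    late: Callable[[Row], None],
) -> None:
    """Push model: the operator is given one callback per output stream."""
    for row in rows:
        (late if row[0] < WATERMARK else on_time)(row)


def pull_filter(rows: Iterable[Row]) -> Iterator[Row]:
    """Pull model: the consumer pulls from a single iterator."""
    for row in rows:
        if row[0] >= WATERMARK:
            yield row
        # Late rows have nowhere to go here, short of a side channel
        # (RPC, listener, callback) outside of the iterator itself.


rows = [(90, "a"), (105, "c"), (80, "d"), (110, "e")]

main_out: List[Row] = []
side_out: List[Row] = []
push_filter(rows, main_out.append, side_out.append)

pulled = list(pull_filter(rows))
```

In the push variant the second output is just another callback, while in the pull variant the late rows are simply dropped unless something outside the iterator carries them away.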


---




[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-06-23 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
retest this, please


---




[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-06-23 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
I think we may want to add metrics regarding sources and sinks as well, but 
the format of offset information or other metadata can differ between sources 
and sinks.
I'm not sure which approach is preferred: 1. define a general format of 
information for sources/sinks, or 2. let each individual source/sink manage 
its own metrics.


---




[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-06-23 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21622
  
cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon


---




[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...

2018-06-23 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21622

[SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard 
metrics

## What changes were proposed in this pull request?

The patch adds metrics regarding state and watermark to dropwizard metrics, 
so that watermark and state rows/size can be tracked as time series.

## How was this patch tested?

Manually tested with CSV metric sink.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24637

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21622


commit 147c98a94140bae505116f5af4d616dcf8d85eab
Author: Jungtaek Lim 
Date:   2018-06-23T08:04:55Z

SPARK-24637 Add metrics regarding state and watermark to dropwizard metrics




---




[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...

2018-06-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21617
  
cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon 


---




[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...

2018-06-22 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21617

[SPARK-24634][SS] Add a new metric regarding number of rows later than 
watermark

## What changes were proposed in this pull request?

This adds a new metric to count the number of rows that arrived later than 
the watermark. 

The metric will be exposed in two places: 
1. streaming query listener - `numLateInputRows` in `stateOperators`
2. SQL tab in UI - `number of rows which are later than watermark` in state 
operator exec

Please refer to https://issues.apache.org/jira/browse/SPARK-24634 for the 
rationale behind the issue.

## How was this patch tested?

Modified existing UTs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24634

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21617


commit ff1b89553acc7ea3a19b586457dd295255047377
Author: Jungtaek Lim 
Date:   2018-06-23T02:34:16Z

SPARK-24634 Add a new metric regarding number of rows later than watermark

* This adds a new metric to count the number of rows arrived later than 
watermark




---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197004935
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
   override def getPreferredLocations(split: Partition): Seq[String] = {
 
split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
+
+  override def clearDependencies(): Unit = {
+throw new IllegalStateException("Continuous RDDs cannot be 
checkpointed")
--- End diff --

I'm wondering whether the method can be called in a normal situation: when a 
continuous query is terminated.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197000483
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
   _: DeserializeToObject | _: SerializeFromObject | _: 
SubqueryAlias |
   _: TypedFilter) =>
 case node if node.nodeName == "StreamingRelationV2" =>
+case Repartition(1, false, _) =>
+case node: Aggregate =>
+  val aboveSinglePartitionCoalesce = node.find {
+case Repartition(1, false, _) => true
--- End diff --

What if we have multiple repartitions, where one meets the case and the 
others do not? I'm not sure we are restricting repartition operations to occur 
only once.
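
A minimal sketch of the concern, using a hypothetical plan tree in plain Python rather than Catalyst classes: a `find`-style search succeeds as soon as any `Repartition(1, false, _)` exists below the aggregate, even when a different repartition sits directly under it. The `Node` class and its fields are assumptions for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Node:
    """Hypothetical stand-in for a logical plan node."""
    name: str
    num_partitions: Optional[int] = None
    shuffle: bool = False
    children: List["Node"] = field(default_factory=list)

    def find(self, pred: Callable[["Node"], bool]) -> Optional["Node"]:
        """Pre-order search: first node (self included) matching pred."""
        if pred(self):
            return self
        for child in self.children:
            hit = child.find(pred)
            if hit is not None:
                return hit
        return None


def is_coalesce_one(node: Node) -> bool:
    """Mirrors the shape of `Repartition(1, false, _)` in this toy model."""
    return (
        node.name == "Repartition"
        and node.num_partitions == 1
        and not node.shuffle
    )


# Aggregate <- Repartition(4, shuffle) <- Repartition(1, no shuffle) <- source
source = Node("StreamingRelationV2")
coalesce1 = Node("Repartition", 1, False, [source])
repartition4 = Node("Repartition", 4, True, [coalesce1])
aggregate = Node("Aggregate", children=[repartition4])

check_passes = aggregate.find(is_coalesce_one) is not None
direct_input_partitions = aggregate.children[0].num_partitions
```

In this toy plan the search finds the single-partition coalesce, yet the aggregate's direct input has 4 partitions, which is the situation the question above is asking about.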


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196999896
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
 numPartitions: Int,
 queueSize: Int = 1024,
 numShuffleWriters: Int = 1,
-epochIntervalMs: Long = 1000)
+epochIntervalMs: Long = 1000,
+val endpointNames: Seq[String] = 
Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
   extends RDD[UnsafeRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
 (0 until numPartitions).map { partIndex =>
-  ContinuousShuffleReadPartition(partIndex, queueSize, 
numShuffleWriters, epochIntervalMs)
+  ContinuousShuffleReadPartition(
+partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, 
epochIntervalMs)
--- End diff --

This effectively asserts numPartitions to be 1; otherwise it will throw an 
exception.
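
The following plain-Python sketch (not the Scala code under review) shows why: a default list of endpoint names with a single element is indexed by partition index, so any partition count above 1 fails. The function names are hypothetical, and the second function is only one possible way to make the assumption explicit.

```python
import uuid
from typing import List, Optional, Tuple


def get_partitions(
    num_partitions: int, endpoint_names: Optional[List[str]] = None
) -> List[Tuple[int, str]]:
    """Pairs each partition index with an endpoint name, as the diff does."""
    if endpoint_names is None:
        # Default mirrors the diff: exactly one generated endpoint name.
        endpoint_names = [f"RPCContinuousShuffleReader-{uuid.uuid4()}"]
    return [(i, endpoint_names[i]) for i in range(num_partitions)]


def get_partitions_checked(
    num_partitions: int, endpoint_names: List[str]
) -> List[Tuple[int, str]]:
    """Variant that states the requirement up front instead of failing late."""
    if len(endpoint_names) != num_partitions:
        raise ValueError(
            f"expected {num_partitions} endpoint names, "
            f"got {len(endpoint_names)}"
        )
    return list(enumerate(endpoint_names))


one = get_partitions(1)  # works: one partition, one default name

try:
    get_partitions(2)  # index 1 does not exist in the default list
    failed = False
except IndexError:
    failed = True
```

With the explicit check, a mismatch is reported with a message naming both sizes rather than surfacing as an index error inside partition construction.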


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196999687
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
 numPartitions: Int,
 queueSize: Int = 1024,
 numShuffleWriters: Int = 1,
-epochIntervalMs: Long = 1000)
+epochIntervalMs: Long = 1000,
+val endpointNames: Seq[String] = 
Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
--- End diff --

Same here: if possible, it might be better to have complete code rather than 
code that only works under such an assumption.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196999745
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the 
threads processing
+  // partitions of the write-side RDD have been started. These will run 
indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read 
side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of 
`prev` into a local
+ * continuous shuffle, and then reads them in the task thread using 
`reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = 
Array(ContinuousCoalesceRDDPartition(0))
--- End diff --

We are addressing only the specific case where the number of partitions is 
1, but we could add an assertion for that and try to write complete code so 
that we don't have to modify it again.


---




[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
adding cc. to @zsxwing since he has been reviewing PRs for SS so far.


---




[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
adding cc. to @zsxwing since he has been reviewing PRs for SS so far.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
adding cc. to @zsxwing since he has been reviewing PRs for SS so far.


---




[GitHub] spark issue #21595: [MINOR][SQL] Remove invalid comment from SparkStrategies

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21595
  
@HyukjinKwon @hvanhovell Thanks for reviewing and merging!


---




[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-06-19 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
I just provided a new patch to remove the comment, as it looks like it is no 
longer the preferred option.
https://github.com/apache/spark/pull/21595

Closing this one.


---




[GitHub] spark pull request #21388: [SPARK-24336][SQL] Support 'pass through' transfo...

2018-06-19 Thread HeartSaVioR
Github user HeartSaVioR closed the pull request at:

https://github.com/apache/spark/pull/21388


---




[GitHub] spark pull request #21595: [MINOR][SQL] Remove invalid comment from SparkStr...

2018-06-19 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21595

[MINOR][SQL] Remove invalid comment from SparkStrategies

## What changes were proposed in this pull request?

This patch removes an invalid comment from SparkStrategies, given that the 
TODO-like comment is no longer the preferred approach according to this comment: 
https://github.com/apache/spark/pull/21388#issuecomment-396856235

Removing the invalid comment will prevent contributors from spending their 
time on work which is not going to be merged.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark 
MINOR-remove-invalid-comment-on-spark-strategies

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21595.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21595


commit 8afb36b20aab1bbd1f6a5cf902aef7e0c04c8353
Author: Jungtaek Lim 
Date:   2018-06-20T01:48:17Z

[MINOR][SQL] Remove invalid comment from SparkStrategies

* The option is no longer preferred one as below comment
  * https://github.com/apache/spark/pull/21388#issuecomment-396856235
* Removing this to prevent contributors to waste their times




---




[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-06-19 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
retest this, please


---




[GitHub] spark pull request #21477: [SPARK-24396] [SS] [PYSPARK] Add Structured Strea...

2018-06-14 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r195632189
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +843,170 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+@since(2.4)
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserialized copy of the provided object. 
Hence, it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) is done after the 
`open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> def print_row(row):
+... print(row)
+...
+>>> writer = sdf.writeStream.foreach(print_row)
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partit

[GitHub] spark pull request #21477: [SPARK-24396] [SS] [PYSPARK] Add Structured Strea...

2018-06-14 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r195625921
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +843,170 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+@since(2.4)
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserialized copy of the provided object. 
Hence, it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) is done after the 
`open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> def print_row(row):
+... print(row)
+...
+>>> writer = sdf.writeStream.foreach(print_row)
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partit

[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-06-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
@hvanhovell 
To be honest, I found the rationale for the issue in a comment in the 
Spark code: 


https://github.com/apache/spark/blob/4c388bccf1bcac8f833fd9214096dd164c3ea065/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L496-L497

and I thought the comment made sense: it would be beneficial if we just 
coupled matching pairs of (LogicalPlan, SparkPlan) for the cases which don't 
require any extra work while transforming. 

At first, I tried my best to stick with compile-time things, but after a 
couple of hours I realized it is not possible to achieve without runtime 
reflection (at least for me). So another couple of hours were spent on 
resolving that.

I have no strong opinion on adopting reflection in the planner (so I'm happy 
to see the approach got rejected), but if we agree it cannot be handled without 
reflection, the original comment should be removed, or it should describe the 
limitations on addressing it so that others might try to work around those 
limitations.


---




[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
retest this, please


---




[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r194613720
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { 
self: SparkPlan =>
 val storeMetrics = store.metrics
 longMetric("numTotalStateRows") += storeMetrics.numKeys
 longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-storeMetrics.customMetrics.foreach { case (metric, value) =>
-  longMetric(metric.name) += value
+storeMetrics.customMetrics.foreach {
+  case (metric: StateStoreCustomAverageMetric, value) =>
+longMetric(metric.name).set(value * 1.0d)
--- End diff --

It would be better to think about the actual benefit of exposing the value, 
rather than how to expose the value somewhere. If we define it as a count and 
aggregate by summation, the aggregated value will be `(partition count * 
versions)`, whose meaning might be hard for end users to interpret.

I'm afraid that exposing this to StreamingQuery as an average is not trivial, 
especially since SQLMetric is defined as `AccumulatorV2[Long, Long]`, so only a 
single Long value can be passed. Under that restriction, we couldn't define a 
`merge` operation for an `average metric`.
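
A small plain-Python sketch of the merge problem (not Spark's accumulator API): summing a per-partition value gives `partition count * versions`, while a mergeable average has to carry both a running total and a count. The class name, the partition count, and the per-partition value are assumptions for illustration.

```python
class AverageAccumulator:
    """Hypothetical accumulator that keeps (total, count) so it can merge."""

    def __init__(self) -> None:
        self.total = 0
        self.count = 0

    def add(self, value: int) -> None:
        self.total += value
        self.count += 1

    def merge(self, other: "AverageAccumulator") -> None:
        # Merging needs both fields; a single number is not enough.
        self.total += other.total
        self.count += other.count

    @property
    def value(self) -> float:
        return self.total / self.count if self.count else 0.0


# Assumed: 4 partitions, each reporting 3 versions.
versions_per_partition = [3, 3, 3, 3]

# Aggregating by summation yields partition count * versions.
summed = sum(versions_per_partition)  # 12

# Merging per-partition accumulators keeps the average meaningful.
merged = AverageAccumulator()
for versions in versions_per_partition:
    acc = AverageAccumulator()
    acc.add(versions)
    merged.merge(acc)
```

Here `summed` is 12 while `merged.value` is 3.0, and the second result depends on carrying two numbers through `merge`, which a single Long cannot hold.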


---




[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
Kindly ping again to @tdas 

And cc. to @jose-torres @jerryshao @HyukjinKwon @arunmahadevan for 
reviewing.


---




[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
Kindly ping again to @tdas





[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Kindly ping again to @tdas 





[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
Kindly ping again to @tdas 





[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21506
  
Kindly ping again to @tdas 





[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r194585044
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { 
self: SparkPlan =>
 val storeMetrics = store.metrics
 longMetric("numTotalStateRows") += storeMetrics.numKeys
 longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-storeMetrics.customMetrics.foreach { case (metric, value) =>
-  longMetric(metric.name) += value
+storeMetrics.customMetrics.foreach {
+  case (metric: StateStoreCustomAverageMetric, value) =>
+longMetric(metric.name).set(value * 1.0d)
--- End diff --

If my understanding is right, the metric object (the return value of 
`longMetric()`) is different for each task, so the object will be different for 
each batch and each task. (TaskMetric is serialized and deserialized, so it 
can't be shared between tasks.)

And actually the metric values are not aggregated into an SQLMetric object. 
The values are just aggregated and represented in SQLAppStatusListener.


https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala#L162-L174


https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala#L147-L160

https://user-images.githubusercontent.com/1317309/41263432-024efe4a-6e22-11e8-92f9-24d1f73776a9.png






[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
I would just like to see the benefit of unloading the version of state that 
is expected to be read by the next batch. I totally agree that the current 
caching mechanism is excessive, but we can still avoid reloading in every 
batch. Are you considering a case with multiple stages, where the executor is 
encouraged to free as much memory as it can despite redundantly reloading state?





[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r194563959
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -247,6 +253,14 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
   private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
   private lazy val sparkConf = 
Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
 
+  private lazy val metricProviderLoaderMapSizeBytes: 
StateStoreCustomSizeMetric =
+StateStoreCustomSizeMetric("providerLoadedMapSizeBytes",
+  "estimated size of states cache in provider")
+
+  private lazy val metricProviderLoaderCountOfVersionsInMap: 
StateStoreCustomAverageMetric =
--- End diff --

This should be an average metric so that the SQL metrics UI shows 
`min, med, max`, as in the UI capture I pasted before. Summing them all doesn't 
give a meaningful value.
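
A rough Python sketch of the difference, using made-up per-task values (the 
helper name `summarize` is hypothetical and this is not how the SQL metrics UI 
computes its summary):

```python
from statistics import median


def summarize(values):
    """Return (min, med, max) for a non-empty list of per-task values."""
    ordered = sorted(values)
    return (ordered[0], median(ordered), ordered[-1])


# Hypothetical count of cached state versions reported by five tasks.
versions_per_task = [1, 2, 2, 2, 3]

total = sum(versions_per_task)
summary = summarize(versions_per_task)

print(total)    # depends on how many tasks reported
print(summary)  # describes what a typical task holds
```

The sum changes whenever the number of partitions changes, while the 
(min, med, max) triple stays readable as "versions held per provider".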





[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
After enabling the option, I observed a small, expected latency when 
starting each batch for each partition. The median/average was 4~50 ms in my 
case, but the max latency was a bit higher than 700 ms.

Please note that the state size in my experiment is not very large, so if a 
partition has much bigger state the latency could be much higher: 

```
memory used by state total (min, med, max): 812.6 KB (2.1 KB, 4.1 KB, 4.1 KB)
time to commit changes total (min, med, max): 13.5 s (21 ms, 35 ms, 449 ms)
total time to remove rows total (min, med, max): 22 ms (22 ms, 22 ms, 22 ms)
number of updated state rows: 5,692
total time to update rows total (min, med, max): 1.4 s (3 ms, 5 ms, 42 ms)
```

As I explained earlier, loading the last version from files brings 
avoidable latency.





[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-11 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21506
  
retest this please





[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194295068
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
 if (loadedCurrentVersionMap.isDefined) {
   return loadedCurrentVersionMap.get
 }
-val snapshotCurrentVersionMap = readSnapshotFile(version)
-if (snapshotCurrentVersionMap.isDefined) {
-  synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
-  return snapshotCurrentVersionMap.get
-}
 
-// Find the most recent map before this version that we can.
-// [SPARK-22305] This must be done iteratively to avoid stack overflow.
-var lastAvailableVersion = version
-var lastAvailableMap: Option[MapType] = None
-while (lastAvailableMap.isEmpty) {
-  lastAvailableVersion -= 1
+logWarning(s"The state for version $version doesn't exist in 
loadedMaps. " +
+  "Reading snapshot file and delta files if needed..." +
+  "Note that this is normal for the first batch of starting query.")
 
-  if (lastAvailableVersion <= 0) {
-// Use an empty map for versions 0 or less.
-lastAvailableMap = Some(new MapType)
-  } else {
-lastAvailableMap =
-  synchronized { loadedMaps.get(lastAvailableVersion) }
-.orElse(readSnapshotFile(lastAvailableVersion))
+val (result, elapsedMs) = Utils.timeTakenMs {
+  val snapshotCurrentVersionMap = readSnapshotFile(version)
+  if (snapshotCurrentVersionMap.isDefined) {
+synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
+return snapshotCurrentVersionMap.get
+  }
+
+  // Find the most recent map before this version that we can.
+  // [SPARK-22305] This must be done iteratively to avoid stack 
overflow.
+  var lastAvailableVersion = version
+  var lastAvailableMap: Option[MapType] = None
+  while (lastAvailableMap.isEmpty) {
+lastAvailableVersion -= 1
+
+if (lastAvailableVersion <= 0) {
+  // Use an empty map for versions 0 or less.
+  lastAvailableMap = Some(new MapType)
+} else {
+  lastAvailableMap =
+synchronized { loadedMaps.get(lastAvailableVersion) }
+  .orElse(readSnapshotFile(lastAvailableVersion))
+}
+  }
+
+  // Load all the deltas from the version after the last available one 
up to the target version.
+  // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
+  val resultMap = new MapType(lastAvailableMap.get)
+  for (deltaVersion <- lastAvailableVersion + 1 to version) {
+updateFromDeltaFile(deltaVersion, resultMap)
   }
-}
 
-// Load all the deltas from the version after the last available one 
up to the target version.
-// The last available version is the one with a full snapshot, so it 
doesn't need deltas.
-val resultMap = new MapType(lastAvailableMap.get)
-for (deltaVersion <- lastAvailableVersion + 1 to version) {
-  updateFromDeltaFile(deltaVersion, resultMap)
+  synchronized { loadedMaps.put(version, resultMap) }
+  resultMap
 }
 
-synchronized { loadedMaps.put(version, resultMap) }
-resultMap
+logWarning(s"Loading state for $version takes $elapsedMs ms.")
--- End diff --

Changed log level to DEBUG.





[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194293481
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
 if (loadedCurrentVersionMap.isDefined) {
   return loadedCurrentVersionMap.get
 }
-val snapshotCurrentVersionMap = readSnapshotFile(version)
-if (snapshotCurrentVersionMap.isDefined) {
-  synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
-  return snapshotCurrentVersionMap.get
-}
 
-// Find the most recent map before this version that we can.
-// [SPARK-22305] This must be done iteratively to avoid stack overflow.
-var lastAvailableVersion = version
-var lastAvailableMap: Option[MapType] = None
-while (lastAvailableMap.isEmpty) {
-  lastAvailableVersion -= 1
+logWarning(s"The state for version $version doesn't exist in 
loadedMaps. " +
+  "Reading snapshot file and delta files if needed..." +
+  "Note that this is normal for the first batch of starting query.")
 
-  if (lastAvailableVersion <= 0) {
-// Use an empty map for versions 0 or less.
-lastAvailableMap = Some(new MapType)
-  } else {
-lastAvailableMap =
-  synchronized { loadedMaps.get(lastAvailableVersion) }
-.orElse(readSnapshotFile(lastAvailableVersion))
+val (result, elapsedMs) = Utils.timeTakenMs {
+  val snapshotCurrentVersionMap = readSnapshotFile(version)
+  if (snapshotCurrentVersionMap.isDefined) {
+synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
+return snapshotCurrentVersionMap.get
+  }
+
+  // Find the most recent map before this version that we can.
+  // [SPARK-22305] This must be done iteratively to avoid stack 
overflow.
+  var lastAvailableVersion = version
+  var lastAvailableMap: Option[MapType] = None
+  while (lastAvailableMap.isEmpty) {
+lastAvailableVersion -= 1
+
+if (lastAvailableVersion <= 0) {
+  // Use an empty map for versions 0 or less.
+  lastAvailableMap = Some(new MapType)
+} else {
+  lastAvailableMap =
+synchronized { loadedMaps.get(lastAvailableVersion) }
+  .orElse(readSnapshotFile(lastAvailableVersion))
+}
+  }
+
+  // Load all the deltas from the version after the last available one 
up to the target version.
+  // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
+  val resultMap = new MapType(lastAvailableMap.get)
+  for (deltaVersion <- lastAvailableVersion + 1 to version) {
+updateFromDeltaFile(deltaVersion, resultMap)
   }
-}
 
-// Load all the deltas from the version after the last available one 
up to the target version.
-// The last available version is the one with a full snapshot, so it 
doesn't need deltas.
-val resultMap = new MapType(lastAvailableMap.get)
-for (deltaVersion <- lastAvailableVersion + 1 to version) {
-  updateFromDeltaFile(deltaVersion, resultMap)
+  synchronized { loadedMaps.put(version, resultMap) }
+  resultMap
 }
 
-synchronized { loadedMaps.put(version, resultMap) }
-resultMap
+logWarning(s"Loading state for $version takes $elapsedMs ms.")
--- End diff --

I was just thinking of pairing this with the warning message above, but 
since we are guiding end users to turn on the DEBUG level to see information 
about additional latencies, changing this to DEBUG would also be OK.





[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194293251
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
 if (loadedCurrentVersionMap.isDefined) {
   return loadedCurrentVersionMap.get
 }
-val snapshotCurrentVersionMap = readSnapshotFile(version)
-if (snapshotCurrentVersionMap.isDefined) {
-  synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
-  return snapshotCurrentVersionMap.get
-}
 
-// Find the most recent map before this version that we can.
-// [SPARK-22305] This must be done iteratively to avoid stack overflow.
-var lastAvailableVersion = version
-var lastAvailableMap: Option[MapType] = None
-while (lastAvailableMap.isEmpty) {
-  lastAvailableVersion -= 1
+logWarning(s"The state for version $version doesn't exist in 
loadedMaps. " +
+  "Reading snapshot file and delta files if needed..." +
+  "Note that this is normal for the first batch of starting query.")
 
-  if (lastAvailableVersion <= 0) {
-// Use an empty map for versions 0 or less.
-lastAvailableMap = Some(new MapType)
-  } else {
-lastAvailableMap =
-  synchronized { loadedMaps.get(lastAvailableVersion) }
-.orElse(readSnapshotFile(lastAvailableVersion))
+val (result, elapsedMs) = Utils.timeTakenMs {
--- End diff --

Yup, right. Most of the code change is just wrapping the existing code in 
timeTakenMs.
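
For readers unfamiliar with the pattern, a minimal Python sketch of such a 
timing wrapper follows. It only mirrors the shape of the call in the diff 
(`val (result, elapsedMs) = Utils.timeTakenMs { ... }`); the names 
`time_taken_ms` and `load_state` are hypothetical and this is not Spark's 
implementation.

```python
import time


def time_taken_ms(body):
    """Run body() and return (result, elapsed time in whole milliseconds)."""
    start = time.monotonic()
    result = body()
    elapsed_ms = int((time.monotonic() - start) * 1000)
    return result, elapsed_ms


def load_state():
    # Hypothetical stand-in for the code being wrapped.
    return {"key": "value"}


result, elapsed_ms = time_taken_ms(load_state)
print(f"Loading state takes {elapsed_ms} ms.")
```

The wrapped code keeps its return value, so the caller changes only by 
unpacking one extra element.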





[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21506
  
cc. @tdas @jose-torres @jerryshao @arunmahadevan @HyukjinKwon 





[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@jose-torres No problem. I expected the Spark community to be less active 
during Spark Summit. Addressed the comment regarding renaming.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
When starting a batch, the latest version of state is read to start a new 
version of state. If the state has to be restored from a snapshot as well as 
delta files, restoring will incur huge latency.

#21506 logs messages when loading state requires dealing with the (remote) 
filesystem. That's why I suggest merging my patch and running your case again.
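
A simplified Python model of that restore path is sketched below, following 
the structure of the `loadMap` diff quoted in the #21506 review. Plain dicts 
stand in for snapshot and delta files, and a `None` value in a delta is 
assumed to mean "key removed"; both are illustrative assumptions rather than 
the real file format.

```python
def load_map(version, loaded_maps, snapshots, deltas):
    """Return the state at `version`, touching 'files' only on a cache miss."""
    if version in loaded_maps:
        return loaded_maps[version]  # cache hit: no file access at all

    if version in snapshots:
        loaded_maps[version] = dict(snapshots[version])
        return loaded_maps[version]

    # Walk back iteratively to the most recent cached map or snapshot.
    last_version = version
    last_map = None
    while last_map is None:
        last_version -= 1
        if last_version <= 0:
            last_map = {}  # versions 0 or less start from an empty map
        elif last_version in loaded_maps:
            last_map = loaded_maps[last_version]
        elif last_version in snapshots:
            last_map = snapshots[last_version]

    # Replay every delta after the base version, up to the target version.
    result = dict(last_map)
    for delta_version in range(last_version + 1, version + 1):
        for key, value in deltas[delta_version].items():
            if value is None:
                result.pop(key, None)
            else:
                result[key] = value

    loaded_maps[version] = result
    return result


snapshots = {2: {"a": 1, "b": 2}}
deltas = {3: {"a": 10}, 4: {"b": None, "c": 3}}
cache = {}
state = load_map(4, cache, snapshots, deltas)
print(state)
```

In this model, loading version 4 with an empty cache reads one snapshot and 
two deltas, whereas a second call for the same version is served from `cache`.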





[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21504
  
Test failures were from kafka.

retest this, please





[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
retest this please





[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-08 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21504
  
retest this, please





[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193945288
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach 
{ classNames =>
+  Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+sparkSession.sparkContext.conf).foreach(listener => {
+addListener(listener)
+logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

Either debug or info is fine with me, since it adds just a couple of log 
lines, and only once.





[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
One thing you may want to be aware of is that, from the executor's point of 
view, the executor must load at least 1 version of state in memory regardless 
of how many versions are cached. I guess you may get a better result if you 
unload the entire cache except the last version you just committed. A cache 
miss will then occur for one of the three cases, `2. committed but batch failed 
afterwards`, but that will happen rarely and is still better than cache misses 
in two of the three cases (2 and 3).
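
A minimal Python sketch of that suggestion, with a hypothetical `VersionCache` 
class standing in for the provider's `loadedMaps` (an illustration of the 
idea, not a patch against HDFSBackedStateStoreProvider):

```python
class VersionCache:
    """Toy cache that keeps only the most recently committed version."""

    def __init__(self):
        self.loaded_maps = {}

    def commit(self, version, state):
        # Drop every older version and keep only the one just committed.
        self.loaded_maps = {version: state}

    def get(self, version):
        # Returns None on a cache miss, meaning files would have to be read.
        return self.loaded_maps.get(version)


cache = VersionCache()
cache.commit(4, {"a": 1})
cache.commit(5, {"a": 2})

next_batch = cache.get(5)  # the next batch starts from the last commit
rerun = cache.get(4)       # a rerun that needs the older version

print(next_batch is not None)
print(rerun is None)
```

The normal path (next batch reads the version just committed) stays a cache 
hit, and only a rerun that needs the version before it misses, which appears 
to correspond to case 2 above.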





[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
You can also merge #21506 (maybe changing the log level, or modifying the 
patch to log the messages at INFO level) and see the latencies of loading 
state, snapshotting, and cleaning up.





[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this please





[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this, please





[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21506
  
There are plenty of other debug messages which might hide the log messages 
added by this patch. Would we want to log them at INFO instead of DEBUG?





[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-07 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21506

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in 
HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch measures and logs the elapsed time for each operation which 
communicates with the file system (mostly remote HDFS in production) in 
HDFSBackedStateStoreProvider, to help investigate any latency issues.

## How was this patch tested?

Manually tested.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24485

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21506


commit d84f98fc978262f4165f78b3b223b8bb3151f735
Author: Jungtaek Lim 
Date:   2018-06-07T14:14:46Z

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in 
HDFSBackedStateStoreProvider







[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193740695
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (

[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
Retaining versions of state is also relevant to snapshotting the last 
version to files: HDFSBackedStateStoreProvider doesn't snapshot if the version 
doesn't exist in loadedMaps. So we may want to check whether this option also 
works with the current approach to snapshotting.





[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@TomaszGaweda @aalobaidi 
Please correct me if I'm missing something here.

At the start of every batch, the state store loads the previous version of 
state so that it can be read and written. If we unload all the versions "after 
committing", the cache will no longer contain the previous version of state and 
the store will have to load the state by reading files, adding huge latency 
when starting a batch. That's why I described the three cases before: to avoid 
loading state from files when starting a new batch.

Please apply #21469 manually and see how much memory 
HDFSBackedStateStoreProvider consumes due to storing multiple versions (it will 
show the state size of the latest version as well as the overall state size in 
the cache). Please also measure and provide latency numbers to show how much it 
is now and how much it will be after the patch. We always have to ask ourselves 
whether we are addressing the issue correctly.





[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r193622940
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 ---
@@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest 
with BeforeAndAfter {
   test("event ordering") {
 val listener = new EventCollector
 withListenerAdded(listener) {
-  for (i <- 1 to 100) {
+  for (i <- 1 to 50) {
--- End diff --

After the patch this test started failing: it just means that more time is 
needed to run this loop 100 times, not that the logic is broken. Decreasing the 
number works for me.





[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@arunmahadevan 
Added custom metrics in the state store to the streaming query status as 
well. You can see that `providerLoadedMapSize` is added to 
`stateOperators.customMetrics` in the output below.

I had to exclude `providerLoadedMapCountOfVersions` from the list, since the 
average metric is implemented in a somewhat tricky way and doesn't look easy to 
aggregate for the streaming query status. 
We may want to reimplement SQLMetric and its subclasses to make sure everything 
works correctly without any tricky approach, but that doesn't look trivial to 
address and I think it is out of scope for this PR.

```
18/06/06 22:51:23 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "7564a0b7-e3b2-4d53-b246-b774ab04e586",
  "runId" : "8dd34784-080c-4f86-afaf-ac089902252d",
  "name" : null,
  "timestamp" : "2018-06-06T13:51:15.467Z",
  "batchId" : 4,
  "numInputRows" : 547,
  "inputRowsPerSecond" : 67.15776550030694,
  "processedRowsPerSecond" : 65.94333936106088,
  "durationMs" : {
    "addBatch" : 7944,
    "getBatch" : 1,
    "getEndOffset" : 0,
    "queryPlanning" : 61,
    "setOffsetRange" : 5,
    "triggerExecution" : 8295,
    "walCommit" : 158
  },
  "eventTime" : {
    "avg" : "2018-06-06T13:51:10.313Z",
    "max" : "2018-06-06T13:51:14.250Z",
    "min" : "2018-06-06T13:51:07.098Z",
    "watermark" : "2018-06-06T13:50:36.676Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 20,
    "numRowsUpdated" : 16,
    "memoryUsedBytes" : 26679,
    "customMetrics" : {
      "providerLoadedMapSize" : 181911
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[apachelogs-v2]]",
    "startOffset" : {
      "apachelogs-v2" : {
        "2" : 489056,
        "4" : 489053,
        "1" : 489055,
        "3" : 489051,
        "0" : 489053
      }
    },
    "endOffset" : {
      "apachelogs-v2" : {
        "2" : 489056,
        "4" : 489053,
        "1" : 489055,
        "3" : 489051,
        "0" : 489053
      }
    },
    "numInputRows" : 547,
    "inputRowsPerSecond" : 67.15776550030694,
    "processedRowsPerSecond" : 65.94333936106088
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@60999714"
  }
}
```
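
A minimal sketch of reading the custom metric back out of a progress report, assuming the report has the JSON shape printed above. The helper name `custom_metric_total` is hypothetical, and the sample is trimmed to the fields used here. In PySpark the dict would typically come from `query.lastProgress` rather than from a string.

```python
import json

# Trimmed copy of the progress report above; only the fields used here are kept.
progress_json = """
{
  "batchId" : 4,
  "stateOperators" : [ {
    "numRowsTotal" : 20,
    "numRowsUpdated" : 16,
    "memoryUsedBytes" : 26679,
    "customMetrics" : {
      "providerLoadedMapSize" : 181911
    }
  } ]
}
"""


def custom_metric_total(progress, name):
    """Sum one custom metric over all state operators (hypothetical helper)."""
    return sum(
        op.get("customMetrics", {}).get(name, 0)
        for op in progress.get("stateOperators", [])
    )


progress = json.loads(progress_json)
total = custom_metric_total(progress, "providerLoadedMapSize")
print(total)
```

Summing works for a size metric like this one. Combining averages the same way would additionally need the counts behind them, so a plain sum would not be meaningful for an average metric.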


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193374662
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("verify ServerThread only accepts the first connection") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
--- End diff --

Thanks for guiding, addressed.


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193372564
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("verify ServerThread only accepts the first connection") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
--- End diff --

Yeah actually I blindly copied the code line in the file. Agreed it would 
be better to use the key.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193304316
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -20,10 +20,48 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.InterfaceStability
 
 /**
- * A class to consume data generated by a `StreamingQuery`. Typically this 
is used to send the
- * generated data to external systems. Each partition will use a new 
deserialized instance, so you
- * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
- * in the `open` method.
+ * The abstract class for writing custom logic to process data generated 
by a query.
+ * This is often used to write the output of a streaming query to 
arbitrary storage systems.
--- End diff --

Ah yes, my bad. I mistook this for Python.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193285667
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193286066
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193286932
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
--- End diff --

It might be more convenient to use a `with` statement here, and to rename 
`file` to `f`, `fw`, or similar. Please ignore this if there's a specific 
reason not to use a `with` statement.
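
A minimal sketch of what the suggestion could look like, written as a standalone function rather than the `_write_event` method from the diff. The `uuid4` file name is an assumption made here to avoid name collisions; the code under review picks the name with `random.randint` instead.

```python
import os
import tempfile
import uuid


def write_event(directory, event):
    """Write one event as a line of text to a new file in `directory`."""
    # Assumption: a uuid4-based name, not the random.randint name from the diff.
    path = os.path.join(directory, uuid.uuid4().hex)
    # The file is closed when the block exits, even if write() raises.
    with open(path, "w") as f:
        f.write("%s\n" % str(event))
    return path


events_dir = tempfile.mkdtemp()
written = write_event(events_dir, {"value": "text"})
```

The `with` block replaces the explicit `close()` call, so the handle is released on both the normal and the error path.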


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193284839
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
--- End diff --

> any initialization for writing data (e.g. opening a connection or 
starting a transaction) be done open after the `open(...)` method has been 
called

`be done open` reads a bit oddly. It would be better to polish the 
sentence.
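
To make the intent of the quoted sentence concrete, here is a minimal sketch of a writer object that does its setup in `open` rather than in the constructor. It runs without Spark: `FakeConnection` and `run_partition` are hypothetical stand-ins, and `run_partition` only imitates the open/process/close order described in the docstring under review.

```python
class FakeConnection:
    """Hypothetical stand-in for a non-serializable resource, e.g. a socket."""

    def __init__(self):
        self.rows = []
        self.closed = False

    def send(self, row):
        self.rows.append(row)

    def close(self):
        self.closed = True


class ConnectionWriter:
    """Writer object with the open/process/close methods from the docstring."""

    def __init__(self):
        # Keep the constructor cheap: the object is serialized and shipped to
        # tasks, so no connection is created here.
        self.conn = None

    def open(self, partition_id, epoch_id):
        # Per-task initialization happens here, after deserialization.
        self.conn = FakeConnection()
        return True

    def process(self, row):
        self.conn.send(row)

    def close(self, error):
        self.conn.close()


def run_partition(writer, partition_id, epoch_id, rows):
    """Imitate the documented call order for one partition and epoch."""
    error = None
    opened = writer.open(partition_id, epoch_id)
    try:
        if opened:
            for row in rows:
                writer.process(row)
    except Exception as e:  # remember the failure and hand it to close()
        error = e
    writer.close(error)
    return error


writer = ConnectionWriter()
result = run_partition(writer, partition_id=0, epoch_id=0, rows=["a", "b"])
```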


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193289099
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
+file.write("%s\n" % str(event))
+file.close()
+
+def stop_all(self):
+for q in self.spark._wrapped.streams.active:
+q.stop()
+
+def __getstate__(self):
+return (self.open_events_dir, self.process_events_dir, 
self.close_events_dir)
+
+def __setstate__(self, state):
+self.open_events_dir, self.process_events_dir, 
self.close_events_dir = state
+
+def test_streaming_foreach_with_simple_function(self):
+tester = self.ForeachWriterTester(self.spark)
+
+def foreach_func(row):
+tester.write_process_event(row)
+
+tester.run_streaming_query_on_writer(foreach_func, 2)
+self.assertEqual(len(tester.process_events()), 2)
+
+def test_streaming_foreach_with_basic_open_process_close(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return True
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+tester.write_close_event(error)
+
+tester.run_streaming_query_on_writer(ForeachWriter(), 2)
+
+open_events = tester.open_events()
+self.assertEqual(len(open_events), 2)
+self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1})
+
+self.assertEqual(len(tester.process_events()), 2)
+
+close_events = tester.close_events()
+self.assertEqual(len(close_events), 2)
+self.assertSetEqual(set([e['error'] for e in close_events]), 
{'None'})
+
+def test_streaming_foreach_with_open_returning_false(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return False
+
+def process(self, row):
+tester.write_process_event(row)
+
+d

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193284293
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
--- End diff --

nit: missing space in `serialized-deserializedcopy`; it should read `deserialized copy`.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193289567
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -20,10 +20,48 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.InterfaceStability
 
 /**
- * A class to consume data generated by a `StreamingQuery`. Typically this 
is used to send the
- * generated data to external systems. Each partition will use a new 
deserialized instance, so you
- * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
- * in the `open` method.
+ * The abstract class for writing custom logic to process data generated 
by a query.
+ * This is often used to write the output of a streaming query to 
arbitrary storage systems.
--- End diff --

It looks like the doc is duplicated between `foreach()` and `ForeachWriter`. 
I'm not sure how we can leave a reference in the Python doc instead of 
duplicating the content, but even if the Python doc doesn't support that kind 
of reference, some parts of the content seem fine to place in one of the two 
places rather than both.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193291809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{NextIterator, Utils}
+
+class PythonForeachWriter(func: PythonFunction, schema: StructType)
+  extends ForeachWriter[UnsafeRow] {
+
+  private lazy val context = TaskContext.get()
+  private lazy val buffer = new PythonForeachWriter.UnsafeRowBuffer(
+context.taskMemoryManager, new 
File(Utils.getLocalDir(SparkEnv.get.conf)), schema.fields.length)
+  private lazy val inputRowIterator = buffer.iterator
+
+  private lazy val inputByteIterator = {
+EvaluatePython.registerPicklers()
+val objIterator = inputRowIterator.map { row => 
EvaluatePython.toJava(row, schema) }
+new SerDeUtil.AutoBatchedPickler(objIterator)
+  }
+
+  private lazy val pythonRunner = {
+val conf = SparkEnv.get.conf
+val bufferSize = conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+PythonRunner(func, bufferSize, reuseWorker)
+  }
+
+  private lazy val outputIterator =
+pythonRunner.compute(inputByteIterator, context.partitionId(), context)
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+outputIterator  // initialize everything
+TaskContext.get.addTaskCompletionListener { _ => buffer.close() }
+true
+  }
+
+  override def process(value: UnsafeRow): Unit = {
+buffer.add(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+buffer.allRowsAdded()
+if (outputIterator.hasNext) outputIterator.next() // to throw python 
exception if there was one
+  }
+}
+
+object PythonForeachWriter {
+
+  /**
+   * A buffer that is designed for the sole purpose of buffering 
UnsafeRows in PythonForeahWriter.
--- End diff --

nit: PythonForeachWriter


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193277616
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -35,10 +34,11 @@ import 
org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
MicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
 
+
--- End diff --

Thanks for letting me know. Addressed.


---




[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
I agree that the current cache approach may consume excessive memory 
unnecessarily, which matches my finding in #21469. 

The issue is not that simple, however: in micro-batch mode each batch has to 
read the previous version of the state, and if that version is not in memory 
it has to be read from the file system, in the worst case seeking and reading 
multiple files in a remote file system. So it is preferable to keep the 
previous version of the state available in memory.

There are three cases here (please add any I'm missing): 1. failure before 
commit, 2. committed but the batch failed afterwards, 3. committed and the 
batch succeeds. It might be better to think through all of the cases.
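
The trade-off described above can be sketched with a small cache that keeps only the most recent N versions in memory and falls back to a loader on a miss. This is only an illustration under stated assumptions: `VersionedStateCache`, `max_versions`, and `load_from_files` are hypothetical names, not part of Spark's `HDFSBackedStateStoreProvider`, and the three failure cases listed above are not modeled.

```python
class VersionedStateCache:
    """Keep at most `max_versions` newest state versions in memory (sketch)."""

    def __init__(self, max_versions, load_from_files):
        if max_versions < 1:
            raise ValueError("max_versions must be at least 1")
        self.max_versions = max_versions
        self.load_from_files = load_from_files  # slow path, e.g. remote files
        self.versions = {}  # version number -> state map

    def put(self, version, state):
        self.versions[version] = state
        # Evict the oldest versions once the bound is exceeded.
        for old in sorted(self.versions)[:-self.max_versions]:
            del self.versions[old]

    def get(self, version):
        if version in self.versions:
            return self.versions[version]
        # Cache miss: reading from files is the expensive case.
        state = self.load_from_files(version)
        self.put(version, state)
        return state


loads = []


def load_from_files(version):
    """Hypothetical loader that records which versions hit the slow path."""
    loads.append(version)
    return {"loaded": version}


cache = VersionedStateCache(max_versions=2, load_from_files=load_from_files)
for v in (1, 2, 3):
    cache.put(v, {"v": v})

previous = cache.get(2)  # still in memory
evicted = cache.get(1)   # no longer in memory, so the loader is called
```

With a bound of 2, a batch that needs the immediately preceding version is served from memory, while older versions pay the file-system cost.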


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
@arunmahadevan 
Yes, before the patch Spark connects to the socket server twice: once to get 
the schema, and once more to read data.

Also, the `-k` flag is only supported in specific distributions, which is why 
I had to set a breakpoint and start nc again after the temp reader stopped.

For example, in my local dev environment (macOS 10.12.6), netcat doesn't 
support the `-k` flag.

```
netcat (The GNU Netcat) 0.7.1
```
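
Where the local `nc` lacks `-k`, a small script can stand in for a listener that survives the first disconnect during manual testing. The sketch below is a substitute suggested here, not something from the PR: `serve_lines` and `read_all` are hypothetical helpers, and unlike `nc` the server sends a fixed list of lines instead of relaying stdin.

```python
import socket
import threading


def serve_lines(server_sock, lines, max_connections):
    """Accept clients one after another, sending `lines` to each of them."""
    for _ in range(max_connections):
        conn, _addr = server_sock.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))


def read_all(port):
    """Connect to the local server and read until it closes the connection."""
    chunks = []
    with socket.create_connection(("127.0.0.1", port)) as client:
        while True:
            data = client.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8")


server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server_sock.listen(1)
port = server_sock.getsockname()[1]

server = threading.Thread(
    target=serve_lines, args=(server_sock, ["hello", "world"], 2))
server.start()

first = read_all(port)   # e.g. the connection used to get the schema
second = read_all(port)  # e.g. the connection used to read data
server.join()
server_sock.close()
```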


---



