[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201477277 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming.state; + +import java.util.Comparator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. + * + * As TreeMap does, this implementation sorts elements in natural order, and cuts off + * smaller elements to retain at most bigger N elements. + * + * You can provide reversed order of comparator to retain smaller elements instead. + * + * This class is not thread-safe, so synchronization would be needed to use this concurrently. 
+ * + * @param key type + * @param value type + */ +public final class BoundedSortedMap extends TreeMap { --- End diff -- I initially handled it in HDFSBackedStateStoreProvider and refactored it out afterwards, because this makes the HDFSBackedStateStoreProvider code clearer (I feel HDFSBackedStateStoreProvider is not well structured, and I have a patch #21357 to refactor it a bit), but I agree with you that this will likely be used only once, in HDFSBackedStateStoreProvider. I'll handle it in HDFSBackedStateStoreProvider. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
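For readers following the Javadoc above, a minimal sketch of the bounded-map idea could look like the following. It is not the code under review: the class name `BoundedTreeMapSketch` is hypothetical, it follows the Javadoc's description (keep the N largest keys, or the N smallest with a reversed comparator), and only `put`/`putAll` enforce the bound.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not the code under review): a TreeMap that retains at most
// `limit` entries, evicting the smallest key per the comparator after each insert.
class BoundedTreeMapSketch<K, V> extends TreeMap<K, V> {
  private final int limit;

  BoundedTreeMapSketch(Comparator<? super K> comparator, int limit) {
    super(comparator);
    if (limit < 0) {
      throw new IllegalArgumentException("limit must be non-negative: " + limit);
    }
    this.limit = limit;
  }

  @Override
  public V put(K key, V value) {
    V previous = super.put(key, value);
    // Drop the smallest keys until the bound holds again (limit 0 retains nothing).
    while (size() > limit) {
      pollFirstEntry();
    }
    return previous;
  }

  @Override
  public void putAll(Map<? extends K, ? extends V> map) {
    // Route every insertion through put() so the bound is enforced per entry.
    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }
}
```

Because `putAll` delegates to `put`, the bound holds however entries arrive through these two methods; other mutators inherited from TreeMap are not covered by this sketch.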
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 retest this, please
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r200934841 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala --- @@ -53,7 +54,30 @@ class StreamingAggregationSuite extends StateStoreMetricsTest import testImplicits._ - test("simple count, update mode") { + val confAndTestNamePostfixMatrix = List( --- End diff -- OK. I'd like to wait for other reviewers' opinions/suggestions on this. Let me keep this as is until then.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r200923851 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala --- @@ -53,7 +54,30 @@ class StreamingAggregationSuite extends StateStoreMetricsTest import testImplicits._ - test("simple count, update mode") { + val confAndTestNamePostfixMatrix = List( --- End diff -- `withSQLConf` seems to be used widely across SQL unit tests and does additional work (SparkSession.setActiveSession), so I'm not sure it would behave technically the same. Moreover, we need to run the same test "multiple times" with different configurations. Could you propose your code, if you don't mind? Thanks in advance!
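To illustrate running the same test "multiple times" under different configurations, here is a plain-Java sketch. `ConfMatrixSketch`, `withConf`, and `runMatrix` are hypothetical helpers: the conf is a simple map rather than Spark's SQLConf, and none of the active-session handling that `withSQLConf` reportedly does is modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a session configuration; this is not Spark's SQLConf.
class ConfMatrixSketch {
  static final Map<String, String> conf = new HashMap<>();

  // Set the given entries, run the body, then restore the previous values,
  // in the spirit of a withSQLConf-style helper (minus any active-session handling).
  static void withConf(Map<String, String> entries, Runnable body) {
    Map<String, String> previous = new HashMap<>();
    for (String key : entries.keySet()) {
      previous.put(key, conf.get(key)); // null records "was unset"
    }
    conf.putAll(entries);
    try {
      body.run();
    } finally {
      for (Map.Entry<String, String> e : previous.entrySet()) {
        if (e.getValue() == null) {
          conf.remove(e.getKey());
        } else {
          conf.put(e.getKey(), e.getValue());
        }
      }
    }
  }

  // Run the same test body once per (conf, test-name postfix) pair.
  static List<String> runMatrix(String baseName, List<Map<String, String>> confs,
                                List<String> postfixes, Consumer<String> body) {
    if (confs.size() != postfixes.size()) {
      throw new IllegalArgumentException("each conf needs exactly one name postfix");
    }
    List<String> executed = new ArrayList<>();
    for (int i = 0; i < confs.size(); i++) {
      String testName = baseName + postfixes.get(i);
      withConf(confs.get(i), () -> body.accept(testName));
      executed.add(testName);
    }
    return executed;
  }
}
```

The restore step runs in `finally`, so a failing test body cannot leak its configuration into the next run of the matrix.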
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21733 [SPARK-24763][SS] Remove redundant key data from value in streaming aggregation
* add option to configure enabling new feature: remove redundant key data from value
* modify code to respect new option (turning on/off feature)
* modify tests to run tests with both on/off
* Add guard in OffsetSeqMetadata to prevent modifying option after executing query
## What changes were proposed in this pull request?
This patch proposes a new flag option for stateful aggregation: remove redundant key data from value. With the new option enabled, the query runs similarly to the current behavior but uses less memory for state, depending on the key/value fields of the state operator. Please refer to the link below for detailed perf. test results: https://issues.apache.org/jira/browse/SPARK-24763?focusedCommentId=16536539=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16536539
Since the state format with the option enabled is not compatible with the format with it disabled, the option is disabled by default (to ensure backward compatibility), and OffsetSeqMetadata prevents modifying the option after the query has been executed.
## How was this patch tested?
Modified unit tests to cover both disabling and enabling the option. Also ran manual tests to see whether the proposed patch improves state memory usage.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24763 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21733.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21733 commit 2a9cc496bb7f832b75b0090ef9a612f4fbc0f206 Author: Jungtaek Lim Date: 2018-07-08T09:37:12Z [SPARK-24763][SS] Remove redundant key data from value in streaming aggregation * add option to configure enabling new feature: remove redundant key data from value * modify code to respect new option (turning on/off feature) * modify tests to run tests with both on/off * Add guard in OffsetSeqMetadata to prevent modifying option after executing query
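As a rough illustration of the idea behind removing redundant key data: the state value keeps only the non-key columns, and the full row is rebuilt by combining the stored key with that value on read. A minimal sketch, assuming rows are plain `Object[]` arrays and key columns are identified by index (`KeyValueSplitSketch` is a hypothetical name, not Spark's state manager or `UnsafeRow` handling):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration with plain Object[] rows (not Spark's UnsafeRow): the state
// value keeps only non-key columns, and the full row is rebuilt from key + value on read.
class KeyValueSplitSketch {
  static Object[] keyOf(Object[] row, int[] keyIndices) {
    Object[] key = new Object[keyIndices.length];
    for (int j = 0; j < keyIndices.length; j++) {
      key[j] = row[keyIndices[j]];
    }
    return key;
  }

  // Value without the redundant key columns, in original column order.
  static Object[] valueWithoutKey(Object[] row, int[] keyIndices) {
    boolean[] isKey = markKeys(row.length, keyIndices);
    List<Object> out = new ArrayList<>();
    for (int i = 0; i < row.length; i++) {
      if (!isKey[i]) {
        out.add(row[i]);
      }
    }
    return out.toArray();
  }

  // Rebuild the full row by putting key columns back at their original positions.
  static Object[] restore(Object[] key, Object[] value, int[] keyIndices) {
    Object[] row = new Object[key.length + value.length];
    boolean[] isKey = markKeys(row.length, keyIndices);
    for (int j = 0; j < keyIndices.length; j++) {
      row[keyIndices[j]] = key[j];
    }
    int v = 0;
    for (int i = 0; i < row.length; i++) {
      if (!isKey[i]) {
        row[i] = value[v++];
      }
    }
    return row;
  }

  private static boolean[] markKeys(int width, int[] keyIndices) {
    boolean[] isKey = new boolean[width];
    for (int idx : keyIndices) {
      isKey[idx] = true;
    }
    return isKey;
  }
}
```

The saving comes from not storing the key columns twice (once in the state key, once inside the value); the trade-off is the extra work of reassembling rows on read, and an on-disk format that differs from the old one, which is why the PR guards the option.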
[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21622#discussion_r200554917 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala --- @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", +progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L) + + registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L) + registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L) + --- End diff -- We can add more metrics like "providerLoadedMapSizeBytes" after adopting SPARK-24441, so that the actual memory usage of the state store provider can be tracked as a time series.
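The two kinds of gauge values in the diff, an ISO8601 watermark string converted to epoch millis and a per-operator metric summed across state operators, can be sketched in plain Java. This version swaps `SimpleDateFormat` for `java.time.Instant.parse`, which accepts the same UTC `...Z` form; `ProgressGaugeSketch` and its methods are hypothetical, not part of MetricsReporter.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;

// Hypothetical helpers mirroring the two gauge computations in the diff above.
class ProgressGaugeSketch {
  // Convert an ISO8601 UTC timestamp such as "2018-07-05T10:15:30.000Z" to epoch millis,
  // falling back to a default when the progress has no "watermark" entry.
  static long watermarkMillis(Map<String, String> eventTime, long defaultValue) {
    String watermark = eventTime.get("watermark");
    return watermark == null ? defaultValue : Instant.parse(watermark).toEpochMilli();
  }

  // Sum a per-operator metric (e.g. numRowsTotal or memoryUsedBytes) over all state operators.
  static long sum(List<Long> perOperator) {
    long total = 0L;
    for (long v : perOperator) {
      total += v;
    }
    return total;
  }
}
```

`Instant` is immutable and thread-safe, unlike a shared `SimpleDateFormat` instance, which matters if the reporter thread and other threads format dates concurrently.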
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 @tedyu Thanks for the suggestion. Published the result to the mail thread. https://lists.apache.org/thread.html/323ab22fea87c14a2f92e58e7a810aa37cbdf00b9ab81448ee967976@%3Cdev.spark.apache.org%3E I've written only a short summary of the result (since email is a poorer format than markdown for presenting detailed numbers) and spent more time explaining the rationale behind my recent issues, so that all of them are covered together. I'll wait a couple more days and try to post detailed numbers if the review hasn't started by then.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21721 Though I haven't taken a look yet, I would like to see this feature (mentioned in https://github.com/apache/spark/pull/21622#issuecomment-399677099) and am happy to see it being implemented! While I love the feature, I agree with @jose-torres that it is going to be a new public API (part of Datasource V2), so it is worth discussing the API itself before settling on a specific implementation.
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 I would like to add numbers to show how much this patch helps end users of Apache Spark. I crafted and published a project which implements some stateful use cases with an IoT Trucking example. https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming Running these apps, I can see that the cache (loadedMaps) in HDFSBackedStateStoreProvider consumes much more memory than one version of state. The overhead is not on the order of 10~30%, but more than 1500%, and even more than 8000% in a specific case, depending on the update ratio of the state. (Capturing the overall map size of the provider requires applying patch #21469.) The tables below are the results of the queries, obtained by publishing the query status to a Kafka topic and querying that data via Spark SQL. https://gist.github.com/HeartSaVioR/9d53b39052d4779a4c77e71ff7e989a3

> Before applying the patch (`spark.sql.streaming.minBatchesToRetain` set to default value 100)

* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
319 | 765456 | 2632 | 185499903 | 3307747279 | 17.8315310439811928

* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
142 | 184 | 138 | 72103 | 6214927 | 86.1951236425668835

* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
634 | 598 | 0 | 136279 | 6587839 | 48.3408228707284321

> After applying this patch (`spark.sql.streaming.maxBatchesToRetainInMemory` set to default value 2)

* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
127 | 298452 | 4170 | 71023679 | 84454399 | 1.1891020035726395

* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
132 | 184 | 138 | 72319 | 162647 | 2.2490216955433565

* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
133 | 598 | 0 | 136079 | 227863 | 1.6744905532815497
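The last column in the tables above matches `providerLoadedMapSize / memoryUsedBytes` for every row (for example 3307747279 / 185499903 ≈ 17.83), so despite its name it reads as a ratio: 17.83 means roughly 17.8x, or about 1783%, which is where the "more than 1500%" figure comes from. A small helper reproducing that arithmetic (the name `StateOverheadSketch` is hypothetical):

```java
// Hypothetical helper: the "stateExcessLoadingOverheadPercentage" values above are
// consistent with providerLoadedMapSize / memoryUsedBytes, i.e. a ratio, not a percentage.
class StateOverheadSketch {
  static double overheadRatio(long providerLoadedMapSize, long memoryUsedBytes) {
    if (memoryUsedBytes <= 0) {
      throw new IllegalArgumentException("memoryUsedBytes must be positive");
    }
    return (double) providerLoadedMapSize / memoryUsedBytes;
  }

  // Same figure expressed as a percentage (17.83 -> ~1783%).
  static double overheadPercent(long providerLoadedMapSize, long memoryUsedBytes) {
    return overheadRatio(providerLoadedMapSize, memoryUsedBytes) * 100.0;
  }
}
```

Read this way, the "after" rows (about 1.19x to 2.25x) line up with the expectation of retaining around two versions of state in memory.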
[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21718 I'm aware of this issue and have it in my backlog, but for now it doesn't look easy to address efficiently. I'll propose an approach for rescaling state when I have one.
[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21718 Rescaling partitions was fairly easy before stateful operators came into play. For Structured Streaming it is no longer trivial, because rescaling partitions must also handle rescaling of the state stored on disk. Rescaling state may require reading the whole state, redistributing it via the hash function, and saving it to disk again. That's why SS stores the previous conf and forces its use.
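A simplified sketch of what "reading the whole state and redistributing it via the hash function" involves: every key/value entry from the old partitions is routed to `hash(key) mod newNumPartitions`. Here `String.hashCode()` stands in for Spark's own Murmur3-based hash partitioning, persisting the result back to disk is omitted, and `StateRescaleSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: redistribute key/value state into `newNumPartitions` partitions.
// String.hashCode() is a stand-in for the engine's real partitioning hash.
class StateRescaleSketch {
  static int partitionFor(String key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions); // non-negative modulo
  }

  static List<Map<String, Long>> rescale(List<Map<String, Long>> oldPartitions, int newNumPartitions) {
    List<Map<String, Long>> result = new ArrayList<>();
    for (int i = 0; i < newNumPartitions; i++) {
      result.add(new HashMap<>());
    }
    // Read every entry of every old partition (the "read the whole state" step)...
    for (Map<String, Long> partition : oldPartitions) {
      for (Map.Entry<String, Long> e : partition.entrySet()) {
        // ...and route it to its new owner by hash.
        result.get(partitionFor(e.getKey(), newNumPartitions)).put(e.getKey(), e.getValue());
      }
    }
    return result; // a real implementation would then save each partition to disk again
  }
}
```

Almost every key moves when the partition count changes under modulo hashing, which is why this is a full read-shuffle-rewrite of the state rather than an incremental step.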
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 @tedyu Thanks for the detailed review comments. Addressed.
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r200249847 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -240,7 +244,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit @volatile private var storeConf: StateStoreConf = _ @volatile private var hadoopConf: Configuration = _ - private lazy val loadedMaps = new mutable.HashMap[Long, MapType] + // taking default value first: this will be updated by init method with configuration + @volatile private var numberOfVersionsRetainInMemory: Int = 2 --- End diff -- Will fix.
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r200249732 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming.state; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeMap; + +/** + * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. + * + * As TreeMap does, this implementation sorts elements in natural order, and cuts off + * smaller elements to retain at most bigger N elements. + * + * You can provide reversed order of comparator to retain smaller elements instead. + * + * This class is not thread-safe, so synchronization would be needed to use this concurrently. 
+ * + * @param key type + * @param value type + */ +public class BoundedSortedMap extends TreeMap { + + private final int limit; + + /** + * Constructor + * + * @param comparator comparator instance to compare between keys + * @param limit bounded size + */ + public BoundedSortedMap(Comparator comparator, int limit) { +super(comparator); +this.limit = limit; + } + + @Override + public void putAll(Map map) { +for (Map.Entry entry : map.entrySet()) { --- End diff -- Thanks for the great suggestion. While we can't assume that the map's type is SortedMap, it looks like we could check the map's type at runtime and apply your suggestion. Will apply it.
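One possible reading of the runtime type check mentioned above, as a hedged sketch: when the incoming map is a `NavigableMap` (a `SortedMap` that offers `descendingMap()`) with the same ordering, only its `limit` largest entries can survive, so iteration can stop early; any other map falls back to inserting every entry. The reviewer's actual suggestion is not quoted in this thread, and `BoundedPutAllSketch` is hypothetical.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of a runtime type check in putAll for a bounded, sorted map.
class BoundedPutAllSketch<K, V> extends TreeMap<K, V> {
  private final int limit;

  BoundedPutAllSketch(Comparator<? super K> comparator, int limit) {
    super(comparator);
    this.limit = limit;
  }

  @Override
  public V put(K key, V value) {
    V previous = super.put(key, value);
    while (size() > limit) {
      pollFirstEntry(); // evict the smallest keys
    }
    return previous;
  }

  @Override
  public void putAll(Map<? extends K, ? extends V> map) {
    if (map instanceof NavigableMap
        && Objects.equals(((SortedMap<?, ?>) map).comparator(), comparator())) {
      // Same ordering: only the `limit` largest source entries can survive,
      // so walk the source from its largest key downwards and stop early.
      int taken = 0;
      for (Map.Entry<? extends K, ? extends V> e
          : ((NavigableMap<? extends K, ? extends V>) map).descendingMap().entrySet()) {
        if (taken++ >= limit) {
          break;
        }
        put(e.getKey(), e.getValue());
      }
    } else {
      // Unknown ordering: fall back to inserting every entry.
      for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
        put(e.getKey(), e.getValue());
      }
    }
  }
}
```

Both paths end in the same state; the fast path just skips entries that would be evicted immediately anyway.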
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r200249261 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming.state; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeMap; + +/** + * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. + * + * As TreeMap does, this implementation sorts elements in natural order, and cuts off + * smaller elements to retain at most bigger N elements. + * + * You can provide reversed order of comparator to retain smaller elements instead. + * + * This class is not thread-safe, so synchronization would be needed to use this concurrently. 
+ * + * @param key type + * @param value type + */ +public class BoundedSortedMap extends TreeMap { + + private final int limit; + + /** + * Constructor + * + * @param comparator comparator instance to compare between keys + * @param limit bounded size + */ + public BoundedSortedMap(Comparator comparator, int limit) { +super(comparator); +this.limit = limit; + } + + @Override + public void putAll(Map map) { --- End diff -- Unfortunately this is inherited from the Map interface, so we can't modify its signature. And assuming that `put` is implemented correctly, this guarantees the size bound of BoundedSortedMap, since it delegates to `put` to restrict the map's size.
[GitHub] spark issue #21673: SPARK-24697: Fix the reported start offsets in streaming...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21673 @arunmahadevan We'd better follow the style guide for pull requests: please change the title to put the JIRA issue number in `[]` and also add `[SS]`. http://spark.apache.org/contributing.html > The PR title should be of the form [SPARK-][COMPONENT] Title, where SPARK- is the relevant JIRA number, COMPONENT is one of the PR categories shown at spark-prs.appspot.com and Title may be the JIRA's title or a more specific title describing the PR itself.
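For a quick local check of the title convention quoted above, a regular expression like the one below could help. It is only an approximation: requiring upper-case component names (and allowing several bracketed components) is an assumption, not an official rule, and `PrTitleCheckSketch` is a hypothetical helper.

```java
import java.util.regex.Pattern;

// Hypothetical checker for the "[SPARK-<JIRA number>][COMPONENT] Title" convention.
// The component pattern (upper-case words, one or more bracketed groups) is an assumption.
class PrTitleCheckSketch {
  private static final Pattern TITLE =
      Pattern.compile("^\\[SPARK-\\d+\\](\\[[A-Z][A-Z0-9 ]*\\])+ \\S.*$");

  static boolean isWellFormed(String title) {
    return TITLE.matcher(title).matches();
  }
}
```

A title such as "SPARK-24697: Fix the reported start offsets..." fails this check, while "[SPARK-24697][SS] Fix the reported start offsets..." passes.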
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 retest this, please
[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 Two new Java files were missing a newline at EOF. Just addressed. Jenkins, retest this please.
[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon
[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 retest this, please
[GitHub] spark issue #21700: SPARK-24717 Split out min retain version of state for me...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 Pasting the JIRA issue description to explain why this patch is needed: The default value of "spark.sql.streaming.minBatchesToRetain" is set high (100). This doesn't strictly require 100x the memory, but I'm seeing 10x ~ 80x memory consumption for various workloads. In addition, in some cases even requiring 2x the memory is unacceptable, so we should split out a separate configuration for memory and let users adjust the trade-off between memory usage and cache misses (rebuilding state from files). In the normal case, the default value '2' would cover both cases, success and restoring from failure, with around 2x memory usage or less, while '1' would cover only the success case but no longer require more than 1x memory. In the extreme case, users can set the value to '0' to completely disable the map cache and maximize executor memory (covers #21500).
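The memory vs. cache-miss trade-off described above can be modeled with a tiny version cache that keeps at most N of the newest state versions and reloads anything else from "files". This is a hedged model, not HDFSBackedStateStoreProvider: `VersionCacheSketch`, its hit/miss counters, and the `loadFromFiles` callback are hypothetical.

```java
import java.util.TreeMap;
import java.util.function.LongFunction;

// Hypothetical model: keep at most `maxVersionsInMemory` state versions (the newest ones)
// and rebuild any other version through a callback standing in for reading state files.
class VersionCacheSketch<S> {
  private final int maxVersionsInMemory;
  private final LongFunction<S> loadFromFiles;
  private final TreeMap<Long, S> cache = new TreeMap<>();
  long hits = 0;
  long misses = 0;

  VersionCacheSketch(int maxVersionsInMemory, LongFunction<S> loadFromFiles) {
    this.maxVersionsInMemory = maxVersionsInMemory;
    this.loadFromFiles = loadFromFiles;
  }

  // Fetch the state for a version, counting whether the in-memory cache served it.
  S get(long version) {
    S state = cache.get(version);
    if (state != null) {
      hits++;
      return state;
    }
    misses++;
    state = loadFromFiles.apply(version);
    remember(version, state);
    return state;
  }

  // Called on commit (and after a reload); 0 disables caching entirely.
  void remember(long version, S state) {
    if (maxVersionsInMemory == 0) {
      return;
    }
    cache.put(version, state);
    while (cache.size() > maxVersionsInMemory) {
      cache.pollFirstEntry(); // drop the oldest version
    }
  }
}
```

In a simulated run where each batch reads version v-1 and then commits version v, N=1 and N=2 both miss only on the first batch, while N=0 misses every time; re-reading the previous version once more (as a retried batch would) still hits with N=2 but misses with N=1.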
[GitHub] spark pull request #21700: SPARK-24717 Split out min retain version of state...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21700 SPARK-24717 Split out min retain version of state for memory in HDFSBackedStateStoreProvider
## What changes were proposed in this pull request?
This patch proposes splitting the configuration for the number of retained batches of state into two pieces: files and memory (cache). While this patch reuses the existing configuration for files, it introduces a new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory", to configure the maximum number of batches to retain in memory. This patch also introduces BoundedSortedMap to retain at most the first N elements (sorted by key), which can be leveraged for loadedMaps in HDFSBackedStateStoreProvider.
## How was this patch tested?
Applied this patch on top of SPARK-24441 (https://github.com/apache/spark/pull/21469) and manually tested to ensure the overall size of state is around 2x or less, instead of 10x ~ 80x depending on the workload.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24717 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21700.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21700 commit 22f0e220f661b5457584ef83b1ecddc18212fa73 Author: Jungtaek Lim Date: 2018-07-02T22:04:49Z SPARK-24717 Split out min retain version of state for memory in HDFSBackedStateStoreProvider * introduce BoundedSortedMap which implements bounded size of sorted map * only first N elements will be retained * replace loadedMaps to BoundedSortedMap to retain only N versions of states * no need to cleanup in maintenance phase * introduce new configuration: spark.sql.streaming.minBatchesToRetainInMemory
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Rebased to fix a conflict, and added a new commit (the last one: c9aada5) to report cache hit/miss counts in the HDFS state provider. This is actually helpful for SPARK-24717 to determine a proper value for its configuration, but with this commit SPARK-24717 needs to be on top of this PR, so I added it here to avoid rebase hell.
[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21622#discussion_r198300792 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala --- @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", +s => convertStringDateToMillis(s.eventTime.get("watermark")), 0L) --- End diff -- 1. Will address. 2. We don't know whether the map will be empty when calling `registerGauge`, and once we register the metric, `getValue` in Gauge is called by Dropwizard, so I'm not sure we can control whether the value is reported or not.
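The point about Dropwizard calling `getValue` on its own schedule can be illustrated with a stand-in: the fallback default has to be applied inside the gauge each time it is read, rather than decided at registration. `LazyGaugeSketch`, its `Progress` class, and the use of `java.util.function.Supplier` in place of a Dropwizard `Gauge` are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-ins: `Progress` replaces the query progress object and a `Supplier`
// replaces a Dropwizard Gauge. The reporter reads gauges whenever it likes, so the
// "no value yet" fallback lives inside each gauge, not at registration time.
class LazyGaugeSketch {
  static final class Progress {
    final Map<String, String> eventTime;

    Progress(Map<String, String> eventTime) {
      this.eventTime = eventTime;
    }
  }

  private volatile Progress lastProgress; // null until the first trigger completes
  private final Map<String, Supplier<Object>> gauges = new HashMap<>();

  <T> void registerGauge(String name, Function<Progress, T> f, T defaultValue) {
    gauges.put(name, () -> {
      Progress p = lastProgress; // read the latest progress at report time
      if (p == null) {
        return defaultValue;
      }
      T v = f.apply(p);
      return v == null ? defaultValue : v;
    });
  }

  void onProgress(Progress p) {
    lastProgress = p;
  }

  // What a reporter would do on each tick.
  Object report(String name) {
    return gauges.get(name).get();
  }
}
```

The same registered gauge returns the default before any progress exists, again when a progress lacks the entry, and the real value once it appears.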
[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...
Github user HeartSaVioR closed the pull request at: https://github.com/apache/spark/pull/21617
[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21617 Abandoning the patch. While I think the JIRA issue is still valid, it looks like we should address the watermark issue to get a correct number of late events. Thanks for reviewing, @jose-torres @arunmahadevan.
[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21617#discussion_r197986093 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows) private[sql] def jsonValue: JValue = { ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~ +("numLateInputRows" -> JInt(numLateInputRows)) --- End diff -- @arunmahadevan Ah yes, got it. If we want an accurate number, we need to filter out late events at the very first stage anyway. I guess we may need to defer addressing this until we change that behavior.
[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21617#discussion_r197981651 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows) private[sql] def jsonValue: JValue = { ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~ +("numLateInputRows" -> JInt(numLateInputRows)) --- End diff -- @arunmahadevan > Here you are measuring the number of "keys" filtered out of the state store since they have crossed the late threshold correct ? No, it is based on "input" rows that are filtered out due to the watermark threshold. Note that the meaning of "input" is relative: it doesn't refer to the input rows of the overall query, but to the input rows of the state operator. > Its better if we could rather expose the actual number of events that were late. I guess this comment is based on something I missed, but I think the correct approach would be to filter out late events in the first phase of the query (not in the state operator), so that we can get a correct count of late events. For now, filters applied before the state operator affect the count.
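To make the "relative input" point concrete: counting late rows at the state operator only sees rows that survived upstream filtering, so it can differ from a count taken at the source. A hedged sketch with rows reduced to event-time millis; treating `eventTime <= watermark` as late is an assumption about the boundary, and `LateRowCountSketch` is hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: rows are reduced to event-time millis. A row counts as late when
// eventTime <= watermark (the exact boundary is an assumption for illustration).
class LateRowCountSketch {
  // Late rows as seen at the source, before any transformation.
  static long countLate(List<Long> eventTimesMs, long watermarkMs) {
    return eventTimesMs.stream().filter(t -> t <= watermarkMs).count();
  }

  // Late rows as seen by the state operator: only rows surviving the upstream filter.
  static long countLateAfter(List<Long> eventTimesMs, Predicate<Long> upstreamFilter,
                             long watermarkMs) {
    return eventTimesMs.stream().filter(upstreamFilter).filter(t -> t <= watermarkMs).count();
  }
}
```

With event times 1000, 5000, 9000, 2000 and a watermark of 3000, the source-side count is 2, but if an upstream filter keeps only rows at or after 2000, the state operator sees just 1 late row.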
[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21617 @jose-torres Yes, you're right. They would be rows that have already gone through other transformations and filtering, not the original input rows. I just haven't found a better term than "input row", since from the state operator's point of view they are input rows. Btw, as I described in the JIRA, my final goal is pushing late events to a side output (as Beam and Flink do), but I'm stuck on a couple of concerns (please correct me anytime if I'm missing something): 1. Which events to push? A query can apply a couple of transformations before rows reach the stateful operator and get filtered out due to the watermark. This is not ideal, and I guess that's what you meant by "aren't necessarily the input rows". Ideally we would provide the original input rows rather than transformed ones, but then we would have to put a major restriction on the watermark: `Filter with watermark` should be applied in the data reader (or as a filter right after the data reader), which means the input rows themselves should have a timestamp field. We couldn't apply transformation(s) to populate/manipulate the timestamp field, and the timestamp field **must not** be modified during transformations. For example, Flink provides a timestamp assigner to extract the timestamp value from the input stream, and the reserved field name `rowtime` is used for the timestamp field. 2. Does the nature of RDD support multiple outputs? I have been struggling with this, but as far as I understand, RDD itself doesn't support multiple outputs by its nature. To me this looks like a major difference between the pull model and the push model: in the push model that other streaming frameworks use, defining another output stream is really straightforward, just like adding a remote listener, whereas I'm not sure how it can be cleanly defined in the pull model. I also googled multiple outputs on RDD (as someone may have struggled with this before), but had no luck.
The alternative approaches I can imagine are all workarounds: RPC, listener bus, callback function. None of them can define another stream within the current DAG, and I'm also not sure we can create a DataFrame from that data and let end users compose another query on it. It would be really helpful if you could think of better alternatives and share them.
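To make the callback workaround concrete: in a pull (iterator) model each operator yields exactly one output stream, so late rows can only leave through a side channel. A hypothetical pure-Python sketch, where `on_late` is an assumed callback and not a Spark API:

```python
from typing import Callable, Iterator, List, Tuple

Row = Tuple[int, str]  # (event_time, value); hypothetical row shape


def filter_late(
    rows: Iterator[Row], watermark: int, on_late: Callable[[Row], None]
) -> Iterator[Row]:
    """Pull-model operator: yields on-time rows and hands late rows to a callback."""
    for row in rows:
        if row[0] <= watermark:
            # The iterator has a single output, so late rows go to the side channel.
            on_late(row)
        else:
            yield row


late_rows: List[Row] = []
on_time = list(filter_late(iter([(5, "a"), (15, "b"), (7, "c")]), 10, late_rows.append))
print(on_time, late_rows)
```

Note that the callback only fires as the downstream consumer pulls rows, so the "side output" is tied to consumption of the main stream; it is not an independent stream in the DAG, which is the limitation described above.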
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 retest this, please
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 I think we may want to add metrics for sources and sinks as well, but the format of offset information and other metadata can differ between sources and sinks. I'm not sure which approach is preferred: 1. define a general information format for sources/sinks, or 2. let each individual source/sink manage its own metrics as well.
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21622 cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon
[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21622 [SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics ## What changes were proposed in this pull request? The patch adds metrics regarding state and watermark to Dropwizard metrics, so that the watermark and state rows/size can be tracked as time series. ## How was this patch tested? Manually tested with the CSV metric sink. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24637 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21622.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21622 commit 147c98a94140bae505116f5af4d616dcf8d85eab Author: Jungtaek Lim Date: 2018-06-23T08:04:55Z SPARK-24637 Add metrics regarding state and watermark to dropwizard metrics
[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21617 cc. @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon
[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21617 [SPARK-24634][SS] Add a new metric regarding number of rows later than watermark ## What changes were proposed in this pull request? This adds a new metric that counts the number of rows that arrived later than the watermark. The metric is exposed in two places: 1. streaming query listener - `numLateInputRows` in `stateOperators` 2. SQL tab in UI - `number of rows which are later than watermark` in state operator exec. Please refer to https://issues.apache.org/jira/browse/SPARK-24634 for the rationale behind this issue. ## How was this patch tested? Modified existing UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24634 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21617.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21617 commit ff1b89553acc7ea3a19b586457dd295255047377 Author: Jungtaek Lim Date: 2018-06-23T02:34:16Z SPARK-24634 Add a new metric regarding number of rows later than watermark * This adds a new metric to count the number of rows arrived later than watermark
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197004935 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -98,6 +98,10 @@ class ContinuousDataSourceRDD( override def getPreferredLocations(split: Partition): Seq[String] = { split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations() } + + override def clearDependencies(): Unit = { +throw new IllegalStateException("Continuous RDDs cannot be checkpointed") --- End diff -- I'm wondering whether this method can be called in a normal situation, e.g. when a continuous query is terminated.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197000483 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true --- End diff -- What if we have multiple repartitions, where one matches this case and the others don't? I'm not sure we restrict repartition to appear only once.
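The concern can be illustrated with a small plan tree. In this hypothetical sketch (simplified node classes, not Catalyst), a `find`-style pre-order search, similar in spirit to Catalyst's `TreeNode.find`, reports a `coalesce(1)` anywhere below the aggregate, even when a later shuffle repartition restores multiple partitions before the aggregate. Checking only the nearest repartition below the aggregate gives the stricter answer.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Node:
    """Hypothetical, simplified logical plan node."""
    name: str
    children: List["Node"] = field(default_factory=list)
    num_partitions: Optional[int] = None  # only set for repartition nodes
    shuffle: bool = False

    def find(self, pred: Callable[["Node"], bool]) -> Optional["Node"]:
        # Pre-order search over the whole subtree.
        if pred(self):
            return self
        for child in self.children:
            found = child.find(pred)
            if found is not None:
                return found
        return None


def is_coalesce_one(node: Node) -> bool:
    return node.name == "Repartition" and node.num_partitions == 1 and not node.shuffle


def nearest_repartition(node: Node) -> Optional[Node]:
    """Follow the first child at each level (enough for a unary chain) to the closest repartition."""
    current = node.children[0] if node.children else None
    while current is not None:
        if current.name == "Repartition":
            return current
        current = current.children[0] if current.children else None
    return None


source = Node("StreamingRelationV2")
coalesce = Node("Repartition", [source], num_partitions=1, shuffle=False)
reshuffle = Node("Repartition", [coalesce], num_partitions=4, shuffle=True)
aggregate = Node("Aggregate", [reshuffle])

any_match = aggregate.find(is_coalesce_one) is not None
nearest = nearest_repartition(aggregate)
print(any_match, nearest is not None and is_coalesce_one(nearest))  # True False
```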
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196999896 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD( numPartitions: Int, queueSize: Int = 1024, numShuffleWriters: Int = 1, -epochIntervalMs: Long = 1000) +epochIntervalMs: Long = 1000, +val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")) extends RDD[UnsafeRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { (0 until numPartitions).map { partIndex => - ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs) + ContinuousShuffleReadPartition( +partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs) --- End diff -- With the default `endpointNames`, this effectively asserts that numPartitions is 1; otherwise it will throw an exception.
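Why the default effectively assumes a single partition: the default `endpointNames` holds exactly one name, and `getPartitions` indexes it by partition index, so any index above 0 is out of range (in Scala, `Seq.apply` throws an `IndexOutOfBoundsException`). A hypothetical Python sketch of that indexing, plus one possible fix that generates one name per partition; the `endpoint_names_for` helper is an assumption, not code from the PR.

```python
import uuid
from typing import List, Optional


def endpoint_names_for(num_partitions: int) -> List[str]:
    # Hypothetical fix: one unique reader endpoint name per partition.
    return [f"RPCContinuousShuffleReader-{uuid.uuid4()}" for _ in range(num_partitions)]


def partition_endpoints(num_partitions: int, endpoint_names: Optional[List[str]] = None) -> List[str]:
    if endpoint_names is None:
        # Mirrors the diff's default: a single generated name.
        endpoint_names = [f"RPCContinuousShuffleReader-{uuid.uuid4()}"]
    # Mirrors endpointNames(partIndex): fails once part_index >= len(endpoint_names).
    return [endpoint_names[part_index] for part_index in range(num_partitions)]


print(len(partition_endpoints(1)))
try:
    partition_endpoints(2)
except IndexError as err:
    print("IndexError:", err)
print(len(set(partition_endpoints(3, endpoint_names_for(3)))))
```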
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196999687 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD( numPartitions: Int, queueSize: Int = 1024, numShuffleWriters: Int = 1, -epochIntervalMs: Long = 1000) +epochIntervalMs: Long = 1000, +val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")) --- End diff -- Same here: if possible, it might be better to write complete code rather than relying on such an assumption.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196999745 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. 
Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) --- End diff -- We are addressing only the specific case where the number of partitions is 1, but we could add an assertion for that and try to write complete code so that we don't have to modify it again.
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 adding cc. to @zsxwing since he has been reviewing PRs for SS so far.
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21357 adding cc. to @zsxwing since he has been reviewing PRs for SS so far.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 adding cc. to @zsxwing since he has been reviewing PRs for SS so far.
[GitHub] spark issue #21595: [MINOR][SQL] Remove invalid comment from SparkStrategies
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21595 @HyukjinKwon @hvanhovell Thanks for reviewing and merging!
[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21388 I just submitted a new patch that removes the comment, as this no longer looks like the preferred option: https://github.com/apache/spark/pull/21595 Closing this one.
[GitHub] spark pull request #21388: [SPARK-24336][SQL] Support 'pass through' transfo...
Github user HeartSaVioR closed the pull request at: https://github.com/apache/spark/pull/21388
[GitHub] spark pull request #21595: [MINOR][SQL] Remove invalid comment from SparkStr...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21595 [MINOR][SQL] Remove invalid comment from SparkStrategies ## What changes were proposed in this pull request? This patch removes an invalid comment from SparkStrategies, given that the TODO-like comment no longer describes the preferred approach, per this comment: https://github.com/apache/spark/pull/21388#issuecomment-396856235 Removing the invalid comment will prevent contributors from spending time on work that is not going to be merged. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark MINOR-remove-invalid-comment-on-spark-strategies Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21595.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21595 commit 8afb36b20aab1bbd1f6a5cf902aef7e0c04c8353 Author: Jungtaek Lim Date: 2018-06-20T01:48:17Z [MINOR][SQL] Remove invalid comment from SparkStrategies * The option is no longer preferred one as below comment * https://github.com/apache/spark/pull/21388#issuecomment-396856235 * Removing this to prevent contributors to waste their times
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21357 retest this, please
[GitHub] spark pull request #21477: [SPARK-24396] [SS] [PYSPARK] Add Structured Strea...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r195632189 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +843,170 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +@since(2.4) +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserialized copy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) is done after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> def print_row(row): +... print(row) +... +>>> writer = sdf.writeStream.foreach(print_row) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partit
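The writer lifecycle described in the docstring can be simulated without Spark. In this hypothetical sketch, `run_partition` stands in for what one task does with one partition of one epoch (call `open`, then `process` for each row if `open` returned true, then `close` with any error seen); it models the documented contract and is not PySpark's implementation.

```python
from typing import Any, Iterable, List, Optional


class RowCollector:
    """Hypothetical writer object with open/process/close, following the docstring."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def open(self, partition_id: int, epoch_id: int) -> bool:
        # Initialize connections here rather than in __init__: the object is
        # serialized and deserialized before it reaches each task.
        self.events.append(f"open({partition_id}, {epoch_id})")
        return True

    def process(self, row: Any) -> None:
        self.events.append(f"process({row})")

    def close(self, error: Optional[BaseException]) -> None:
        self.events.append(f"close({error})")


def run_partition(writer: Any, partition_id: int, epoch_id: int, rows: Iterable[Any]) -> None:
    """Sketch of the per-partition, per-epoch call sequence (not PySpark code)."""
    should_process = writer.open(partition_id, epoch_id) if hasattr(writer, "open") else True
    error: Optional[BaseException] = None
    try:
        if should_process:
            for row in rows:
                writer.process(row)
    except Exception as exc:  # recorded and handed to close()
        error = exc
    if hasattr(writer, "close"):
        writer.close(error)
    if error is not None:
        raise error


writer = RowCollector()
run_partition(writer, partition_id=0, epoch_id=7, rows=["a", "b"])
print(writer.events)
```

If `open` itself raises, the exception propagates before `close` is reached, matching the docstring's note that `close` is called only when `open` returns successfully.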
[GitHub] spark pull request #21477: [SPARK-24396] [SS] [PYSPARK] Add Structured Strea...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r195625921 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +843,170 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +@since(2.4) +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserialized copy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) is done after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> def print_row(row): +... print(row) +... +>>> writer = sdf.writeStream.foreach(print_row) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partit
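The deduplication point in the docstring can be sketched as an idempotent commit keyed by `(partition_id, epoch_id)`. This hypothetical Python model uses an in-memory dict as the "external store". As the docstring notes, it relies on the micro-batch guarantee that a replayed `(partition_id, epoch_id)` carries the same data, so it should not be used for continuous mode.

```python
from typing import Dict, List, Optional, Tuple


class IdempotentSink:
    """Hypothetical external store that commits each (partition_id, epoch_id) once."""

    def __init__(self) -> None:
        self.committed: Dict[Tuple[int, int], List[str]] = {}


class DedupWriter:
    def __init__(self, sink: IdempotentSink) -> None:
        self.sink = sink
        self.key: Tuple[int, int] = (-1, -1)
        self.buffer: List[str] = []

    def open(self, partition_id: int, epoch_id: int) -> bool:
        self.key = (partition_id, epoch_id)
        self.buffer = []
        # Returning False skips process() for data that was already committed.
        return self.key not in self.sink.committed

    def process(self, row: str) -> None:
        self.buffer.append(row)

    def close(self, error: Optional[BaseException]) -> None:
        # Commit only on success and only if this key is not committed yet.
        if error is None and self.key not in self.sink.committed:
            self.sink.committed[self.key] = list(self.buffer)


sink = IdempotentSink()
for attempt in range(2):  # the second attempt simulates reprocessing after a failure
    writer = DedupWriter(sink)
    if writer.open(0, 3):
        for row in ["x", "y"]:
            writer.process(row)
    writer.close(None)
print(sink.committed)  # {(0, 3): ['x', 'y']}
```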
[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21388 @hvanhovell To be honest, I found the motivation for this issue in a comment in the Spark code: https://github.com/apache/spark/blob/4c388bccf1bcac8f833fd9214096dd164c3ea065/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L496-L497 and I thought the comment made sense: it would be beneficial if we could simply couple matching pairs of (LogicalPlan, SparkPlan) for the cases that don't require any transformation during planning. At first I tried my best to stick with compile-time constructs, but after a couple of hours I realized it was not possible (at least for me) without runtime reflection, so another couple of hours went into solving it that way. I have no strong opinion on adopting reflection in the planner (so I'm happy to see the approach rejected), but if we agree it cannot be handled without reflection, the original comment should be removed, or it should describe the limitations so that others can try while avoiding them.
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21357 retest this, please
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r194613720 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => val storeMetrics = store.metrics longMetric("numTotalStateRows") += storeMetrics.numKeys longMetric("stateMemory") += storeMetrics.memoryUsedBytes -storeMetrics.customMetrics.foreach { case (metric, value) => - longMetric(metric.name) += value +storeMetrics.customMetrics.foreach { + case (metric: StateStoreCustomAverageMetric, value) => +longMetric(metric.name).set(value * 1.0d) --- End diff -- It would be better to think about the actual benefit of exposing the value, rather than where to expose it. If we define it as a count and aggregate by summation, the aggregated value will be `(partition count * versions)`, which might be hard for end users to interpret. I'm afraid that exposing this to StreamingQuery as an average is not trivial, especially since SQLMetric is defined as `AccumulatorV2[Long, Long]`, so only a single Long value can be passed. Under that restriction, we couldn't define a `merge` operation for an `average metric`.
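The merge problem can be shown with plain numbers. Assuming each task reports only a single integer (as with a Long-valued accumulator), summing a per-task count of cached versions yields partitions × versions, and averaging already-averaged values is wrong whenever the underlying sample counts differ. A correct average needs a (sum, count) pair per task. A hypothetical Python sketch:

```python
from typing import List, Tuple


def merge_sum(values: List[int]) -> int:
    # What a plain Long accumulator does: add everything up.
    return sum(values)


def merge_mean_of_means(means: List[float]) -> float:
    # Naive "average of averages": only correct if every input has the same weight.
    return sum(means) / len(means)


def merge_sum_count(parts: List[Tuple[int, int]]) -> float:
    # A mergeable average keeps (sum, count) and divides only at the end.
    total = sum(s for s, _ in parts)
    count = sum(c for _, c in parts)
    return total / count


# Versions cached per partition: 4 partitions, each holding 2 versions.
print(merge_sum([2, 2, 2, 2]))  # 8 = partitions * versions, hard to interpret

# Two tasks: one averaged 10 over 1 sample, the other averaged 2 over 3 samples.
print(merge_mean_of_means([10.0, 2.0]))  # 6.0
print(merge_sum_count([(10, 1), (6, 3)]))  # 4.0
```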
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21357 Kindly pinging @tdas again, and cc. @jose-torres @jerryshao @HyukjinKwon @arunmahadevan for review.
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 Kindly pinging @tdas again.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Kindly pinging @tdas again.
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 Kindly pinging @tdas again.
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21506 Kindly pinging @tdas again.
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r194585044 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => val storeMetrics = store.metrics longMetric("numTotalStateRows") += storeMetrics.numKeys longMetric("stateMemory") += storeMetrics.memoryUsedBytes -storeMetrics.customMetrics.foreach { case (metric, value) => - longMetric(metric.name) += value +storeMetrics.customMetrics.foreach { + case (metric: StateStoreCustomAverageMetric, value) => +longMetric(metric.name).set(value * 1.0d) --- End diff -- If my understanding is right, the metric object (the return value of `longMetric()`) differs per task, so there is a different object for each batch and each task. (TaskMetric is serialized and deserialized, so it can't be shared between tasks.) And the metric values are actually not aggregated into a single SQLMetric object; they are only aggregated and rendered in SQLAppStatusListener. https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala#L162-L174 https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala#L147-L160 Screenshot: https://user-images.githubusercontent.com/1317309/41263432-024efe4a-6e22-11e8-92f9-24d1f73776a9.png
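Because the values are only combined when the UI renders them, what the SQL tab shows for a metric is a summary over the per-task values, in the "total (min, med, max)" shape seen in the SQL metrics UI. A hypothetical Python sketch of such a summary; skipping negative values as "not set" and taking the element at index `n // 2` of the sorted values as the median are assumptions of this sketch, not a statement of Spark's exact rule.

```python
from typing import List, Tuple


def summarize(task_values: List[int]) -> Tuple[int, int, int, int]:
    """Return (total, min, med, max) over per-task metric values (hypothetical sketch)."""
    valid = sorted(v for v in task_values if v >= 0)  # assume negatives mean "not set"
    if not valid:
        return (0, 0, 0, 0)
    return (sum(valid), valid[0], valid[len(valid) // 2], valid[-1])


def render(name: str, task_values: List[int]) -> str:
    total, lo, med, hi = summarize(task_values)
    return f"{name} total (min, med, max): {total} ({lo}, {med}, {hi})"


print(render("time to commit changes (ms)", [21, 35, 449, -1, 30]))
```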
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @aalobaidi I'd just like to see the benefit of unloading the version of state that is expected to be read by the next batch. I totally agree that the current caching mechanism is excessive, but we can still avoid reloading state in every batch. Are you considering a case with multiple stages, where the executor is encouraged to free as much memory as it can despite redundantly reloading state?
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r194563959 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -247,6 +253,14 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) + private lazy val metricProviderLoaderMapSizeBytes: StateStoreCustomSizeMetric = +StateStoreCustomSizeMetric("providerLoadedMapSizeBytes", + "estimated size of states cache in provider") + + private lazy val metricProviderLoaderCountOfVersionsInMap: StateStoreCustomAverageMetric = --- End diff -- This should be an average metric so that the SQL metrics UI shows `min, med, max`, as in the UI capture I pasted before. Summing the per-task values doesn't give a meaningful number.
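To illustrate why summing a per-partition count isn't meaningful, here is a minimal Python sketch, assuming each task reports one value for the metric and the UI renders either a total or a `(min, med, max)` summary. The function names and the sample values are hypothetical; this is not Spark's SQLMetrics code.

```python
from typing import List, Tuple


def summarize_total(values: List[int]) -> int:
    """Sum the per-task values; sensible for sizes or row counts."""
    return sum(values)


def summarize_distribution(values: List[int]) -> Tuple[int, int, int]:
    """Return (min, med, max) of the per-task values.

    'med' is the middle element of the sorted list (the upper median when the
    number of tasks is even).
    """
    if not values:
        raise ValueError("no task reported a value")
    ordered = sorted(values)
    return ordered[0], ordered[len(ordered) // 2], ordered[-1]


# Hypothetical per-task "number of state versions cached" for four partitions.
versions_per_task = [2, 2, 3, 2]
print(summarize_total(versions_per_task))         # 9, yet no partition caches 9 versions
print(summarize_distribution(versions_per_task))  # (2, 2, 3)
```

The total grows with the number of partitions, while the distribution answers the question the metric is meant for: how many versions a typical (and the worst) partition keeps in memory.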
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 After enabling the option, I've observed a small but expected latency at the start of every batch, for each partition. The median/average was 4~50 ms in my case, but the max latency was a bit higher than 700 ms. Please note that the state size in my experiment is not that huge, so if a partition has a much bigger state, the latency could be much higher:
```
memory used by state total (min, med, max): 812.6 KB (2.1 KB, 4.1 KB, 4.1 KB)
time to commit changes total (min, med, max): 13.5 s (21 ms, 35 ms, 449 ms)
total time to remove rows total (min, med, max): 22 ms (22 ms, 22 ms, 22 ms)
number of updated state rows: 5,692
total time to update rows total (min, med, max): 1.4 s (3 ms, 5 ms, 42 ms)
```
As I explained earlier, loading the last version from files introduces avoidable latency.
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21506 retest this please
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21506#discussion_r194295068 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit if (loadedCurrentVersionMap.isDefined) { return loadedCurrentVersionMap.get } -val snapshotCurrentVersionMap = readSnapshotFile(version) -if (snapshotCurrentVersionMap.isDefined) { - synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) } - return snapshotCurrentVersionMap.get -} -// Find the most recent map before this version that we can. -// [SPARK-22305] This must be done iteratively to avoid stack overflow. -var lastAvailableVersion = version -var lastAvailableMap: Option[MapType] = None -while (lastAvailableMap.isEmpty) { - lastAvailableVersion -= 1 +logWarning(s"The state for version $version doesn't exist in loadedMaps. " + + "Reading snapshot file and delta files if needed..." + + "Note that this is normal for the first batch of starting query.") - if (lastAvailableVersion <= 0) { -// Use an empty map for versions 0 or less. -lastAvailableMap = Some(new MapType) - } else { -lastAvailableMap = - synchronized { loadedMaps.get(lastAvailableVersion) } -.orElse(readSnapshotFile(lastAvailableVersion)) +val (result, elapsedMs) = Utils.timeTakenMs { + val snapshotCurrentVersionMap = readSnapshotFile(version) + if (snapshotCurrentVersionMap.isDefined) { +synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) } +return snapshotCurrentVersionMap.get + } + + // Find the most recent map before this version that we can. + // [SPARK-22305] This must be done iteratively to avoid stack overflow. 
+ var lastAvailableVersion = version + var lastAvailableMap: Option[MapType] = None + while (lastAvailableMap.isEmpty) { +lastAvailableVersion -= 1 + +if (lastAvailableVersion <= 0) { + // Use an empty map for versions 0 or less. + lastAvailableMap = Some(new MapType) +} else { + lastAvailableMap = +synchronized { loadedMaps.get(lastAvailableVersion) } + .orElse(readSnapshotFile(lastAvailableVersion)) +} + } + + // Load all the deltas from the version after the last available one up to the target version. + // The last available version is the one with a full snapshot, so it doesn't need deltas. + val resultMap = new MapType(lastAvailableMap.get) + for (deltaVersion <- lastAvailableVersion + 1 to version) { +updateFromDeltaFile(deltaVersion, resultMap) } -} -// Load all the deltas from the version after the last available one up to the target version. -// The last available version is the one with a full snapshot, so it doesn't need deltas. -val resultMap = new MapType(lastAvailableMap.get) -for (deltaVersion <- lastAvailableVersion + 1 to version) { - updateFromDeltaFile(deltaVersion, resultMap) + synchronized { loadedMaps.put(version, resultMap) } + resultMap } -synchronized { loadedMaps.put(version, resultMap) } -resultMap +logWarning(s"Loading state for $version takes $elapsedMs ms.") --- End diff -- Changed log level to DEBUG.
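The load path in the diff above (cache, then snapshot, then an iterative walk back to the newest available base, then delta replay) can be sketched in Python as follows. This is a simplified model, not the provider's code: `load_map`, `read_snapshot` and `read_delta` are hypothetical stand-ins, and deltas are modelled as plain upserts, whereas real delta files also record removals.

```python
from typing import Callable, Dict, Optional

StateMap = Dict[str, int]


def load_map(
    version: int,
    loaded_maps: Dict[int, StateMap],
    read_snapshot: Callable[[int], Optional[StateMap]],
    read_delta: Callable[[int], StateMap],
) -> StateMap:
    """Return the state map for `version`, reading files only on a cache miss."""
    # 1. Cache hit: no filesystem access at all.
    if version in loaded_maps:
        return loaded_maps[version]

    # 2. A full snapshot exists for exactly this version.
    snapshot = read_snapshot(version)
    if snapshot is not None:
        loaded_maps[version] = snapshot
        return snapshot

    # 3. Walk back iteratively (not recursively, cf. SPARK-22305) to the newest
    #    version available either in the cache or as a snapshot file.
    last_version = version
    base: Optional[StateMap] = None
    while base is None:
        last_version -= 1
        if last_version <= 0:
            base = {}  # versions 0 or less start from an empty map
        else:
            base = loaded_maps.get(last_version)
            if base is None:
                base = read_snapshot(last_version)

    # 4. Replay deltas on a copy so the cached base map is never mutated.
    result = dict(base)
    for delta_version in range(last_version + 1, version + 1):
        result.update(read_delta(delta_version))
    loaded_maps[version] = result
    return result
```

Every `read_snapshot` or `read_delta` call in steps 2 to 4 is a filesystem access (mostly remote HDFS in production), which is why a cache miss at the start of a batch shows up as latency.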
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21506#discussion_r194293481 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit if (loadedCurrentVersionMap.isDefined) { return loadedCurrentVersionMap.get } -val snapshotCurrentVersionMap = readSnapshotFile(version) -if (snapshotCurrentVersionMap.isDefined) { - synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) } - return snapshotCurrentVersionMap.get -} -// Find the most recent map before this version that we can. -// [SPARK-22305] This must be done iteratively to avoid stack overflow. -var lastAvailableVersion = version -var lastAvailableMap: Option[MapType] = None -while (lastAvailableMap.isEmpty) { - lastAvailableVersion -= 1 +logWarning(s"The state for version $version doesn't exist in loadedMaps. " + + "Reading snapshot file and delta files if needed..." + + "Note that this is normal for the first batch of starting query.") - if (lastAvailableVersion <= 0) { -// Use an empty map for versions 0 or less. -lastAvailableMap = Some(new MapType) - } else { -lastAvailableMap = - synchronized { loadedMaps.get(lastAvailableVersion) } -.orElse(readSnapshotFile(lastAvailableVersion)) +val (result, elapsedMs) = Utils.timeTakenMs { + val snapshotCurrentVersionMap = readSnapshotFile(version) + if (snapshotCurrentVersionMap.isDefined) { +synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) } +return snapshotCurrentVersionMap.get + } + + // Find the most recent map before this version that we can. + // [SPARK-22305] This must be done iteratively to avoid stack overflow. 
+ var lastAvailableVersion = version + var lastAvailableMap: Option[MapType] = None + while (lastAvailableMap.isEmpty) { +lastAvailableVersion -= 1 + +if (lastAvailableVersion <= 0) { + // Use an empty map for versions 0 or less. + lastAvailableMap = Some(new MapType) +} else { + lastAvailableMap = +synchronized { loadedMaps.get(lastAvailableVersion) } + .orElse(readSnapshotFile(lastAvailableVersion)) +} + } + + // Load all the deltas from the version after the last available one up to the target version. + // The last available version is the one with a full snapshot, so it doesn't need deltas. + val resultMap = new MapType(lastAvailableMap.get) + for (deltaVersion <- lastAvailableVersion + 1 to version) { +updateFromDeltaFile(deltaVersion, resultMap) } -} -// Load all the deltas from the version after the last available one up to the target version. -// The last available version is the one with a full snapshot, so it doesn't need deltas. -val resultMap = new MapType(lastAvailableMap.get) -for (deltaVersion <- lastAvailableVersion + 1 to version) { - updateFromDeltaFile(deltaVersion, resultMap) + synchronized { loadedMaps.put(version, resultMap) } + resultMap } -synchronized { loadedMaps.put(version, resultMap) } -resultMap +logWarning(s"Loading state for $version takes $elapsedMs ms.") --- End diff -- I was thinking of pairing this with the warning message above, but since we are guiding end users to turn on DEBUG level to see information about additional latencies, changing this to DEBUG would also be OK.
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21506#discussion_r194293251 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit if (loadedCurrentVersionMap.isDefined) { return loadedCurrentVersionMap.get } -val snapshotCurrentVersionMap = readSnapshotFile(version) -if (snapshotCurrentVersionMap.isDefined) { - synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) } - return snapshotCurrentVersionMap.get -} -// Find the most recent map before this version that we can. -// [SPARK-22305] This must be done iteratively to avoid stack overflow. -var lastAvailableVersion = version -var lastAvailableMap: Option[MapType] = None -while (lastAvailableMap.isEmpty) { - lastAvailableVersion -= 1 +logWarning(s"The state for version $version doesn't exist in loadedMaps. " + + "Reading snapshot file and delta files if needed..." + + "Note that this is normal for the first batch of starting query.") - if (lastAvailableVersion <= 0) { -// Use an empty map for versions 0 or less. -lastAvailableMap = Some(new MapType) - } else { -lastAvailableMap = - synchronized { loadedMaps.get(lastAvailableVersion) } -.orElse(readSnapshotFile(lastAvailableVersion)) +val (result, elapsedMs) = Utils.timeTakenMs { --- End diff -- Yup, right. Most of the code change is just wrapping the existing code in `timeTakenMs`.
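The wrapping pattern discussed here (run a block, get back its result together with the elapsed time, then log the time) looks roughly like this as a Python sketch. `time_taken_ms` is a hypothetical analogue written for illustration, not Spark's `Utils.timeTakenMs`, and the workload is a stand-in for loading snapshot and delta files.

```python
import logging
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def time_taken_ms(body: Callable[[], T]) -> Tuple[T, int]:
    """Run body() and return (its result, elapsed wall-clock milliseconds)."""
    start_ns = time.monotonic_ns()  # monotonic clock: immune to wall-clock jumps
    result = body()
    elapsed_ms = (time.monotonic_ns() - start_ns) // 1_000_000
    return result, elapsed_ms


logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("state-store-sketch")

# Stand-in for "load snapshot and delta files" (hypothetical workload).
state, elapsed = time_taken_ms(lambda: {i: i * i for i in range(100_000)})
log.debug("Loading state took %d ms.", elapsed)
```

Returning a `(result, elapsed)` pair keeps the timed code unchanged apart from the wrapper, which matches the observation that most of the diff is re-indentation.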
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21506 cc. @tdas @jose-torres @jerryshao @arunmahadevan @HyukjinKwon
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @jose-torres No problem. I expected there would be a quiet period in the Spark community during Spark Summit. I've addressed the comment regarding renaming.
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @aalobaidi When a batch starts, the latest version of state is read in order to create the new version of state. If the state has to be restored from a snapshot as well as delta files, restoring incurs huge latency. #21506 logs messages when loading state requires accessing the (remote) filesystem. That's why I suggest merging my patch and running your case again.
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21504 Test failures were from Kafka. retest this, please
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 retest this please
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21504 retest this, please
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21504#discussion_r193945288 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo @GuardedBy("awaitTerminationLock") private var lastTerminatedQuery: StreamingQuery = null + try { +sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames => + Utils.loadExtensions(classOf[StreamingQueryListener], classNames, +sparkSession.sparkContext.conf).foreach(listener => { +addListener(listener) +logInfo(s"Registered listener ${listener.getClass.getName}") --- End diff -- Either DEBUG or INFO is fine with me, since it only adds a couple of log lines, once.
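The pattern in the diff (read a comma-separated list of listener class names from config, instantiate each class, register it and log one line per listener) can be sketched in Python. Everything here is hypothetical: `load_extensions` and `register_listeners` only mimic the idea, assume no-argument constructors, and Spark's actual `Utils.loadExtensions` may behave differently.

```python
import importlib
import logging
from typing import List


def load_extensions(class_names: str) -> List[object]:
    """Instantiate every fully qualified class named in a comma-separated string."""
    instances: List[object] = []
    for name in (n.strip() for n in class_names.split(",")):
        if not name:
            continue  # tolerate empty entries such as a trailing comma
        module_name, _, cls_name = name.rpartition(".")
        cls = getattr(importlib.import_module(module_name), cls_name)
        instances.append(cls())  # assumes a no-argument constructor
    return instances


def register_listeners(class_names: str, registry: List[object]) -> None:
    """Add each configured listener to `registry`, logging one line per listener."""
    log = logging.getLogger("listener-sketch")
    for listener in load_extensions(class_names):
        registry.append(listener)
        log.info("Registered listener %s", type(listener).__qualname__)
```

Since registration happens once, when the manager is created, the log volume is one line per configured class regardless of the chosen level.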
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @aalobaidi One thing you may want to be aware of is that, from the executor's point of view, the executor must load at least one version of state into memory regardless of how many versions are cached. I guess you may get a better result if you unload the entire cache except the last version you just committed. A cache miss will then occur in only one of the three cases (`2. committed but batch failed afterwards`), which should happen rarely, and that is still better than cache misses in two of the three cases (2 and 3).
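The suggested eviction policy (drop everything except the version just committed) is a special case of keeping the N newest versions. A minimal Python sketch, assuming the cache is a dict keyed by version number; `retain_newest` is a hypothetical helper, not provider code.

```python
from typing import Dict


def retain_newest(loaded_maps: Dict[int, dict], n: int) -> None:
    """Evict all but the `n` newest versions from the cache, in place."""
    if n < 1:
        raise ValueError("at least one version must stay in memory")
    # sorted() materializes the keys first, so deleting while looping is safe.
    for version in sorted(loaded_maps)[:-n]:
        del loaded_maps[version]


cache = {1: {"k": 1}, 2: {"k": 2}, 3: {"k": 3}}
retain_newest(cache, 1)  # keep only the version just committed
print(sorted(cache))     # [3]
```

With `n = 1`, the next batch still finds the version it needs in memory, so only the rarer "committed but batch failed afterwards" path has to go back to files.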
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @aalobaidi You can also merge #21506 (perhaps changing the log level, or modifying the patch to log the messages at INFO level) and see the latencies of loading state, snapshotting, and cleaning up.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 retest this please
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 retest this, please
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21506 There are plenty of other debug messages which might bury the log messages added by this patch. Should we log them at INFO instead of DEBUG?
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21506 [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider ## What changes were proposed in this pull request? This patch measures and logs the elapsed time of each operation in HDFSBackedStateStoreProvider that communicates with the file system (mostly remote HDFS in production), to help investigate latency issues. ## How was this patch tested? Manually tested. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24485 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21506.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21506 commit d84f98fc978262f4165f78b3b223b8bb3151f735 Author: Jungtaek Lim Date: 2018-06-07T14:14:46Z [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193740695 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (
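The lifecycle described in the docstring above (`open`, then `process` for each row if `open` returned true, then `close` with any error) and the `(partition_id, epoch_id)` deduplication idea can be sketched as a plain Python object. This assumes micro-batch mode, where a given `(partition_id, epoch_id)` always carries the same data; `DedupingWriter` and the `run_epoch` harness are hypothetical and only mimic the documented call order, whereas in PySpark the writer object would be passed to `writeStream.foreach(...)` and Spark would make these calls.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Key = Tuple[int, int]  # (partition_id, epoch_id)


class DedupingWriter:
    """Writer with open/process/close that skips (partition_id, epoch_id)
    pairs already committed. Only valid in micro-batch mode."""

    def __init__(self, sink: Dict[Key, List[object]]):
        self.sink = sink  # stands in for an external, transactional store
        self.key: Optional[Key] = None
        self.buffer: List[object] = []

    def open(self, partition_id: int, epoch_id: int) -> bool:
        self.key = (partition_id, epoch_id)
        self.buffer = []
        return self.key not in self.sink  # False: this epoch was already written

    def process(self, row: object) -> None:
        self.buffer.append(row)

    def close(self, error: Optional[BaseException]) -> None:
        # Commit only if processing succeeded and this epoch isn't committed yet.
        if error is None and self.key not in self.sink:
            self.sink[self.key] = self.buffer


def run_epoch(writer: DedupingWriter, partition_id: int, epoch_id: int,
              rows: Iterable[object]) -> None:
    """Hypothetical harness mimicking the documented call order (not Spark)."""
    error: Optional[BaseException] = None
    if writer.open(partition_id, epoch_id):
        try:
            for row in rows:
                writer.process(row)
        except Exception as e:  # hand the processing error to close()
            error = e
    writer.close(error)  # called whatever open() returned
    if error is not None:
        raise error
```

Replaying an already committed epoch makes `open` return false, so its rows are skipped and `close` leaves the sink untouched. In continuous mode this guarantee does not hold, so the key must not be used for deduplication there.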
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 Retaining versions of state is also relevant to snapshotting the last version into files: HDFSBackedStateStoreProvider doesn't create a snapshot if the version doesn't exist in loadedMaps. So we may want to check whether this option also works with the current snapshotting approach.
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @TomaszGaweda @aalobaidi Please correct me if I'm missing something here. At the start of every batch, the state store loads the previous version of state so that it can be read and written. If we unload all versions "after committing", the cache will no longer contain the previous version of state, and the store will have to load the state by reading files, adding huge latency at the start of the batch. That's why I described the three cases before: to avoid loading state from files when starting a new batch. Please apply #21469 manually and see how much memory HDFSBackedStateStoreProvider consumes by storing multiple versions (it shows the state size of the latest version as well as the overall state size in the cache). Please also measure and share latency numbers, both before and after the patch. We always have to ask ourselves whether we are addressing the issue correctly.
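The argument can be checked with a small simulation of successful batches only (no failures), assuming batch `b` reads version `b - 1` and commits version `b`, and that version 0 is the empty map and needs no file read. `count_file_loads` is a hypothetical model that counts how often state has to be rebuilt from files under "unload everything after commit" versus "keep only the latest version".

```python
from typing import Dict


def count_file_loads(num_batches: int, keep_latest: bool) -> int:
    """Simulate successful batches 1..num_batches and count cache misses.

    Batch b reads version b - 1 and commits version b. After each commit the
    cache either keeps only the newest version or is cleared entirely.
    """
    cache: Dict[int, dict] = {}
    loads = 0
    for batch in range(1, num_batches + 1):
        prev = batch - 1
        if prev > 0 and prev not in cache:
            loads += 1        # miss: state must be rebuilt from snapshot/delta files
            cache[prev] = {}  # stand-in for the reloaded map
        base = cache.get(prev, {})  # version 0 is the empty map
        cache[batch] = dict(base)   # commit the new version
        stale = [v for v in cache if v != batch] if keep_latest else list(cache)
        for v in stale:
            del cache[v]
    return loads


print(count_file_loads(10, keep_latest=False))  # 9
print(count_file_loads(10, keep_latest=True))   # 0
```

Unloading everything turns every batch after the first into a file read, while retaining just the committed version avoids reads entirely on the happy path.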
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r193622940 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala --- @@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("event ordering") { val listener = new EventCollector withListenerAdded(listener) { - for (i <- 1 to 100) { + for (i <- 1 to 50) { --- End diff -- After the patch this test started failing: it just means more time is needed to run this loop 100 times, not that the logic is broken. Decreasing the number works for me.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @arunmahadevan I've added the state store's custom metrics to the streaming query status as well. You can see `providerLoadedMapSize` added to `stateOperators.customMetrics` in the output below. I had to exclude `providerLoadedMapCountOfVersions` from the list, since the average metric is implemented in a somewhat tricky way and doesn't look easy to aggregate for the streaming query status. We may want to reimplement SQLMetric and its subclasses so that everything works correctly without tricks, but that doesn't look trivial, and I think it is out of scope for this PR. ``` 18/06/06 22:51:23 INFO MicroBatchExecution: Streaming query made progress: { "id" : "7564a0b7-e3b2-4d53-b246-b774ab04e586", "runId" : "8dd34784-080c-4f86-afaf-ac089902252d", "name" : null, "timestamp" : "2018-06-06T13:51:15.467Z", "batchId" : 4, "numInputRows" : 547, "inputRowsPerSecond" : 67.15776550030694, "processedRowsPerSecond" : 65.94333936106088, "durationMs" : { "addBatch" : 7944, "getBatch" : 1, "getEndOffset" : 0, "queryPlanning" : 61, "setOffsetRange" : 5, "triggerExecution" : 8295, "walCommit" : 158 }, "eventTime" : { "avg" : "2018-06-06T13:51:10.313Z", "max" : "2018-06-06T13:51:14.250Z", "min" : "2018-06-06T13:51:07.098Z", "watermark" : "2018-06-06T13:50:36.676Z" }, "stateOperators" : [ { "numRowsTotal" : 20, "numRowsUpdated" : 16, "memoryUsedBytes" : 26679, "customMetrics" : { "providerLoadedMapSize" : 181911 } } ], "sources" : [ { "description" : "KafkaV2[Subscribe[apachelogs-v2]]", "startOffset" : { "apachelogs-v2" : { "2" : 489056, "4" : 489053, "1" : 489055, "3" : 489051, "0" : 489053 } }, "endOffset" : { "apachelogs-v2" : { "2" : 489056, "4" : 489053, "1" : 489055, "3" : 489051, "0" : 489053 } }, "numInputRows" : 547, "inputRowsPerSecond" : 67.15776550030694, "processedRowsPerSecond" : 65.94333936106088 } ], "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@60999714" } } ```
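To pick the new metric out of a progress event programmatically, a short Python sketch that parses an excerpt of the event above is shown below. The helper name is hypothetical; with PySpark, `query.lastProgress` already returns the same structure as a dict, so only the list comprehension would be needed. In this run the provider's cache (181911) is roughly 6.8 times the `memoryUsedBytes` of the current version (26679).

```python
import json

# Excerpt of the progress event above; other fields are omitted.
progress_json = """
{
  "batchId" : 4,
  "stateOperators" : [ {
    "numRowsTotal" : 20,
    "numRowsUpdated" : 16,
    "memoryUsedBytes" : 26679,
    "customMetrics" : { "providerLoadedMapSize" : 181911 }
  } ]
}
"""


def state_operator_custom_metrics(progress: str) -> list:
    """Return the customMetrics dict of every state operator in a progress event."""
    event = json.loads(progress)
    return [op.get("customMetrics", {}) for op in event.get("stateOperators", [])]


metrics = state_operator_custom_metrics(progress_json)
print(metrics[0]["providerLoadedMapSize"])  # 181911
```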
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21497#discussion_r193374662 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("verify ServerThread only accepts the first connection") { +serverThread = new ServerThread() +serverThread.start() + +withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { --- End diff -- Thanks for the guidance; addressed.
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21497#discussion_r193372564 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("verify ServerThread only accepts the first connection") { +serverThread = new ServerThread() +serverThread.start() + +withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { --- End diff -- Yeah, actually I blindly copied that line from elsewhere in the file. Agreed, it would be better to use the config key.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193304316 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala --- @@ -20,10 +20,48 @@ package org.apache.spark.sql import org.apache.spark.annotation.InterfaceStability /** - * A class to consume data generated by a `StreamingQuery`. Typically this is used to send the - * generated data to external systems. Each partition will use a new deserialized instance, so you - * usually should do all the initialization (e.g. opening a connection or initiating a transaction) - * in the `open` method. + * The abstract class for writing custom logic to process data generated by a query. + * This is often used to write the output of a streaming query to arbitrary storage systems. --- End diff -- Ah yes, my bad. I confused this with the Python one.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193285667 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (
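The `open`/`process`/`close` lifecycle described in the docstring above can be illustrated with a small, Spark-free simulation. This is only a sketch: `DedupWriter` and `run_partition` are hypothetical names, the dict passed in stands in for a durable external store, and the driver just mimics the documented call order (`close` is called whatever `open` returns, and `process` runs only if `open` returned True). Per the docstring, deduplicating on `(partition_id, epoch_id)` like this is only safe in micro-batch mode, not in continuous mode.

```python
class DedupWriter:
    """Hypothetical writer that skips (partition_id, epoch_id) pairs already committed."""

    def __init__(self, committed):
        # `committed` stands in for a durable external store keyed by
        # (partition_id, epoch_id); a plain dict is used here for illustration.
        self.committed = committed
        self.key = None
        self.accepted = False
        self.pending = []

    def open(self, partition_id, epoch_id):
        self.key = (partition_id, epoch_id)
        self.pending = []
        # Returning False means "already written": process() is skipped.
        self.accepted = self.key not in self.committed
        return self.accepted

    def process(self, row):
        self.pending.append(row)

    def close(self, error):
        # Commit buffered rows only if this epoch was accepted and no error occurred.
        if self.accepted and error is None:
            self.committed[self.key] = list(self.pending)


def run_partition(writer, partition_id, epoch_id, rows):
    """Hypothetical driver mimicking the documented order for one partition/epoch."""
    accepted = writer.open(partition_id, epoch_id)  # if open() raises, close() is not called
    error = None
    try:
        if accepted:
            for row in rows:
                writer.process(row)
    except Exception as e:
        error = e
    writer.close(error)  # called regardless of open()'s return value
    if error is not None:
        raise error


store = {}
writer = DedupWriter(store)
run_partition(writer, 0, 0, ["a", "b"])
run_partition(writer, 0, 0, ["a", "b"])  # reprocessed epoch: open() returns False
run_partition(writer, 0, 1, ["c"])
print(store)  # {(0, 0): ['a', 'b'], (0, 1): ['c']}
```

The replayed epoch `(0, 0)` is neither reprocessed nor re-committed, which is the deduplication the docstring describes for micro-batch mode.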
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193286066 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193286932 --- Diff: python/pyspark/sql/tests.py --- @@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self): finally: q.stop() shutil.rmtree(tmpPath) +''' +class ForeachWriterTester: + +def __init__(self, spark): +self.spark = spark +self.input_dir = tempfile.mkdtemp() +self.open_events_dir = tempfile.mkdtemp() +self.process_events_dir = tempfile.mkdtemp() +self.close_events_dir = tempfile.mkdtemp() + +def write_open_event(self, partitionId, epochId): +self._write_event( +self.open_events_dir, +{'partition': partitionId, 'epoch': epochId}) + +def write_process_event(self, row): +self._write_event(self.process_events_dir, {'value': 'text'}) + +def write_close_event(self, error): +self._write_event(self.close_events_dir, {'error': str(error)}) + +def write_input_file(self): +self._write_event(self.input_dir, "text") + +def open_events(self): +return self._read_events(self.open_events_dir, 'partition INT, epoch INT') + +def process_events(self): +return self._read_events(self.process_events_dir, 'value STRING') + +def close_events(self): +return self._read_events(self.close_events_dir, 'error STRING') + +def run_streaming_query_on_writer(self, writer, num_files): +try: +sdf = self.spark.readStream.format('text').load(self.input_dir) +sq = sdf.writeStream.foreach(writer).start() +for i in range(num_files): +self.write_input_file() +sq.processAllAvailable() +sq.stop() +finally: +self.stop_all() + +def _read_events(self, dir, json): +rows = self.spark.read.schema(json).json(dir).collect() +dicts = [row.asDict() for row in rows] +return dicts + +def _write_event(self, dir, event): +import random +file = open(os.path.join(dir, str(random.randint(0, 10))), 'w') --- End diff -- Using a `with` statement might be more convenient here, along with renaming `file` to `f`, `fw`, or similar. Please ignore this if there's a specific reason not to use a `with` statement.
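For reference, the suggestion about `_write_event` might look roughly like the sketch below. It is written as a standalone function (`write_event` is a hypothetical name, not the method in the PR) and keeps the reviewed code's random file naming unchanged; the `with` statement closes the file even if `write()` raises.

```python
import os
import random
import tempfile


def write_event(dir, event):
    # Same behavior as the reviewed _write_event, but `with` guarantees the file
    # is closed even on error, and the handle is named `f` instead of `file`.
    path = os.path.join(dir, str(random.randint(0, 10)))
    with open(path, 'w') as f:
        f.write("%s\n" % str(event))
    return path


events_dir = tempfile.mkdtemp()
written = write_event(events_dir, {'value': 'text'})
with open(written) as f:
    print(f.read())  # {'value': 'text'}
```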
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193284839 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a --- End diff -- > any initialization for writing data (e.g. opening a connection or starting a transaction) be done open after the `open(...)` method has been called `be done open` seems a bit odd. It would be better to polish this sentence.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193289099 --- Diff: python/pyspark/sql/tests.py --- @@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self): finally: q.stop() shutil.rmtree(tmpPath) +''' +class ForeachWriterTester: + +def __init__(self, spark): +self.spark = spark +self.input_dir = tempfile.mkdtemp() +self.open_events_dir = tempfile.mkdtemp() +self.process_events_dir = tempfile.mkdtemp() +self.close_events_dir = tempfile.mkdtemp() + +def write_open_event(self, partitionId, epochId): +self._write_event( +self.open_events_dir, +{'partition': partitionId, 'epoch': epochId}) + +def write_process_event(self, row): +self._write_event(self.process_events_dir, {'value': 'text'}) + +def write_close_event(self, error): +self._write_event(self.close_events_dir, {'error': str(error)}) + +def write_input_file(self): +self._write_event(self.input_dir, "text") + +def open_events(self): +return self._read_events(self.open_events_dir, 'partition INT, epoch INT') + +def process_events(self): +return self._read_events(self.process_events_dir, 'value STRING') + +def close_events(self): +return self._read_events(self.close_events_dir, 'error STRING') + +def run_streaming_query_on_writer(self, writer, num_files): +try: +sdf = self.spark.readStream.format('text').load(self.input_dir) +sq = sdf.writeStream.foreach(writer).start() +for i in range(num_files): +self.write_input_file() +sq.processAllAvailable() +sq.stop() +finally: +self.stop_all() + +def _read_events(self, dir, json): +rows = self.spark.read.schema(json).json(dir).collect() +dicts = [row.asDict() for row in rows] +return dicts + +def _write_event(self, dir, event): +import random +file = open(os.path.join(dir, str(random.randint(0, 10))), 'w') +file.write("%s\n" % str(event)) +file.close() + +def stop_all(self): +for q in self.spark._wrapped.streams.active: +q.stop() + +def __getstate__(self): +return (self.open_events_dir, 
self.process_events_dir, self.close_events_dir) + +def __setstate__(self, state): +self.open_events_dir, self.process_events_dir, self.close_events_dir = state + +def test_streaming_foreach_with_simple_function(self): +tester = self.ForeachWriterTester(self.spark) + +def foreach_func(row): +tester.write_process_event(row) + +tester.run_streaming_query_on_writer(foreach_func, 2) +self.assertEqual(len(tester.process_events()), 2) + +def test_streaming_foreach_with_basic_open_process_close(self): +tester = self.ForeachWriterTester(self.spark) + +class ForeachWriter: +def open(self, partitionId, epochId): +tester.write_open_event(partitionId, epochId) +return True + +def process(self, row): +tester.write_process_event(row) + +def close(self, error): +tester.write_close_event(error) + +tester.run_streaming_query_on_writer(ForeachWriter(), 2) + +open_events = tester.open_events() +self.assertEqual(len(open_events), 2) +self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1}) + +self.assertEqual(len(tester.process_events()), 2) + +close_events = tester.close_events() +self.assertEqual(len(close_events), 2) +self.assertSetEqual(set([e['error'] for e in close_events]), {'None'}) + +def test_streaming_foreach_with_open_returning_false(self): +tester = self.ForeachWriterTester(self.spark) + +class ForeachWriter: +def open(self, partitionId, epochId): +tester.write_open_event(partitionId, epochId) +return False + +def process(self, row): +tester.write_process_event(row) + +d
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193284293 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly --- End diff -- nit: missing space between `deserialized` and `copy`
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193289567 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala --- @@ -20,10 +20,48 @@ package org.apache.spark.sql import org.apache.spark.annotation.InterfaceStability /** - * A class to consume data generated by a `StreamingQuery`. Typically this is used to send the - * generated data to external systems. Each partition will use a new deserialized instance, so you - * usually should do all the initialization (e.g. opening a connection or initiating a transaction) - * in the `open` method. + * The abstract class for writing custom logic to process data generated by a query. + * This is often used to write the output of a streaming query to arbitrary storage systems. --- End diff -- It looks like the doc is duplicated between `foreach()` and `ForeachWriter`. I'm not sure how the Python doc could reference the other doc instead of duplicating the content, but even if the Python doc can't support some kind of reference, parts of the content seem fine to keep in only one of the two places rather than both.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193291809 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io.File +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python._ +import org.apache.spark.internal.Logging +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.sql.ForeachWriter +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{NextIterator, Utils} + +class PythonForeachWriter(func: PythonFunction, schema: StructType) + extends ForeachWriter[UnsafeRow] { + + private lazy val context = TaskContext.get() + private lazy val buffer = new PythonForeachWriter.UnsafeRowBuffer( +context.taskMemoryManager, new File(Utils.getLocalDir(SparkEnv.get.conf)), schema.fields.length) + private lazy val inputRowIterator = buffer.iterator + + private lazy val inputByteIterator = { +EvaluatePython.registerPicklers() +val objIterator = inputRowIterator.map { row => EvaluatePython.toJava(row, schema) } +new SerDeUtil.AutoBatchedPickler(objIterator) + } + + private lazy val pythonRunner = { +val conf = SparkEnv.get.conf +val bufferSize = conf.getInt("spark.buffer.size", 65536) +val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) +PythonRunner(func, bufferSize, reuseWorker) + } + + private lazy val outputIterator = +pythonRunner.compute(inputByteIterator, context.partitionId(), context) + + override def open(partitionId: Long, version: Long): Boolean = { +outputIterator // initialize everything +TaskContext.get.addTaskCompletionListener { _ => buffer.close() } +true + } + + override def process(value: UnsafeRow): Unit = { +buffer.add(value) + } + + override def close(errorOrNull: Throwable): Unit = { +buffer.allRowsAdded() +if (outputIterator.hasNext) outputIterator.next() // to throw python exception if there was one + } +} + +object PythonForeachWriter { + + /** + * A buffer that 
is designed for the sole purpose of buffering UnsafeRows in PythonForeahWriter. --- End diff -- nit: PythonForeachWriter
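In the Scala code above, `process()` only adds rows to the buffer, the Python runner consumes the buffer's iterator, and `close()` calls `allRowsAdded()` so that iteration can finish. A rough Python analogue of that producer/consumer hand-off is sketched below. `RowBuffer` is a hypothetical name, and unlike the Scala `UnsafeRowBuffer` (which is constructed with a `TaskMemoryManager` and a local directory) this sketch simply keeps everything in memory and assumes a single consumer.

```python
import threading
from collections import deque


class RowBuffer:
    """Hypothetical in-memory buffer: one thread add()s rows, another iterates
    them; iteration ends once all_rows_added() is called and the buffer drains."""

    def __init__(self):
        self._rows = deque()
        self._done = False
        self._cond = threading.Condition()

    def add(self, row):
        with self._cond:
            self._rows.append(row)
            self._cond.notify()

    def all_rows_added(self):
        with self._cond:
            self._done = True
            self._cond.notify()

    def __iter__(self):
        while True:
            with self._cond:
                # Block until a row is available or the producer has finished.
                while not self._rows and not self._done:
                    self._cond.wait()
                if not self._rows:
                    return  # finished and fully drained
                row = self._rows.popleft()
            yield row


buf = RowBuffer()
consumed = []
consumer = threading.Thread(target=lambda: consumed.extend(buf))
consumer.start()
for r in range(3):
    buf.add(r)        # analogous to PythonForeachWriter.process()
buf.all_rows_added()  # analogous to the call in close()
consumer.join()
print(consumed)  # [0, 1, 2]
```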
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21497#discussion_r193277616 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -35,10 +34,11 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} -import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + --- End diff -- Thanks for letting me know. Addressed.
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 I agree that the current cache approach may consume excessive memory unnecessarily, which matches my finding in #21469. The issue is not that simple, however: in micro-batch mode, each batch needs to read the previous version of the state, and if that version is not in memory it has to be read from the file system, in the worst case seeking and reading multiple files on a remote file system. So the previous version of the state should preferably be kept in memory. There are three cases here (please add any I'm missing): 1. failure before commit; 2. committed, but the batch fails afterwards; 3. committed and the batch succeeds. It would be better to think through all of these cases.
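To make case 2 concrete, the sketch below models a hypothetical bounded cache of state versions that evicts the smallest version and falls back to a slow loader (standing in for reading files from a remote file system) on a miss. `VersionCache` and `rerun_after_commit` are illustrative names, not part of `HDFSBackedStateStoreProvider`. If only the latest version is retained, a batch that commits version 3 and then fails must reload version 2 from storage when it is rerun.

```python
class VersionCache:
    """Hypothetical cache keeping at most `max_versions` state maps in memory,
    evicting the smallest version; misses fall back to a slow loader."""

    def __init__(self, max_versions, load_from_storage):
        self.max_versions = max_versions
        self.load_from_storage = load_from_storage  # stand-in for reading remote files
        self.cache = {}
        self.storage_loads = 0

    def put(self, version, state):
        self.cache[version] = state
        while len(self.cache) > self.max_versions:
            del self.cache[min(self.cache)]

    def get(self, version):
        if version in self.cache:
            return self.cache[version]
        self.storage_loads += 1
        state = self.load_from_storage(version)
        self.put(version, state)
        return state


def rerun_after_commit(max_versions):
    cache = VersionCache(max_versions, lambda v: {"version": v})
    cache.put(2, {"version": 2})
    cache.get(2)                  # the batch reads previous version 2 ...
    cache.put(3, {"version": 3})  # ... commits version 3, then fails (case 2)
    cache.get(2)                  # the rerun reads version 2 again
    return cache.storage_loads


print(rerun_after_commit(1))  # 1: version 2 was evicted and had to be reloaded
print(rerun_after_commit(2))  # 0: version 2 was still in memory
```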
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 @arunmahadevan Yes, before the patch Spark connects to the socket server twice: once to get the schema, and again to read the data. Also, the `-k` flag is only supported by specific netcat distributions, which is why I had to set a breakpoint and start nc again after the temporary reader was stopped. For example, on my local dev machine (macOS 10.12.6), netcat doesn't support the -k flag. ``` netcat (The GNU Netcat) 0.7.1 ```
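Since GNU Netcat 0.7.1 lacks `-k`, one hypothetical way to reproduce the two connections without restarting nc is a tiny Python server that keeps accepting clients and sends each one the same lines. This is only a sketch for local experiments: `serve_lines` and `read_all` are illustrative names, not part of Spark, and the server stops after a fixed number of connections.

```python
import socket
import threading


def serve_lines(lines, max_connections, host="127.0.0.1", port=0):
    """Hypothetical stand-in for `nc -lk`: accept up to `max_connections`
    clients one after another and send each of them the same lines."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    bound_port = srv.getsockname()[1]  # port 0 lets the OS pick a free port
    payload = "".join(line + "\n" for line in lines).encode("utf-8")

    def loop():
        with srv:
            for _ in range(max_connections):
                conn, _addr = srv.accept()
                with conn:
                    conn.sendall(payload)  # closing conn signals EOF to the client

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return bound_port, thread


def read_all(port):
    with socket.create_connection(("127.0.0.1", port)) as c:
        chunks = []
        while True:
            data = c.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8")


port, server_thread = serve_lines(["hello", "world"], max_connections=2)
first = read_all(port)   # e.g. the connection used only to resolve the schema
second = read_all(port)  # the connection that actually reads the data
server_thread.join()
print(first == second == "hello\nworld\n")  # True
```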