[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23156 @gaborgsomogyi No problem :) When you have some time, please take a look at my other PRs as well.
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23156 I think @jose-torres previously led the feature.
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23156 I'd rather not jump into anything regarding continuous mode until the overall design of continuous mode (including aggregation and join) is settled and stabilized.
[GitHub] spark issue #23260: [SPARK-26311][YARN] New feature: custom log URL for stdo...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23260 @srowen For now the executor log URL is **static** in Spark, which forces the NodeManager to stay alive even after the application is finished, in order to serve executor logs in the SHS. This becomes a problem when decommissioning happens fairly frequently, or when end users want a kind of elasticity in the YARN cluster (not only decommissioning nodes, but also elasticity of the YARN cluster itself: YARN has a cluster id for the RM which identifies the cluster, and this can be leveraged when dealing with multiple YARN clusters). There's also a similar change applied on the Hadoop side: https://github.com/apache/hadoop/commit/5fe1dbf1959976d0dc5a8e614dd74836cfbee04c We are experimenting with a central log service which resolves the above situation. Since the log URL for a centralized log service can't be the same URL as the NM webapp, we need flexibility in the executor log URL. Hope this explains the rationale well.
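To illustrate the flexibility being asked for, here is a minimal sketch (my own illustration, not the patch's code) of how a pattern-based log URL could be resolved. `buildLogUrl` is a hypothetical helper; the placeholder names follow the example in the PR description below.

```scala
import scala.util.matching.Regex

// Substitute {{Var}} placeholders with values known at executor registration
// time; unknown placeholders are left untouched.
def buildLogUrl(pattern: String, values: Map[String, String]): String = {
  val placeholder: Regex = """\{\{(\w+)\}\}""".r
  placeholder.replaceAllIn(pattern, m =>
    Regex.quoteReplacement(values.getOrElse(m.group(1), m.matched)))
}

// buildLogUrl("{{HttpScheme}}{{NodeHttpAddress}}/logs/{{FileName}}",
//   Map("HttpScheme" -> "http://", "NodeHttpAddress" -> "node:8042",
//       "FileName" -> "stdout"))
// => "http://node:8042/logs/stdout"
```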
[GitHub] spark issue #23169: [SPARK-26103][SQL] Limit the length of debug strings for...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23169 Thanks for addressing the review comments. It looks great overall. We may want to document the new config so that we can guide end users to set the value lower when they suffer from memory pressure due to long physical plans in UI pages.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 retest this, please
[GitHub] spark pull request #23260: [SPARK-26311][YARN] New feature: custom log URL f...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/23260 [SPARK-26311][YARN] New feature: custom log URL for stdout/stderr

## What changes were proposed in this pull request?

This patch proposes adding a new configuration on YARN mode: custom log URL. This will enable end users to point application logs to an external log service, which makes it possible to serve logs even when the NodeManager becomes unavailable. Some pre-defined patterns are available for the custom log URL and can be specified like path variables.

## How was this patch tested?

Manual test. The run below changes executor log URLs in UI pages.

```
./bin/spark-submit --conf spark.yarn.custom.log.url="{{HttpScheme}}{{NodeHttpAddress}}/test/cluster/{{ClusterId}}/container/{{ContainerId}}/user/{{User}}/filename/{{FileName}}" --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-.jar
```

An example stdout log URL is below: `http://node-address:node-port`/test/cluster/`workload1`/container/`container_e08_1542798098040_0012_01_02`/user/`spark`/filename/`stdout`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/spark SPARK-26311

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23260.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23260

commit 65cc6a32729cccba340f66c766c7255be4d7f356
Author: Jungtaek Lim (HeartSaVioR)
Date: 2018-12-08T06:32:46Z

    [SPARK-26311][YARN] New feature: custom log URL for stdout/stderr
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @zsxwing Please also take a look: I believe I addressed the glob overlap issue as well.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @gaborgsomogyi @steveloughran Please take a look at 17b9b9a043ead0d448048c88b30f544228bd230b, which just leverages GlobFilter. You may find that when the depth of the archive path is more than 2, there's no chance for the final destination to be picked up by FileStreamSource: so in most usual cases overlap will not happen, and Spark can determine this by only comparing depths.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 I'm now also playing with Hadoop's glob-related classes to check whether the final destination matches the source path glob pattern or not. * Looks like we can leverage `GlobPattern`, but it is marked as `@Private`. * `GlobFilter` is `@Public`, but it only checks against `path.getName()`, so it would only compare the last component. If we would like to leverage this, we should split all components and compare with multiple filters. Will update the code and test once I find a viable approach.
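A minimal sketch of the component-wise matching idea from the comment above (my own illustration, assuming only absolute paths of equal depth are compared; not the PR's final code):

```scala
import org.apache.hadoop.fs.{GlobFilter, Path}

// GlobFilter only checks path.getName(), so split both paths into components
// and apply one filter per component.
def matchesGlob(globPath: String, candidatePath: String): Boolean = {
  val globParts = globPath.split(Path.SEPARATOR).filter(_.nonEmpty)
  val candParts = candidatePath.split(Path.SEPARATOR).filter(_.nonEmpty)
  globParts.length == candParts.length &&
    globParts.zip(candParts).forall { case (glob, component) =>
      new GlobFilter(glob).accept(new Path(component))
    }
}

// matchesGlob("/a/b/*/data", "/a/b/c/data") => true
// matchesGlob("/a/b/*/data", "/a/x/c/data") => false
```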
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @gaborgsomogyi That's really huge... Could you share how you tested? Like which FS (local/HDFS/S3/etc.), directory structure, count of files... That would help me understand the impact and also help with manual testing when we deal with optimization.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @gaborgsomogyi @steveloughran OK. I'll change the approach to just check against the final path for each move. As @steveloughran stated, it may bring a performance hit for each check when dealing with object stores, so we may also need to provide a way to disable the check, with appropriate caution. (Btw, if moving a file in an object store carries a huge overhead compared to globbing, slow globbing may not be a big deal.)
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @gaborgsomogyi @steveloughran `GlobExpander` looks like it only handles the `{}` pattern. We still need to deal with `*` and `?`, which can't be expanded like this. It would only work if we were OK with restricting descendants of multiple paths (for now we restrict descendants of one path), so while it would help fix the bug in the current patch, it might still be too restrictive. I think changing the Hadoop version because of this costs too much. If we really want to go that way, the only viable solution is copying the code. (Actually we could also just reimplement it, since its requirements read like a kind of assignment, though we may end up with similar code.)
[GitHub] spark issue #23169: [SPARK-26103][SQL] Limit the length of debug strings for...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23169 @DaveDeCaprio You might have missed rolling back the change in the test. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99632/testReport/org.apache.spark.sql.catalyst.trees/TreeNodeSuite/treeString_limits_plan_length/ I also think you need to add a new test that sets the configuration to some value and checks whether it works properly.
[GitHub] spark issue #23169: [SPARK-26103][SQL] Limit the length of debug strings for...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23169 retest this, please
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @zsxwing @gaborgsomogyi What we were trying to do is enforce the archive path so that moved files will not overlap with the source path. The same file name may exist in different directories, so I'm also trying to preserve each file's own path in the final archived path, which means archived files will not all be placed in the same directory. Based on the above, I thought enforcing the archive path by checking the glob path is not easy to do, because without knowing the final archive path (per file) we can't check whether it matches the glob pattern. That's why I would rather restrict all subdirectories instead of finding a way to check against the glob pattern. Actually I'm a bit afraid that we might be putting too much complexity into enforcing the archive path. If we are OK with not enforcing the archive path and just verifying, per source file, that the final archive path doesn't overlap the source path, it would be simple to do. We can make Spark skip moving the file and log a warning message to let end users specify another directory. Would like to hear everyone's thoughts and ideas. Thanks in advance! cc. @gaborgsomogyi Could you also help validate my approach?
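A sketch of the simpler per-file alternative described above (my own illustration; `matchesGlob` is the hypothetical component-wise matcher sketched earlier, and `fs`, `sourceGlob`, and `logWarning` are assumed to be in scope as in FileStreamSource):

```scala
import org.apache.hadoop.fs.Path

// Before each move, verify the final archive location against the source glob
// and skip the move with a warning on overlap instead of failing the query.
def archiveFile(entryPath: Path, baseArchiveDirPath: String): Unit = {
  val finalPath = new Path(baseArchiveDirPath + entryPath.toUri.getPath)
  if (matchesGlob(sourceGlob, finalPath.toUri.getPath)) {
    logWarning(s"Archive path $finalPath overlaps source pattern $sourceGlob;" +
      " skipping move. Please specify another archive directory.")
  } else {
    fs.rename(entryPath, finalPath)
  }
}
```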
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238090190

--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
 See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed
+description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, see
+  [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+        ./bin/spark-submit \
+          --keytab \
+          --principal \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  - **Kerberos credential cache**, such as,
+
+        ./bin/spark-submit \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate
+    clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but
+    because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication.
+
+  When delegation token is available for example on an executor it can be overridden with JAAS login
--- End diff --

nit: Removing `for example` sounds clearer.
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238089426

--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
 See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed
+description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, see
+  [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+        ./bin/spark-submit \
+          --keytab \
+          --principal \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  - **Kerberos credential cache**, such as,
+
+        ./bin/spark-submit \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate
--- End diff --

nit: authenticate -> authenticate`s`
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238090666

--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
 See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed
+description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, see
+  [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+        ./bin/spark-submit \
+          --keytab \
+          --principal \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  - **Kerberos credential cache**, such as,
+
+        ./bin/spark-submit \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate
+    clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but
+    because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across nodes and renews it accordingly.
--- End diff --

nit: `After obtaining delegation token successfully,` sounds more natural to me, but just adding `,` before `Spark` would also be fine.
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238090476

--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
 See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed
+description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, see
+  [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+        ./bin/spark-submit \
+          --keytab \
+          --principal \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  - **Kerberos credential cache**, such as,
+
+        ./bin/spark-submit \
+          --conf spark.kafka.bootstrap.servers= \
+          ...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate
+    clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but
+    because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication.
+
+  When delegation token is available for example on an executor it can be overridden with JAAS login
+  configuration.
+- **JAAS login configuration**: JAAS login configuration must be created and transferred to all
--- End diff --

nit: `transferred` sounds like end users should send the files themselves. `placed` is more general and allows the JAAS conf file to be made available in whatever way works.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @zsxwing Yeah, it would be ideal if we could enforce an `archivePath` that has no possibility of matching the source path (glob). So my approach was to find the base directory of the source path (the deepest ancestor without a glob in it) and require that `archive path + base directory of source path` does not fall under a sub-directory of that directory. For example, suppose the source path is `/a/b/c/*/ef?/*/g/h/*/i`; then the base directory of the source path would be `/a/b/c`, and `archive path + base directory of source path` must not fall under a sub-directory of `/a/b/c`. (My code has a bug in finding the directory, so I need to fix it.) This is not an elegant approach, and it has false positives, ending up restricting archive paths which actually don't overlap (too restrictive), but it would guarantee the two paths never overlap. (So there's no need to re-check when renaming a file.) I guess the approach might be reasonable because in practice end users would rather isolate the two paths than reason about complicated overlap cases. What do you think about this approach? cc. @gaborgsomogyi Could you also help validate my approach?
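For illustration, here is one way the described base-directory rule could look (a rough sketch with hypothetical helper names, not the PR's code; as noted above, the PR's own helper still has a bug):

```scala
import org.apache.hadoop.fs.Path

// Take path components up to the first one containing a glob character.
def globFreeBase(path: Path): Path = {
  val parts = path.toUri.getPath.split(Path.SEPARATOR).filter(_.nonEmpty)
  val base = parts.takeWhile(p => !p.exists(c => "*?[{".contains(c)))
  new Path(Path.SEPARATOR + base.mkString(Path.SEPARATOR))
}

// Reject archive directories where "archive path + base directory of source
// path" would fall under the base directory itself.
def violatesOverlapRule(sourceGlob: Path, archiveDir: Path): Boolean = {
  val base = globFreeBase(sourceGlob).toUri.getPath
  val candidate = archiveDir.toUri.getPath + base
  candidate == base || candidate.startsWith(base + Path.SEPARATOR)
}

// globFreeBase(new Path("/a/b/c/*/ef?/*/g/h/*/i")) == new Path("/a/b/c")
```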
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @zsxwing Btw, what do you think about addressing background move/deletion (which I had considered and @gaborgsomogyi also suggested) in a separate issue? I guess adding more features would make this take longer to review.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237481604

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

I just modified the existing UT to have a space and % in the directory name as well as the file name.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @zsxwing Thanks for the detailed review! Addressed review comments.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237342362

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

Will address.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237342346

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
 
+  ensureNoOverlapBetweenSourceAndArchivePath()
+
+  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
+    @tailrec
+    def removeGlob(path: Path): Path = {
+      if (path.getName.contains("*")) {
+        removeGlob(path.getParent)
+      } else {
+        path
+      }
+    }
+
+    sourceOptions.sourceArchiveDir match {
+      case None =>
+      case Some(archiveDir) =>
+        val sourceUri = removeGlob(qualifiedBasePath).toUri
+        val archiveUri = new Path(archiveDir).toUri
+
+        val sourcePath = sourceUri.getPath
+        val archivePath = archiveUri.getPath
--- End diff --

Nice finding. Will address.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237342072

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

Yeah... actually I was somewhat confused about whether I have to escape/unescape the path. Thanks for the suggestion. Will address and add a new unit test for it.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237341854

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
+      try {
+        logDebug(s"Removing completed file $curPath")
+        fs.delete(curPath, false)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to remove $curPath / skip removing file.", e)
+      }
+    }
+
+    val logOffset = FileStreamSourceOffset(end).logOffset
+    metadataLog.get(logOffset) match {
--- End diff --

Ah, I didn't notice that. Thanks for letting me know! Will address.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237341425

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
--- End diff --

Yeah, I guess the patch prevents that case if it works as I expect, but I'm also in favor of defensive programming, and logging would be better for end users. Will address.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237340952

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
--- End diff --

OK will address.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237340938

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
 
+  ensureNoOverlapBetweenSourceAndArchivePath()
--- End diff --

Ah yes, missed it. Will address.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237340601

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+        cleanSource: option to clean up completed files after processing.
+        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+        When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+        Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+        NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.
+        NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
--- End diff --

Nice finding. I missed the case where multiple sources in the same query refer to the same file directory. Will address.
[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23169#discussion_r237305318

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1610,6 +1610,12 @@ object SQLConf {
        """ "... N more fields" placeholder.""")
     .intConf
     .createWithDefault(25)
+
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.debug.maxPlanLength")
--- End diff --

I'm not sure `debug` is right. This patch should help the UI reduce memory usage, which is not debugging. If we specify `debug` here, end users would interpret it as there being a debug mode. Same for the description.
[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23169#discussion_r237304214

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.io.Writer
+
+class WriterSizeException(val attemptedSize: Long, val charLimit: Long) extends Exception(
+  s"Attempted to write $attemptedSize characters to a writer that is limited to $charLimit")
+
+/**
+ * This class is used to control the size of generated writers. Guarantees that the total number
+ * of characters written will be less than the specified size.
+ *
+ * Checks size before writing and throws a WriterSizeException if the total size would count the
+ * limit.
+ */
+class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends Writer {
+
+  var charsWritten: Long = 0
+
+  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
--- End diff --

I'd rather make this write as much of the content as possible (ending with `...`), or let WriterSizeException carry enough information for the caller to retry with a smaller length. In the worst case, if the first physical plan is huge, we'd end up showing only `...`.
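A minimal sketch of the "write as much as possible" behavior suggested in the comment above (my own illustration, assuming the same `Writer`-wrapping shape as the PR's SizeLimitedWriter):

```scala
import java.io.Writer

// Writes through until the limit is reached, then appends "..." once and
// silently drops the rest instead of throwing.
class TruncatingWriter(underlying: Writer, charLimit: Long) extends Writer {
  private var written: Long = 0
  private var truncated = false

  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
    if (truncated) return
    val remaining = charLimit - written
    if (len <= remaining) {
      underlying.write(cbuf, off, len)
      written += len
    } else {
      underlying.write(cbuf, off, remaining.toInt)
      underlying.write("...")
      truncated = true
    }
  }

  override def flush(): Unit = underlying.flush()
  override def close(): Unit = underlying.close()
}
```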
[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23169#discussion_r237301700

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.io.Writer
+
+class WriterSizeException(val attemptedSize: Long, val charLimit: Long) extends Exception(
+  s"Attempted to write $attemptedSize characters to a writer that is limited to $charLimit")
+
+/**
+ * This class is used to control the size of generated writers. Guarantees that the total number
+ * of characters written will be less than the specified size.
+ *
+ * Checks size before writing and throws a WriterSizeException if the total size would count the
+ * limit.
+ */
+class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends Writer {
+
+  var charsWritten: Long = 0
--- End diff --

Looks like it should not be exposed outside of the class. Let's guard it as `private`.
[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23169#discussion_r237309191

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala ---
@@ -202,6 +202,26 @@ package object util extends Logging {
   /** Shorthand for calling truncatedString() without start or end strings. */
   def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
 
+  /** Whether we have warned about plan string truncation yet. */
+  private val planSizeWarningPrinted = new AtomicBoolean(false)
+
+  def withSizeLimitedWriter[T](writer: Writer)(f: (Writer) => T): Option[T] = {
+    try {
+      val limited = new SizeLimitedWriter(writer, SQLConf.get.maxPlanStringLength)
+      Some(f(limited))
+    }
+    catch {
+      case e: WriterSizeException =>
+        writer.write("...")
--- End diff --

nit: Would we want to restrict the string's length to maxPlanStringLength including the `...`? I think exceeding the defined length by at most 3 chars is not a big deal, but just double-checking.
[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/23169#discussion_r237307829

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala ---
@@ -595,4 +596,14 @@ class TreeNodeSuite extends SparkFunSuite {
     val expected = Coalesce(Stream(Literal(1), Literal(3)))
     assert(result === expected)
   }
+
+  test("toString() tree depth") {
--- End diff --

`treeString` sounds right to me rather than `toString`.
[GitHub] spark issue #23142: [SPARK-26170][SS] Add missing metrics in FlatMapGroupsWi...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23142 cc. @tdas @zsxwing
[GitHub] spark issue #23142: [SPARK-26170][SS] Add missing metrics in FlatMapGroupsWi...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23142 retest this, please
[GitHub] spark pull request #23142: [SPARK-26170][SS] Add missing metrics in FlatMapG...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/23142 [SPARK-26170][SS] Add missing metrics in FlatMapGroupsWithState

## What changes were proposed in this pull request?

This patch adds the metrics which StateStoreWriter can measure to FlatMapGroupsWithStateExec. Please note that some metrics, like time to remove elements, are not addressed because they are coupled with the state function.

## How was this patch tested?

Manually tested with https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala. Snapshots below:

![screen shot 2018-11-26 at 4 13 40 pm](https://user-images.githubusercontent.com/1317309/48999346-b5f7b400-f199-11e8-89c7-8795f13470d6.png)
![screen shot 2018-11-26 at 4 13 54 pm](https://user-images.githubusercontent.com/1317309/48999347-b5f7b400-f199-11e8-91ef-ef0b2f816b2e.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/spark SPARK-26170

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23142.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23142

commit 56f39cc5838c3f609c8657639ac3a45991fde99f
Author: Jungtaek Lim (HeartSaVioR)
Date: 2018-11-26T07:33:08Z

    SPARK-26170 Add missing metrics in FlatMapGroupsWithState
[GitHub] spark issue #23103: [SPARK-26121] [Structured Streaming] Allow users to defi...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23103 LGTM. Btw, IMHO, the TODOs @zouzias described would be better addressed at once, since documentation is easily forgotten.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @gaborgsomogyi Thanks for taking care of it, but I guess I can manage. I'll ask for help if I can't get back to this one. This patch (latest change) hasn't gotten any feedback from committers yet, so let's not rush and wait for it.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @gaborgsomogyi Thanks for reviewing! I addressed your review comments except asynchronous cleanup, which could be broken out into a separate issue.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @gaborgsomogyi Yeah, I also thought about the idea (commented above) but lost track of it while focusing on other tasks. Given that a smaller patch is easier to review and the current patch works well (except for the overhead of cleaning in the same thread), shall we split this up and address it in another issue?
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r235632761

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+        cleanSource: option to clean up completed files after processing.
+        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+        When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
--- End diff --

Yeah, I guess you're right. I'll add logic to check this during initialization of FileStreamSource.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r235632872

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
+      .toUpperCase(Locale.ROOT)
+
+    val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption)
+    if (matchedModeOpt.isEmpty) {
--- End diff --

Will address.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r235632809

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +258,64 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      if (!fs.exists(newPath.getParent)) {
--- End diff --

Nice finding. Will address.
[GitHub] spark issue #23076: [SPARK-26103][SQL] Added maxDepth to limit the length of...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/23076 I'm seeing needs on both sides: while I think dumping the full plan to a file is a good feature for debugging a specific issue, retaining full plans to render them on UI pages has been a headache, and three related issues ([SPARK-23904](https://issues.apache.org/jira/browse/SPARK-23904), [SPARK-25380](https://issues.apache.org/jira/browse/SPARK-25380), [SPARK-26103](https://issues.apache.org/jira/browse/SPARK-26103)) were filed within 3 months, which doesn't look like something we can just tell end users to work around. One thing we should be aware of is that a huge plan is generated not only from nested joins, but also from lots of columns, like SPARK-23904. For SPARK-25380 we are not aware of which parts generated the huge plan. So it might be easier and more flexible to just truncate to a specific size rather than applying conditions.
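A trivial illustration of "just truncate to a specific size" (my own sketch, not code from any of these PRs):

```scala
// Keep a prefix of the rendered plan and mark the cut with an ellipsis.
def truncatePlan(plan: String, maxLength: Int): String =
  if (plan.length <= maxLength) plan
  else plan.take(math.max(0, maxLength - 3)) + "..."
```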
[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 @zsxwing @dongjoon-hyun @steveloughran Thanks all for the valuable feedback! I applied the review comments. While I covered the new feature with new UTs, I have yet to test this manually with HDFS. I'll find time to do a manual test next week. For cloud storage, TBH, it's not easy for me to test manually, so I'd like to lean on reviewers' eyes and experience.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 retest this, please
[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r232869187

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
--- End diff --

This looks beyond the scope of this PR; we can address it in a separate PR. Could you raise another one?
[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r231717554

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+
+        renameCompletedFiles: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
--- End diff --

Totally agreed, and that matches option 3 as I proposed. And option 1 would not affect the critical path of a batch much, since rename operations would be enqueued and a background thread would take care of them. For option 1, providing a guarantee is what makes things complicated. If we are OK with NOT guaranteeing that all processed files are renamed, we can do the renaming in the background (like option 1) without handling backpressure, and simply drop the requests in the queue with logging if the size is beyond the threshold or the JVM is shutting down.
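A rough sketch of option 1 without the guarantee, as described above (my own illustration with hypothetical names, not the PR's code): a bounded queue plus one background thread, dropping requests with a warning when the queue is full.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import org.apache.hadoop.fs.{FileSystem, Path}

class BackgroundRenamer(fs: FileSystem, maxPending: Int) {
  private val queue = new LinkedBlockingQueue[(Path, Path)](maxPending)
  private val executor = Executors.newSingleThreadExecutor()

  // Single worker drains the queue and performs the renames off the
  // micro-batch critical path.
  executor.submit(new Runnable {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        val pair = queue.poll(100, TimeUnit.MILLISECONDS)
        if (pair != null) fs.rename(pair._1, pair._2)
      }
    }
  })

  // Non-blocking: dropping instead of waiting avoids backpressure handling;
  // a real implementation would use logWarning instead of println.
  def submit(src: Path, dest: Path): Unit = {
    if (!queue.offer((src, dest))) {
      println(s"WARN: rename queue is full; dropping rename of $src")
    }
  }

  def shutdown(): Unit = executor.shutdownNow()
}
```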
[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r231695749 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark. "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" + +renameCompletedFiles: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage. --- End diff -- @dongjoon-hyun For Storm, it renames the input file twice: 1. when in process, 2. when completed (the second is actually not a rename, but a move to an archive directory). The HDFS spout was created in 2015, so I don't expect there was deep consideration of cloud storage. For Flink I have no idea; I'll explore how they handle it. I think the feature is just an essential thing in ETL situations: a comment in the JIRA clearly shows why the feature is needed. https://issues.apache.org/jira/browse/SPARK-20568?focusedCommentId=16356929&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16356929
[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r231429484 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark. "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" + +renameCompletedFiles: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage. --- End diff -- Hi @dongjoon-hyun, thanks for raising a good point! I was only considering the filesystem/HDFS case and am not familiar with cloud environments. I guess we have these possible options (see the sketch after this comment): 1. Rename in a background thread. For option 1, we may want to restrict the max number of files to enqueue, and when it reaches the max we may handle some of them synchronously. We may also need to postpone JVM shutdown until all enqueued files are renamed. 2. Provide an additional option: delete (the two modes being mutually exclusive). Actually, the actions end users are expected to take are 1. moving to an archive directory (with or without compression), 2. deleting periodically. If moving/renaming has a non-trivial cost, end users may want to just delete files directly without backing them up. 3. Document the overhead in the option's description. While we cannot say clearly how much the cost is, we can explain that the cleanup operation may affect the processing of a batch. These three options are not mutually exclusive with each other. cc. to @steveloughran - I think you're an expert on cloud storage: could you share your thoughts on this? Also cc. to @zsxwing in case this was missed.
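Option 2 amounts to a tri-state setting; a hypothetical sketch of parsing it (all names are illustrative, the PR would settle on its own):

```scala
// Tri-state cleanup action for completed source files: off, archive (rename/move),
// or delete (remove without backup). Archive and delete are mutually exclusive modes.
sealed trait CompletedFileAction
object CompletedFileAction {
  case object Off extends CompletedFileAction
  case object Archive extends CompletedFileAction
  case object Delete extends CompletedFileAction

  def fromOptions(options: Map[String, String]): CompletedFileAction =
    options.get("cleanCompletedFiles").map(_.toLowerCase) match {
      case None | Some("off") => Off
      case Some("archive")    => Archive
      case Some("delete")     => Delete
      case Some(other) =>
        throw new IllegalArgumentException(s"Unknown cleanup action: $other")
    }
}
```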
[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 cc. @zsxwing
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 @zsxwing Given that the Spark 2.4 vote has passed, could we revisit this and make progress on it?
[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22952 I feel the patch is simple enough to skip manual verification against HDFS, but I'll try to spin up an HDFS cluster and test this manually.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/22952 [SPARK-20568][SS] Rename files which are completed in previous batch ## What changes were proposed in this pull request? This patch adds an option to rename files which were completed in the previous batch, so that end users can clean up processed files to save storage. It only applies to "micro-batch", since for batch queries all input files must be kept to get the same result across multiple query executions. ## How was this patch tested? Added UTs, and manually tested on a local Mac. (The logic is very simple, so I'm not sure we need to verify it against HDFS manually.) You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-20568 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22952.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22952 commit 8a1d2e187c667833b2de8eb4cba2fa04dca9c6ff Author: Jungtaek Lim Date: 2018-11-05T04:32:51Z SPARK-20568 Rename files which are completed in previous batch
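The rename itself is a one-liner against the Hadoop FileSystem API; a minimal sketch of the "_COMPLETED_" postfix idea from the description above (illustrative, not the PR's code):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// After a micro-batch commits, mark each processed input file so that external
// cleanup jobs (or a later delete pass) can identify files safe to remove.
def markCompleted(fs: FileSystem, file: Path): Boolean = {
  val marked = new Path(file.getParent, file.getName + "_COMPLETED_")
  fs.rename(file, marked)
}
```

The source's file listing would then need to filter out anything ending in `_COMPLETED_` so marked files are not reprocessed.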
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225752604 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => 
+adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224320793 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_DELEGATION_TOKEN_ENABLED, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( --- End diff -- Having a utility trait or utility singleton object could reduce the duplication, but personally I'd be OK with allowing 5~10 lines of duplication. If we are likely to keep leveraging Scala reflection outside of catalyst (HBaseDelegationTokenProvider already does it twice), we could have a utility class for that.
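If reflection usage keeps growing, the shared helper could be as small as this hypothetical sketch (`ReflectionUtils` and `companionInstance` are my names, not Spark's; it mirrors the reflective-loading pattern the providers use):

```scala
import scala.reflect.runtime.universe
import scala.util.control.NonFatal

// Load a singleton/companion object by fully-qualified name via Scala reflection,
// returning None when the class (e.g. an optional connector) is not on the classpath.
object ReflectionUtils {
  def companionInstance[T](className: String): Option[T] = {
    try {
      val mirror = universe.runtimeMirror(getClass.getClassLoader)
      val module = mirror.staticModule(className)
      Some(mirror.reflectModule(module).instance.asInstanceOf[T])
    } catch {
      case NonFatal(_) => None
    }
  }
}
```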
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224323537 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -647,4 +647,42 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED = +ConfigBuilder("spark.kafka.delegation.token.enabled") + .doc("Set to 'true' for obtaining delegation token from kafka.") + .booleanConf + .createWithDefault(false) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = +ConfigBuilder("spark.kafka.bootstrap.servers") --- End diff -- And I would rather say it should be a flag to enable/disable delegation tokens. Not all end users who run against a secured Kafka cluster want to leverage delegation tokens.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224334764 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => 
+adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224322849 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -647,4 +647,42 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED = +ConfigBuilder("spark.kafka.delegation.token.enabled") + .doc("Set to 'true' for obtaining delegation token from kafka.") + .booleanConf + .createWithDefault(false) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = +ConfigBuilder("spark.kafka.bootstrap.servers") --- End diff -- While it is not possible to provide the relevant configuration to the source/sink, pre-defining Kafka-related configurations one by one here feels too coupled with Kafka to me. It might also cause confusion about where to put configuration for the Kafka source/sink: this configuration must only be used for delegation tokens, but that isn't apparent from either the configuration name or its doc. My 2 cents is to just reserve the prefix `spark.kafka.token` or similar, leave a comment, and not define anything here (see the sketch below). I would like to hear how committers think about adding external configurations to Spark conf.
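A sketch of what reserving a prefix could look like on the provider side (`getAllWithPrefix` is a real `SparkConf` method; the prefix itself is just the suggestion above):

```scala
import java.util.Properties
import org.apache.spark.SparkConf

// Pass through every "spark.kafka.token.*" entry verbatim, so core Spark never has
// to enumerate Kafka's own configuration keys one by one. getAllWithPrefix returns
// the keys with the prefix stripped, which matches Kafka's own property names.
def tokenClientProperties(sparkConf: SparkConf): Properties = {
  val props = new Properties()
  sparkConf.getAllWithPrefix("spark.kafka.token.").foreach {
    case (key, value) => props.put(key, value)
  }
  props
}
```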
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224338353 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => 
+adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222840402 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2<Dataset<String>, Long>() { +void call(Dataset<String> dataset, Long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + +R is not yet supported. + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or you are using +continuous processing mode), then you can express your custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + +In Scala, you have to extend
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22633#discussion_r222832931 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java --- @@ -0,0 +1,64 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { +spark = new TestSparkSession(); +input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { +Utils.deleteRecursively(new File(input)); +spark.stop(); +spark = null; + } + + @Test + public void testForeachBatchAPI() { --- End diff -- MINOR: I guess it will be duplicated effort across both the Scala and Java suites, but IMHO adding a sanity check wouldn't hurt much and would prevent further possible misses.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 @gaborgsomogyi Yeah... I'm just waiting for it. Btw, I proposed a solution on SPARK-10816 as well, and it is also waiting for a response. I'm going to work on another item or review others' PRs so that I can avoid being blocked by the Spark 2.4 RC.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 Kindly asking for a review. Please never mind if you're busy fixing bugs for the Spark 2.4 RC. @gaborgsomogyi I guess I left two things for committer decision: 1. defining a soft boundary and logging when pooled objects exceed the boundary (see the sketch below), 2. documentation. Do you have more to review?
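For the first item, a soft boundary differs from a hard cap in that exceeding it only triggers a log (and a best-effort cleanup), never a failure. A tiny illustrative sketch (the config name follows the one used in the pool implementation quoted later in this thread):

```scala
// Soft-boundary check for a pooled-consumer cache: warn when the total exceeds the
// configured soft limit, but keep serving the borrow request (no hard failure).
def checkSoftBoundary(totalPooled: Int, softLimit: Int): Unit = {
  if (totalPooled > softLimit) {
    Console.err.println(
      s"Pooled Kafka consumers ($totalPooled) exceed soft limit ($softLimit); " +
        "consider increasing spark.sql.kafkaConsumerCache.capacity")
  }
}
```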
[GitHub] spark pull request #22579: [SPARK-25429][SQL] Use Set instead of Array to im...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22579#discussion_r221177626 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -83,7 +83,7 @@ class SQLAppStatusListener( // track of the metrics knows which accumulators to look at. val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList --- End diff -- If we are going to make it a Set, I guess we don't need to sort it, and we may also not need to convert it to a List and then to a Set again. Does changing List to Set affect any content on the UI page? I just wanted to double-check why we have been sorting the accumulator ids, and whether this patch breaks that intention or not.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 Just rebased.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 @gaborgsomogyi Totally makes sense. Let me address that while the patch is reviewed by committers. I may get recommendations to rename the config, or more, so addressing the documentation would be the last part.
[GitHub] spark pull request #22331: [SPARK-25331][SS] Make FileStreamSink ignore part...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22331#discussion_r219399313 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StagingFileCommitProtocol.scala --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.hadoop.fs.{FileAlreadyExistsException, FileContext, Path} +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +class StagingFileCommitProtocol(jobId: String, path: String) + extends FileCommitProtocol with Serializable with Logging + with ManifestCommitProtocol { + private var stagingDir: Option[Path] = None --- End diff -- It looks like you're using an Option but always calling `.get` without any checking. In `setupTask` that is fine since the assignment happens there, but in `newTaskTempFile` we may be better off guarding with `require`, which achieves fail-fast and lets the later `.get` always succeed; see the sketch below.
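A sketch of the suggested guard (the method signature follows Spark's `FileCommitProtocol.newTaskTempFile`; the body and class are illustrative, not the PR's code):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

class StagingGuardExample {
  private var stagingDir: Option[Path] = None // assigned in setupTask, as in the diff

  def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    // Fail fast with a clear message, instead of a bare NoSuchElementException deep
    // in the write path, if setupTask has not been called yet.
    require(stagingDir.isDefined, "newTaskTempFile called before setupTask")
    new Path(stagingDir.get, s"part-${taskContext.getTaskAttemptID}$ext").toString
  }
}
```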
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 According to the discussion on SPARK-10816, I'm holding off on the effort to improve this and plan to discuss further on the JIRA issue. Anyone interested in this patch can still review it, try it out, and share feedback.
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r219367280 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -18,222 +18,247 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import java.io.Closeable import java.util.concurrent.TimeoutException import scala.collection.JavaConverters._ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException} import org.apache.kafka.common.TopicPartition -import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread} + +/** + * This class simplifies the usages of Kafka consumer in Spark SQL Kafka connector. + * + * NOTE: Like KafkaConsumer, this class is not thread-safe. + * NOTE for contributors: It is possible for the instance to be used from multiple callers, + * so all the methods should not rely on current cursor and use seek manually. + */ +private[kafka010] class InternalKafkaConsumer( +val topicPartition: TopicPartition, +val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + private val consumer = createConsumer -private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. - * - * If the record is invisible (either a - * transaction message, or an aborted message when the consumer's `isolation.level` is - * `read_committed`), it will be skipped and this method will try to fetch next available record - * within [offset, untilOffset). - * - * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will - * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this - * method will try to fetch next available record within [offset, untilOffset). - * - * When this method tries to skip offsets due to either invisible messages or data loss and - * reaches `untilOffset`, it will return `null`. + * Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record" + * and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches + * some messages but all of them are not visible messages (either transaction messages, + * or aborted messages when `isolation.level` is `read_committed`). * - * @param offset the offset to fetch. - * @param untilOffsetthe max offset to fetch. Exclusive. - * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. - * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at - * offset if available, or throw exception.when `failOnDataLoss` is `false`, - * this method will either return record at offset if available, or return - * the next earliest available record less than untilOffset, or null. It - * will not throw any exception. + * @throws OffsetOutOfRangeException if `offset` is out of range. 
+ * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. */ - def get( - offset: Long, - untilOffset: Long, - pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss) + def fetch(offset: Long, pollTimeoutMs: Long) + : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = { +// Seek to the offset because we may call seekToBeginning or seekToEnd before this. +seek(offset) +val p = consumer.poll(pollTimeoutMs) +val r = p.records(topicPartition) +logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") +val offsetAfterPoll = consumer.position(topicPartition) +logDebug(s"Offset changed from $offset to $of
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 If we are fine with ignoring the optimal state delta, or OK with addressing it in a follow-up issue (it should be addressed in the same release version to avoid having state V1, V2, etc.), I think the only TODOs are writing javadoc and deduplicating some code.
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 @arunmahadevan One thing we should be aware of is that the requirement is pretty different from other streaming frameworks like Flink, which normally set a long checkpoint interval and do a full snapshot (though Flink supports incremental checkpoints, those deal with minimizing the amount of data stored). Here in Spark we expect a smaller batch interval, and Spark deals with the requirement by storing a "delta" of state changes. That behavior raises concerns about the strategy for storing and removing state. Let's say we have 3 rows in a group in the batch result, there are also 3 rows in the same group in state, and we want to replace the state with the new batch result. For a full snapshot, removing 3 rows first and putting 3 rows may not matter much, but with the delta approach we should compare them side-by-side and apply fewer changes to state. The difference is not a trivial one for session windows, because arbitrary changes are required: for example, two different sessions in state can be merged later when late events come in, in which case we have to overwrite one and remove the other. New sessions can be created alongside existing ones, and we want to overwrite a session if the new output session originated from old state, and append it if not (see the sketch below). For other windows it is just a "put", because there's no group and we are safe to put (overwrite if present; without eviction there's no need to remove). The differing requirements of time windows and session windows are not easy to combine into one. That's where I realized the difficulty of the state part for session windows, and that's why I feel I need to change the streaming part. For the batch part the current patch is doing OK. Btw, we can treat `AssignWindows` as `TimeWindowing` and `SessionWindowing`, since we logically assign rows to individual windows. So unless we would like to support custom windows like dynamic-gap session windows, I think we can address it later whenever needed.
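To make the delta-vs-snapshot point concrete, here is an illustrative (non-Spark) sketch of the minimal state update for one grouping key when sessions can merge:

```scala
// Sessions for one key: compare old state with the new batch output and derive the
// minimal set of writes. A merge of two old sessions shows up as one put (the merged
// session, which originated from old state) plus one remove (the session merged away).
case class Session(start: Long, end: Long, aggregate: Long)

def stateDelta(
    oldSessions: Set[Session],
    newSessions: Set[Session]): (Set[Session], Set[Session]) = {
  val puts = newSessions -- oldSessions    // new or changed (e.g. merged/extended) sessions
  val removes = oldSessions -- newSessions // sessions that were merged away or replaced
  (puts, removes)
}
```

With a full-snapshot store this bookkeeping is unnecessary (just overwrite everything), which is why the requirement differs so much from Flink-style checkpointing.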
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 Please review the general approach and direction first. I'm planning to spend time rewriting the streaming part to tightly integrate the logic with state so that state updates are minimized.
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 retest this, please
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 retest this, please
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22482 The patch is a bit huge, so I'm not sure whether we would be better off squashing the commits into one before reviewing. Two TODOs are left, hence marking the patch as WIP, but it is close to a complete patch: 1. An optimal implementation of state for session windows. It borrows the state implementation from streaming join since that fits the necessary concept of state for session windows, but it may not be the optimal one, so I'm going to see whether we can have a better implementation. 2. Javadoc (maybe the Structured Streaming guide doc too?). I didn't add javadoc yet, to speed up the POC and actual development, but to complete the patch I guess I need to write javadoc for new classes as well as methods (maybe).
[GitHub] spark pull request #22482: WIP - [SPARK-10816][SS] Support session window na...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/22482 WIP - [SPARK-10816][SS] Support session window natively ## What changes were proposed in this pull request? This patch proposes native support for session windows, like the support Spark has provided for time windows. Please refer to the attached doc in [SPARK-10816](https://issues.apache.org/jira/browse/SPARK-10816) for more details on the rationale, concepts, limitations, etc. From the end users' point of view, the only change is the addition of a "session" SQL function. End users can define a query with a session window by replacing the "window" function with the "session" function, and the "window" column with the "session" column. After that, the patch provides the same experience as the time window. Internally, this patch changes the physical plan of aggregation a bit: if the session function is used in a query, it will sort the input rows by "grouping keys" + "session", and merge overlapping sessions into one while applying aggregations, so it's like a sort-based aggregation where the unit of grouping is grouping keys + session. To handle late events, there are cases where multiple session windows that are not yet evicted co-exist for a given key. This patch handles that case by borrowing the state implementation from streaming join, which can handle multiple values for a given key. ## How was this patch tested? Many UTs are added to verify session window queries for both batch and streaming. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-10816 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22482.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22482 commit a1af74611df7dd5b979fc1a288de96e0b3d415da Author: Jungtaek Lim Date: 2018-09-04T23:10:47Z WIP nothing worked, just recording the progress commit be502485047283e203933a4d78e3b580a0c567df Author: Jungtaek Lim Date: 2018-09-06T04:36:11Z WIP not working yet... lots of implementations needed commit 7c60c0ad922ddacf025ad4762b85d06ab7cb258f Author: Jungtaek Lim Date: 2018-09-06T13:31:08Z WIP Finished implementing UpdatingSessionIterator commit 4e8c260a6e6b73b9bcd347ca242b8e77aedf8d1e Author: Jungtaek Lim Date: 2018-09-07T08:35:32Z WIP add verification on precondition "rows in iterator are sorted by key" commit 39069ded62dc5836b0b0f7c8ec7fb8ce869e5292 Author: Jungtaek Lim Date: 2018-09-08T04:36:46Z Rename SymmetricHashJoinStateManager to MultiValuesStateManager * This will be also used from session window state as well commit c2716340e008000e1fcc5e4d3fcf9befa419ff77 Author: Jungtaek Lim Date: 2018-09-08T04:41:37Z Move package of UpdatingSessionIterator commit df4cffd5fd1ea82be509f1cd97e5fc3a7ef8acb6 Author: Jungtaek Lim Date: 2018-09-10T05:52:28Z WIP add MergingSortWithMultiValuesStateIterator, now integrating with stateful operators (WIP...) commit 79e32b918c3db41c7d6c1c1d55276d3f696746d5 Author: Jungtaek Lim Date: 2018-09-13T06:54:37Z WIP the first version of working one!
Still have lots of TODOs and FIXMEs to go commit fb7aa17488e5753c5460f383e1b0f4bedca6dee8 Author: Jungtaek Lim Date: 2018-09-13T08:13:45Z Add more explanations commit 9f41b9d6e7960031c52603bd1da9aeca747e1dfb Author: Jungtaek Lim Date: 2018-09-13T08:49:01Z Silly bugfix & block session window for batch query as of now We can enable it but there're lots of approaches on aggregations in batch side... * AggUtils.planAggregateWithoutDistinct * AggUtils.planAggregateWithOneDistinct * RewriteDistinctAggregates * AggregateInPandasExec So unless we are sure which things to support, just block them for now... commit 0a62b1f0c274859061c0f3ab2c63450052985ac7 Author: Jungtaek Lim Date: 2018-09-13T09:28:34Z More works: majorly split out updating session to individual physical node * we will leverage such node for batch case if we want commit acb5a0c42641041ca3adae2c9f2293b4dfa837cf Author: Jungtaek Lim Date: 2018-09-13T09:38:00Z Fix a silly bug and also add check for session window against batch query commit 1b6502c92231b7aaa9d0d6f620a5bcc624b862ec Author: Jungtaek Lim Date: 2018-09-13T11:30:15Z WIP Fixed eviction on update mode commit fec9a8ae5c1d421322738bd474fcb5508421f51a Author: Jungtaek Lim Date: 2018-09-13T12:48:07Z WIP found root reason of broken UT... fixed it commit c87e4eebcc53c81328d52e4d4ea270bc
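As a usage illustration of the description above (the `session` function is what this WIP patch proposes; it does not exist in released Spark, and the exact signature may differ):

```scala
// Assumes a SparkSession named `spark` and a streaming Dataset `events` with
// columns userId and eventTime; `session` is the function proposed by this patch.
import org.apache.spark.sql.functions._
import spark.implicits._

// Before: fixed time window
// events.groupBy($"userId", window($"eventTime", "5 minutes")).count()

// After (proposed): session window with a 5-minute gap; the grouping column becomes
// "session" instead of "window", everything else stays the same.
val sessionized = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy($"userId", session($"eventTime", "5 minutes"))
  .count()
```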
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138 retest this, please
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r218955883 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.ConcurrentHashMap + +import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} +import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._ +import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey + +/** + * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]]. + * + * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on + * the class, and same contract applies: after using the borrowed object, you must either call + * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object + * should be destroyed. + * + * The soft capacity of pool is determined by "spark.sql.kafkaConsumerCache.capacity" config value, + * and the pool will have reasonable default value if the value is not provided. + * (The instance will do its best effort to respect soft capacity but it can exceed when there's + * a borrowing request and there's neither free space nor idle object to clear.) + * + * This class guarantees that no caller will get pooled object once the object is borrowed and + * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]] + * unless caller shares the object to multiple threads. + */ +private[kafka010] class InternalKafkaConsumerPool( +objectFactory: ObjectFactory, +poolConfig: PoolConfig) { + + // the class is intended to have only soft capacity + assert(poolConfig.getMaxTotal < 0) + + private lazy val pool = { +val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer]( + objectFactory, poolConfig) + internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener) +internalPool + } + + /** + * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key, + * the pool will create the [[InternalKafkaConsumer]] object. + * + * If the pool doesn't have idle object for the key and also exceeds the soft capacity, + * pool will try to clear some of idle objects. 
+ * + * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise + * the object will be kept in pool as active object. + */ + def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { +updateKafkaParamForKey(key, kafkaParams) + +if (getTotal == poolConfig.getSoftMaxTotal()) { + pool.clearOldest() +} + +pool.borrowObject(key) + } + + /** Returns borrowed object to the pool. */ + def returnObject(consumer: InternalKafkaConsumer): Unit = { +pool.returnObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates (destroy) borrowed object to the pool. */ + def invalidateObject(consumer: InternalKafkaConsumer): Unit = { +pool.invalidateObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates all idle consumers for the key */ + def invalidateKey(key: CacheKey): Unit = { +pool.clear(key) + } + + /** + * Closes the keyed object pool. Once the pool is closed, + * bo
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r218777053

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala ---

@@ -0,0 +1,243 @@ [quoted diff context identical to the InternalKafkaConsumerPool.scala excerpt above; duplicate elided]
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138

> just wondering why org.apache.spark.sql.kafka010.CachedKafkaProducer uses com.google.common.cache.LoadingCache?

Because KafkaProducer is thread-safe unless transactions are enabled, multiple tasks are encouraged to use a single instance concurrently. KafkaConsumer is not thread-safe, so access to it was guarded with custom logic; this patch proposes guarding it with Apache Commons Pool instead.
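Since the comment contrasts the two caching strategies, here is a hedged sketch of the LoadingCache side, assuming a get-or-create cache keyed directly by the producer params; the key type, eviction setting, and names are illustrative, not CachedKafkaProducer's actual implementation:

```scala
import java.{util => ju}
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.kafka.clients.producer.KafkaProducer

object ProducerCacheSketch {
  type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  // Thread-safe producers can be shared as-is, so a get-or-create cache suffices;
  // no checkout/checkin protocol is needed, unlike the consumer pool in this patch.
  private val producers: LoadingCache[ju.Map[String, Object], Producer] =
    CacheBuilder.newBuilder()
      .expireAfterAccess(10, TimeUnit.MINUTES) // illustrative eviction setting
      .build(new CacheLoader[ju.Map[String, Object], Producer] {
        override def load(params: ju.Map[String, Object]): Producer =
          new KafkaProducer[Array[Byte], Array[Byte]](params)
      })

  def getOrCreate(params: ju.Map[String, Object]): Producer = producers.get(params)
}
```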
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138

The vote for Spark 2.4 is now in progress. Unless we need to stand by for blocker issues against the Spark 2.4 RC, I'd be really happy if someone could revisit this patch and continue reviewing.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/22138

Regarding metrics in FetchedDataPool, I just added basic metrics so that tests can leverage them for verification. I was going to add numActive as well as numIdle too, but tracking and measuring those requires more resources, so I'd postpone them unless someone asks.
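As a rough illustration of the trade-off described above, a minimal sketch (assumed names, not the PR's FetchedDataPool code) could expose cheap monotonic counters for tests, whereas true numActive/numIdle gauges would need bookkeeping on every acquire and release:

```scala
import java.util.concurrent.atomic.AtomicLong

// Cheap test-facing counters: one atomic increment on create/evict, nothing on the
// per-record hot path. All names here are assumed for illustration.
class PoolMetricsSketch {
  private val numCreated = new AtomicLong(0)
  private val numEvicted = new AtomicLong(0)

  def recordCreated(): Unit = numCreated.incrementAndGet()
  def recordEvicted(): Unit = numEvicted.incrementAndGet()

  // Tests can assert on these snapshots for verification.
  def createdCount: Long = numCreated.get()
  def evictedCount: Long = numEvicted.get()
}
```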
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r215867141

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---

@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from multiple callers,
+ * so all the methods should not rely on current cursor and use seek manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging {
+
+  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's `isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
    *
-   * @param offset         the offset to fetch.
-   * @param untilOffset    the max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
-   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if available, or return
-   *                       the next earliest available record less than untilOffset, or null. It
-   *                       will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed after polling. It means the
+   *                          consumer polls nothing before timeout.
    */
-  def get(
-      offset: Long,
-      untilOffset: Long,
-      pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+    : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+    seek(offset)
+    val p = consumer.poll(pollTimeoutMs)
+    val r = p.records(topicPartition)
+    logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
+    val offsetAfterPoll = consumer.position(topicPartition)
+    logDebug(s"Offset changed from $offset to $of
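For reference, the seek-then-poll contract that the new fetch() above documents can be sketched in isolation roughly as follows; this is a simplified stand-alone approximation, not the connector's exact code, and the translation of a stuck position into TimeoutException is omitted:

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object FetchSketch {
  // Seek explicitly (never trust the current cursor), poll once, and report both
  // the visible records and where the cursor landed afterwards.
  def fetch(
      consumer: KafkaConsumer[Array[Byte], Array[Byte]],
      tp: TopicPartition,
      offset: Long,
      pollTimeoutMs: Long): (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
    consumer.seek(tp, offset)
    val polled = consumer.poll(pollTimeoutMs) // poll(long), as in this Kafka client version
    val records = polled.records(tp)
    // The position can advance past offset + records.size when polled messages are
    // invisible (transaction markers, or aborted messages under read_committed).
    val offsetAfterPoll = consumer.position(tp)
    (records, offsetAfterPoll)
  }
}
```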
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r215818860

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala ---

@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+    val dataPool = FetchedDataPool.build
+
+    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+    assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+    val data = dataPool.acquire(cacheKey, 0)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+    dataPool.release(cacheKey, data)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(!dataPool.getCache(cacheKey).head.inUse)
+
+    dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+    val dataPool = FetchedDataPool.build
+
+    val cacheKeys = (0 to 10).map { partId =>
+      CacheKey("testgroup", new TopicPartition("topic", partId))
+    }
+
+    assert(dataPool.getCache.size === 0)
+    cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+    val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+    assert(dataPool.getCache.size === cacheKeys.size)
+    cacheKeys.map { key =>
+      assert(dataPool.getCache(key).size === 1)
+      assert(dataPool.getCache(key).head.inUse)
+    }
+
+    dataList.map { case (_, data) =>
+      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+    }
+
+    dataList.foreach { case (key, data) =>
+      dataPool.release(key, data)
+    }
+
+    assert(dataPool.getCache.size === cacheKeys.size)
+    cacheKeys.map { key =>
+      assert(dataPool.getCache(key).size === 1)
+      assert(!dataPool.getCache(key).head.inUse)
+    }
+
+    dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+    val dataPool = FetchedDataPool.build
+
+    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+    assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+    val data = dataPool.acquire(cacheKey, 0)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+    (0 to 3).foreach { _ => data.next() }
+
+    dataPool.release(cacheKey, data)
+
+    // suppose next batch
+
+    val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+    assert(data.eq(data2))
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    dataPool.release(cacheKey, data2)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(!dataPool.getCache(cacheKey).head.inUse)
+
+    dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched data") {
+    val dataPool = FetchedDataPool.build
+
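The tests above call a testRecords helper whose definition falls outside the quoted range; a plausible reconstruction, offered purely as an assumption to make the excerpt self-explanatory, would build dummy records for the fixture's topic-partition:

```scala
// Hypothetical reconstruction (not the PR's exact code): builds `count` dummy
// ConsumerRecords starting at `startOffset`, reusing the suite's existing
// `dummyBytes` field and its `scala.collection.JavaConverters._` import.
private def testRecords(startOffset: Long, count: Int): ju.List[Record] = {
  (0 until count).map { idx =>
    new ConsumerRecord[Array[Byte], Array[Byte]]("topic", 0, startOffset + idx,
      dummyBytes, dummyBytes)
  }.toList.asJava
}
```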
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r215637613

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---

@@ -18,222 +18,247 @@ [quoted diff context identical to the KafkaDataConsumer.scala excerpt above; duplicate elided]
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r215635068

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala ---

@@ -0,0 +1,241 @@ [quoted diff context identical to the InternalKafkaConsumerPool.scala excerpt above; duplicate elided]
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r215313888

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala ---

@@ -0,0 +1,241 @@ [quoted diff context identical to the InternalKafkaConsumerPool.scala excerpt above; duplicate elided]
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r215313215

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala ---

@@ -0,0 +1,299 @@ [quoted diff context identical to the FetchedPoolSuite.scala excerpt above; duplicate elided]