[GitHub] [spark] zhongjingxiong opened a new pull request #35142: [SPARK-37708][K8S] Provides basic images that support centos7
zhongjingxiong opened a new pull request #35142: URL: https://github.com/apache/spark/pull/35142

### What changes were proposed in this pull request?
Provide a CentOS 7 base image to match production environments.

### Why are the changes needed?
If we build Python on CentOS and run that packaged environment on K8S, submitting like this:
```
spark-submit \
  --archives s3a://zhongjingxiong/python3.6.9.tgz#python3.6 \
  --conf spark.pyspark.driver.python=python3.6/bin/python3 \
  --conf spark.pyspark.python=python3.6/bin/python3 \
  examples/src/main/python/pi.py 10
```
the job fails with a missing shared library:
```
Unpacking an archive s3a://zhongjingxiong/python3.6.9.tgz#python3 from /tmp/spark-0002ed81-3a01-4e40-a9b7-826295b86ece/python3.6.9.tgz to /opt/spark/work-dir/./python3
Traceback (most recent call last):
  File "/tmp/spark-0002ed81-3a01-4e40-a9b7-826295b86ece/pi.py", line 21, in <module>
    from pyspark.sql import SparkSession
  File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 121, in <module>
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/__init__.py", line 42, in <module>
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 27, in <module>
    async def _ag():
  File "/opt/spark/work-dir/python3/lib/python3.6/ctypes/__init__.py", line 7, in <module>
    from _ctypes import Union, Structure, Array
ImportError: libffi.so.6: cannot open shared object file: No such file or directory
22/01/07 22:56:11 INFO ShutdownHookManager: Shutdown hook called
```
The same job works fine when running on YARN. Therefore, it is better to provide an image that supports CentOS for running Spark.

### Does this PR introduce _any_ user-facing change?
Yes, this adds a new Dockerfile exposed to users.

### How was this patch tested?
It has been deployed in a production environment.
[GitHub] [spark] zhouyejoe commented on pull request #34083: Add docs about using Shiv for packaging (similar to PEX)
zhouyejoe commented on pull request #34083: URL: https://github.com/apache/spark/pull/34083#issuecomment-1007900746

@mridulm I think we can reopen this and keep it open until we have done enough testing around using Shiv here. Thoughts?
[GitHub] [spark] mridulm commented on pull request #34083: Add docs about using Shiv for packaging (similar to PEX)
mridulm commented on pull request #34083: URL: https://github.com/apache/spark/pull/34083#issuecomment-1007900266

Looks like this got dropped from our review radar, @HyukjinKwon. Do you think this is valid? If yes, we can reopen and review ...
[GitHub] [spark] wzhfy commented on pull request #35137: [TEST] Fix test logic by getting sleep interval in task
wzhfy commented on pull request #35137: URL: https://github.com/apache/spark/pull/35137#issuecomment-1007888966

cc @Ngone51 @agrawaldevesh, could you take a look?
[GitHub] [spark] wzhfy removed a comment on pull request #35137: [TEST] Fix test logic by getting sleep interval in task
wzhfy removed a comment on pull request #35137: URL: https://github.com/apache/spark/pull/35137#issuecomment-1007875631

cc @Ngone51 @agrawaldevesh
[GitHub] [spark] wzhfy commented on pull request #35137: [TEST] Fix test logic by getting sleep interval in task
wzhfy commented on pull request #35137: URL: https://github.com/apache/spark/pull/35137#issuecomment-1007875631

cc @Ngone51 @agrawaldevesh
[GitHub] [spark] dongjoon-hyun closed pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask
dongjoon-hyun closed pull request #35141: URL: https://github.com/apache/spark/pull/35141
[GitHub] [spark] dongjoon-hyun commented on pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask
dongjoon-hyun commented on pull request #35141: URL: https://github.com/apache/spark/pull/35141#issuecomment-1007867336

Thank you, @viirya. The following is the reproducer in the Java 17/Maven/Apple Silicon combination. Java 17/SBT/Apple Silicon works fine without this issue.
```
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.SparkSubmitSuite test
...
SparkSubmitSuite:
- prints usage on empty input
- prints usage with only --help
- prints error with unrecognized options
- handle binary specified but not class
- handles arguments with --key=val
- handles arguments to user program
- handles arguments to user program with name collision
- print the right queue name
- SPARK-24241: do not fail fast if executor num is 0 when dynamic allocation is enabled
- specify deploy mode through configuration
- handles YARN cluster mode
- handles YARN client mode
- SPARK-33530: handles standalone mode with archives
- handles standalone cluster mode
- handles legacy standalone cluster mode
- handles standalone client mode
- handles mesos client mode
- handles k8s cluster mode
- automatically sets mainClass if primary resource is S3 JAR in client mode
- automatically sets mainClass if primary resource is S3 JAR in cluster mode
- error informatively when mainClass isn't set and S3 JAR doesn't exist
- handles confs with flag equivalents
- SPARK-21568 ConsoleProgressBar should be enabled only in shells
2022-01-07 16:54:31.508 - stderr> SLF4J: Class path contains multiple SLF4J bindings.
2022-01-07 16:54:31.508 - stderr> SLF4J: Found binding in [jar:file:/Users/dongjoon/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2022-01-07 16:54:31.508 - stderr> SLF4J: Found binding in [jar:file:/Users/dongjoon/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2022-01-07 16:54:31.509 - stderr> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2022-01-07 16:54:31.509 - stderr> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-01-07 16:54:32.659 - stderr> Exception in thread "Executor task launch worker-0" java.lang.NoSuchFieldError: mdc
2022-01-07 16:54:32.659 - stderr>    at org.apache.log4j.MDCFriend.fixForJava9(MDCFriend.java:11)
2022-01-07 16:54:32.659 - stderr>    at org.slf4j.impl.Log4jMDCAdapter.<clinit>(Log4jMDCAdapter.java:38)
2022-01-07 16:54:32.659 - stderr>    at org.slf4j.impl.StaticMDCBinder.getMDCA(StaticMDCBinder.java:59)
2022-01-07 16:54:32.659 - stderr>    at org.slf4j.MDC.bwCompatibleGetMDCAdapterFromBinder(MDC.java:99)
2022-01-07 16:54:32.659 - stderr>    at org.slf4j.MDC.<clinit>(MDC.java:108)
2022-01-07 16:54:32.659 - stderr>    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$setMDCForTask(Executor.scala:750)
2022-01-07 16:54:32.659 - stderr>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:441)
2022-01-07 16:54:32.659 - stderr>    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2022-01-07 16:54:32.659 - stderr>    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2022-01-07 16:54:32.659 - stderr>    at java.base/java.lang.Thread.run(Thread.java:833)
```
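For context, a minimal sketch of what suppressing this error in `setMDCForTask` could look like. The method body below is illustrative, not the exact patch in this PR; the MDC keys are assumptions:

```scala
import org.slf4j.MDC

// Hypothetical sketch, assuming the fix simply swallows the logging-bridge error.
def setMDCForTask(taskName: String, mdcProperties: Seq[(String, String)]): Unit = {
  try {
    // MDC.put is a real slf4j API; the keys here are illustrative.
    MDC.put("mdc.taskName", taskName)
    mdcProperties.foreach { case (key, value) => MDC.put(key, value) }
  } catch {
    // On the Java 17 / Maven / log4j1-bridge combination above, the first MDC touch
    // throws java.lang.NoSuchFieldError: mdc; suppress it so the task itself survives,
    // since MDC decoration is a logging-only concern.
    case _: NoSuchFieldError =>
  }
}
```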
[GitHub] [spark] c21 commented on a change in pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys
c21 commented on a change in pull request #35138: URL: https://github.com/apache/spark/pull/35138#discussion_r780617335

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
```
@@ -396,6 +396,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS =
+    buildConf("spark.sql.join.requireAllJoinKeysAsPartitionKeys")
```

Review comment: would this config take effect for all physical operators that have 2 children and require `ClusteredDistribution`? For example, `CoGroupExec`.
[GitHub] [spark] c21 commented on a change in pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys
c21 commented on a change in pull request #35138: URL: https://github.com/apache/spark/pull/35138#discussion_r780605984

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
```
@@ -451,30 +468,24 @@ case class HashShuffleSpec(
     false
   }

-  override def canCreatePartitioning: Boolean = true
+  override def canCreatePartitioning: Boolean = {
+    // To avoid potential data skew, we don't allow `HashShuffleSpec` to create partitioning if
+    // the hash partition keys are not the full join keys (the cluster keys). Then the planner
+    // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
+    // the join keys.
+    if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS)) {
+      distribution.clustering.forall(x => partitioning.expressions.exists(_.semanticEquals(x)))
```

Review comment: Do we need to require `partitioning.expressions` to be exactly the same as `distribution.clustering` as well? E.g. for the following cases:
```
partitioning.expressions: [a, b]
distribution.clustering: [b, a]
```
```
partitioning.expressions: [a, b, a]
distribution.clustering: [a, b]
```
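For readers following along, here is a standalone sketch of the difference between the PR's subset check and the stricter exact-sequence check being asked about, with plain strings standing in for Spark expressions:

```scala
// The PR's check: every clustering key appears somewhere among the partition keys.
def subsetCheck(partitioning: Seq[String], clustering: Seq[String]): Boolean =
  clustering.forall(c => partitioning.exists(_ == c))

// A stricter alternative: the sequences must match exactly, in order.
def exactCheck(partitioning: Seq[String], clustering: Seq[String]): Boolean =
  partitioning == clustering

// The two cases from the review comment:
subsetCheck(Seq("a", "b"), Seq("b", "a"))      // true  (order is ignored)
exactCheck(Seq("a", "b"), Seq("b", "a"))       // false
subsetCheck(Seq("a", "b", "a"), Seq("a", "b")) // true  (duplicates are ignored)
exactCheck(Seq("a", "b", "a"), Seq("a", "b"))  // false
```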
[GitHub] [spark] beliefer commented on pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG
beliefer commented on pull request #35130: URL: https://github.com/apache/spark/pull/35130#issuecomment-1007849173

> What's the new algorithm? It's simple before: if agg funcs contain `GeneralAggregateFunc`, don't try partial pushdown. Otherwise, try complete pushdown first, then partial pushdown.

Yes. But AVG is a commonly used aggregate function, so we want to try our best to push it down. Maybe we could construct Average in DS V2, so we keep it simple.
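A sketch of the pushdown decision described in the quoted text above; the types here are minimal stand-ins for Spark's DS V2 interfaces, not the real planner code:

```scala
sealed trait Pushdown
case object NoPushdown extends Pushdown        // run the whole aggregate in Spark
case object CompletePushdown extends Pushdown  // the whole aggregate runs in the source
case object PartialPushdown extends Pushdown   // source pre-aggregates, Spark finishes

trait AggregateFunc
case class GeneralAggregateFunc(name: String) extends AggregateFunc
case class Sum(column: String) extends AggregateFunc

// The "simple before" strategy: general functions block partial pushdown entirely.
def choosePushdown(funcs: Seq[AggregateFunc], sourceSupportsComplete: Boolean): Pushdown =
  if (funcs.exists(_.isInstanceOf[GeneralAggregateFunc])) NoPushdown // e.g. AVG before this PR
  else if (sourceSupportsComplete) CompletePushdown
  else PartialPushdown
```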
[GitHub] [spark] beliefer commented on a change in pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG
beliefer commented on a change in pull request #35130: URL: https://github.com/apache/spark/pull/35130#discussion_r780603780

## File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
```
@@ -223,7 +225,11 @@ abstract class JdbcDialect extends Serializable with Logging {
       case f: GeneralAggregateFunc if f.name() == "AVG" =>
         assert(f.inputs().length == 1)
         val distinct = if (f.isDistinct) "DISTINCT " else ""
-        Some(s"AVG($distinct${f.inputs().head})")
+        if (supportCompletePushDown) {
+          Some(s"AVG($distinct${f.inputs().head})")
+        } else {
+          Some(s"Sum($distinct${f.inputs().head}), Count($distinct${f.inputs().head})")
```

Review comment: If Spark cannot completely push AVG down to the database, we still want partial pushdown.
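To see why the rewrite is correct, here is a self-contained sketch of how Spark can finish an AVG from pushed-down SUM/COUNT pairs; this is the arithmetic behind the diff, not the actual planner code:

```scala
// Each partition (or partial scan) returns its pushed-down SUM and COUNT.
case class PartialAvg(sum: Double, count: Long)

// Recombining sums and counts is exact; averaging per-partition averages is not.
def mergeAvg(parts: Seq[PartialAvg]): Double = {
  val totalSum = parts.map(_.sum).sum
  val totalCount = parts.map(_.count).sum
  totalSum / totalCount
}

// Example: partitions (sum = 10, count = 2) and (sum = 2, count = 1).
// Correct AVG = 12 / 3 = 4.0, while averaging the averages gives (5.0 + 2.0) / 2 = 3.5.
val result = mergeAvg(Seq(PartialAvg(10.0, 2L), PartialAvg(2.0, 1L)))  // 4.0
```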
[GitHub] [spark] aa1371 commented on pull request #35083: [WIP][SPARK-37798] PySpark Pandas API: Cross and conditional merging
aa1371 commented on pull request #35083: URL: https://github.com/apache/spark/pull/35083#issuecomment-1007846355

@HyukjinKwon - thoughts on the PR in its current state? Also, is it possible to see the specific formatting issues caught by the black formatter in the CI? It seems to use a different black configuration than when I run dev/format-python locally.
[GitHub] [spark] AmplabJenkins commented on pull request #35124: [WIP][SPARK-37398][PYTHON] Inline type hints for python/pyspark/ml/classification.py
AmplabJenkins commented on pull request #35124: URL: https://github.com/apache/spark/pull/35124#issuecomment-1007838795

Can one of the admins verify this patch?
[GitHub] [spark] dongjoon-hyun commented on pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask
dongjoon-hyun commented on pull request #35141: URL: https://github.com/apache/spark/pull/35141#issuecomment-1007837947

Could you review this, @viirya and @HyukjinKwon? I observed this when running the Maven Jenkins test on our community M1 machine.
[GitHub] [spark] dongjoon-hyun opened a new pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask
dongjoon-hyun opened a new pull request #35141: URL: https://github.com/apache/spark/pull/35141

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
[GitHub] [spark] github-actions[bot] closed pull request #32365: [SPARK-35228][SQL] Add expression ToHiveString for keep consistent between hive/spark format in df.show
github-actions[bot] closed pull request #32365: URL: https://github.com/apache/spark/pull/32365
[GitHub] [spark] github-actions[bot] closed pull request #32289: [SPARK-33357][K8S] Support Spark application managing with SparkAppHandle on Kubernetes
github-actions[bot] closed pull request #32289: URL: https://github.com/apache/spark/pull/32289
[GitHub] [spark] github-actions[bot] commented on pull request #34024: [SPARK-36784][SHUFFLE][WIP] Handle DNS issues on executor to prevent shuffle nodes from getting added to exclude list
github-actions[bot] commented on pull request #34024: URL: https://github.com/apache/spark/pull/34024#issuecomment-1007834093

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] closed pull request #33878: [SPARK-36303][SQL] Refactor fourthteenth set of 20 query execution errors to use error classes
github-actions[bot] closed pull request #33878: URL: https://github.com/apache/spark/pull/33878
[GitHub] [spark] github-actions[bot] closed pull request #31267: [SPARK-21195][CORE] MetricSystem should pick up dynamically registered metrics in sources
github-actions[bot] closed pull request #31267: URL: https://github.com/apache/spark/pull/31267
[GitHub] [spark] github-actions[bot] closed pull request #34083: Add docs about using Shiv for packaging (similar to PEX)
github-actions[bot] closed pull request #34083: URL: https://github.com/apache/spark/pull/34083
[GitHub] [spark] github-actions[bot] commented on pull request #34141: [SPARK-33887][SQL] Allow insert overwrite same table with static partition if using dynamic partition overwrite mode
github-actions[bot] commented on pull request #34141: URL: https://github.com/apache/spark/pull/34141#issuecomment-1007834075

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
github-actions[bot] commented on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-1007834106

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] closed pull request #34108: [SPARK-36638][SQL][TEST] Generalize OptimizeSkewedJoin - correctness
github-actions[bot] closed pull request #34108: URL: https://github.com/apache/spark/pull/34108
[GitHub] [spark] yangwwei commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured
yangwwei commented on pull request #34672: URL: https://github.com/apache/spark/pull/34672#issuecomment-1007823318

hi @tgravescs thanks for getting back on this one :). I think @attilapiros already mentioned

> But it got sidetracked and became stale. Based on this experience I am afraid the ideal solution is a bit far away in the future.

@attilapiros please share your thoughts. I understand this may not be the best or most elegant solution, but it is simple and it works. I do not think hacking more things into the remote shuffle service side is better; this is a common problem that should be handled on the Spark side. If there is a better, actionable solution, I would love to explore it more.
[GitHub] [spark] shardulm94 commented on a change in pull request #33446: [SPARK-36215][SHUFFLE] Add logging for slow fetches to diagnose external shuffle service issues
shardulm94 commented on a change in pull request #33446: URL: https://github.com/apache/spark/pull/33446#discussion_r780556023

## File path: core/src/main/scala/org/apache/spark/TestUtils.scala
```
@@ -448,6 +451,34 @@ private[spark] object TestUtils {
       EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
     file.getPath
   }
+
+  /**
+   * A log4j-specific log capturing mechanism. Provided with a class name, it will add a new
+   * appender to the log for that class to capture the output. log4j must be properly configured
+   * on the classpath (e.g., with slf4j-log4j12) for this to work. This should be closed when it is
+   * done to remove the temporary appender.
+   */
+  class Log4jCapture(val loggerName: String) extends AutoCloseable {
```

Review comment: Using `SparkFunSuite.withLogAppender` now
[GitHub] [spark] shardulm94 commented on a change in pull request #33446: [SPARK-36215][SHUFFLE] Add logging for slow fetches to diagnose external shuffle service issues
shardulm94 commented on a change in pull request #33446: URL: https://github.com/apache/spark/pull/33446#discussion_r780519077

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
```
@@ -1214,6 +1214,29 @@ package object config {
       .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
       .createWithDefault(Int.MaxValue)

+  private[spark] val REDUCER_SHUFFLE_FETCH_SLOW_LOG_THRESHOLD_MS =
+    ConfigBuilder("spark.reducer.shuffleFetchSlowLogThreshold.time")
+      .doc("When fetching blocks from an external shuffle service is slower than expected, the " +
+        "fetch will be logged to allow for subsequent investigation. A fetch is determined " +
+        "to be slow if it has a total duration of at least this value, and a transfer rate " +
+        // cannot reference val REDUCER_SHUFFLE_FETCH_SLOW_LOG_THRESHOLD_BPS since it's uninitialized
```

Review comment: Both these confs reference each other in the doc string, so at least one of them has to be referenced using a string.

## File path: docs/configuration.md
```
@@ -2064,6 +2064,28 @@ Apart from these, the following properties are also available, and may be useful
   1.2.0

+  spark.reducer.shuffleFetchSlowLogThreshold.time
+  value of spark.reducer.shuffleFetchSlowLogThreshold.time
```

Review comment: Fixed

## File path: docs/configuration.md
```
@@ -2064,6 +2064,28 @@ Apart from these, the following properties are also available, and may be useful
+  spark.reducer.shuffleFetchSlowLogThreshold.time
+  value of spark.reducer.shuffleFetchSlowLogThreshold.time
+
+  When fetching blocks from an external shuffle service is slower than expected, the
+  fetch will be logged to allow for subsequent investigation. A fetch is determined
+  to be slow if it has a total duration of at least this value, and a transfer rate
+  less than spark.reducer.shuffleFetchSlowLogThreshold.bytesPerSec.
+
+  3.2.0
+
+  spark.reducer.shuffleFetchSlowLogThreshold.bytesPerSec
+  value of spark.reducer.shuffleFetchSlowLogThreshold.bytesPerSec
```

Review comment: Fixed
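Putting the two thresholds together, the slow-fetch predicate that the doc string describes amounts to something like the following sketch; names are illustrative, and the real check lives in the PR's fetch path, not here:

```scala
// A fetch is slow only if it is both long-running AND low-throughput, so short
// fetches and large-but-fast fetches are not logged.
def isSlowFetch(durationMs: Long, bytesFetched: Long,
                thresholdMs: Long, thresholdBytesPerSec: Long): Boolean = {
  // Guard against division by zero for sub-millisecond fetches.
  val bytesPerSec = bytesFetched.toDouble / math.max(durationMs, 1L) * 1000.0
  durationMs >= thresholdMs && bytesPerSec < thresholdBytesPerSec
}

// Example: a 30 s fetch of 1 MiB at thresholds (10 s, 1 MiB/s) would be logged.
isSlowFetch(30000L, 1024L * 1024L, 10000L, 1024L * 1024L)  // true
```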
[GitHub] [spark] sathiyapk commented on a change in pull request #34729: [SPARK-37475][SQL] Add scale parameter to floor and ceil functions
sathiyapk commented on a change in pull request #34729: URL: https://github.com/apache/spark/pull/34729#discussion_r780514098

## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
```
@@ -658,6 +669,62 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val intResultsB: Seq[Int] = Seq(31400, 31420, 31416, 314159000, 314159300, 314159260) ++
       Seq.fill(7)(314159265)

+    def doubleResultsFloor(i: Int): Any = {
+      val results = Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3,
+        3.1, 3.14, 3.141, 3.1415, 3.14159, 3.141592)
+      if (i <= 6) results(i).toLong else results(i)
+    }
+
+    def doubleResultsCeil(i: Int): Any = {
+      val results = Seq(100.0, 10.0, 1.0, 1000.0, 100.0, 10.0,
+        4L, 3.2, 3.15, 3.142, 3.1416, 3.1416, 3.141593)
+      if (i <= 6) results(i).toLong else results(i)
+    }
+
+    def floatResultsFloor(i: Int): Any = {
+      val results = Seq(0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 3L,
+        3.1d, 3.14d, 3.141d, 3.1414d, 3.14149d, 3.141499d)
+      if (i <= 6) results(i).toLong else results(i)
+    }
+
+    def floatResultsCeil(i: Int): Any = {
+      val results = Seq(100.0f, 10.0f, 1.0f, 1000.0f, 100.0f, 10.0f,
+        4L, 3.2d, 3.15d, 3.142d, 3.1415d, 3.1415d, 3.1415d)
+      if (i <= 6) results(i).toLong else results(i)
+    }
+
+    def shortResultsFloor(i: Int): Any = {
+      val results = Seq[Long](0L, 0L, 3L, 31000L,
+        31400L, 31410L, 31415L) ++ Seq.fill[Long](7)(31415)
+      results(i)
+    }
+
+    def shortResultsCeil(i: Int): Any = {
```

Review comment: Sorry, I am not sure I got it right. Actually, we test `RoundCeil` and `RoundFloor` directly for positive `scale` on big decimal; I can add long and double in that case, if you prefer. But we can't test `RoundCeil` and `RoundFloor` directly for negative `scale`, because the casting to `Long` for negative scale happens one step before. Maybe if I put the casting in the `RoundCeil` and `RoundFloor` objects and let the `ExpressionBuilder` call these objects like it was before, we would be able to test `RoundCeil` and `RoundFloor` directly for both negative and positive `scale`. What do you say?
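As a side note for readers, the floor/ceil-with-scale semantics under test can be illustrated with plain `java.math.BigDecimal`, which is roughly the arithmetic these expressions compute; the two-argument Spark syntax itself is what this PR proposes, so treat this as a sketch of the semantics, not the Spark implementation:

```scala
import java.math.{BigDecimal => JBigDecimal}
import java.math.RoundingMode

// Positive scale keeps that many fractional digits; negative scale rounds to
// tens, hundreds, etc. (the case that is cast to Long one step earlier in the PR).
def floorWithScale(d: JBigDecimal, scale: Int): JBigDecimal =
  d.setScale(scale, RoundingMode.FLOOR)

def ceilWithScale(d: JBigDecimal, scale: Int): JBigDecimal =
  d.setScale(scale, RoundingMode.CEILING)

floorWithScale(new JBigDecimal("3.1415"), 2)  // 3.14
ceilWithScale(new JBigDecimal("3.1415"), 2)   // 3.15
floorWithScale(new JBigDecimal("31415"), -3)  // 3.1E+4, i.e. 31000
ceilWithScale(new JBigDecimal("31415"), -3)   // 3.2E+4, i.e. 32000
```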
[GitHub] [spark] cdegroc opened a new pull request #35140: [SPARK-37829][SQL] DataFrame.joinWith should return null rows for missing values
cdegroc opened a new pull request #35140: URL: https://github.com/apache/spark/pull/35140

### What changes were proposed in this pull request?
Add a unit test demonstrating the regression on `DataFrame.joinWith`. Revert [commit cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59), making the test pass.

### Why are the changes needed?
Doing an outer join using joinWith on DataFrames used to return missing values as null in Spark 2.4.8, but returns them as Rows with null values in Spark 3.0.0+. The regression was introduced in [commit cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
A unit test was added. Ran unit tests for the `sql-core` and `sql-catalyst` submodules with `./build/mvn clean package -pl sql/core,sql/catalyst`
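For reference, a minimal reproduction of the behavior described above, assuming a local SparkSession; the output comments reflect the ticket's description of 2.4.8 vs 3.0.0+, not output generated here:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("joinWith-repro").getOrCreate()
import spark.implicits._

val left  = Seq((1, "a"), (2, "b")).toDF("id", "name")
val right = Seq((1, "x")).toDF("id", "name")

// Outer join via joinWith; the row with id = 2 has no match on the right side.
val joined = left.joinWith(right, left("id") === right("id"), "left_outer")
joined.collect().foreach(println)
// Spark 2.4.8:  ([2,b], null)         -- the missing side comes back as null
// Spark 3.0.0+: ([2,b], [null,null])  -- the regression this PR addresses
```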
[GitHub] [spark] imback82 commented on a change in pull request #35113: [SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default
imback82 commented on a change in pull request #35113: URL: https://github.com/apache/spark/pull/35113#discussion_r780475265

## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
```
@@ -189,8 +189,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def createDatabase(
       dbDefinition: CatalogDatabase,
-      ignoreIfExists: Boolean): Unit = withClient {
-    client.createDatabase(dbDefinition, ignoreIfExists)
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      withClient {
+        client.createDatabase(dbDefinition, ignoreIfExists)
+      }
+    } catch {
+      case e: AnalysisException if e.message.contains("already exists") =>
+        throw new DatabaseAlreadyExistsException(dbDefinition.name)
```

Review comment: Good idea! Let me try this approach.
[GitHub] [spark] cdegroc opened a new pull request #35139: [SPARK-37829][SQL] DataFrame.joinWith should return null rows for missing values
cdegroc opened a new pull request #35139: URL: https://github.com/apache/spark/pull/35139

### What changes were proposed in this pull request?
Add a unit test demonstrating the regression on `DataFrame.joinWith`. Update `ExpressionEncoder` to make the test pass (equivalent to reverting the [commit](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59) that introduced the issue).

### Why are the changes needed?
Doing an outer join using joinWith on DataFrames used to return missing values as null in Spark 2.4.8, but returns them as Rows with null values in Spark 3.0.0+. The regression was introduced in [this commit](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
A unit test was added. Ran unit tests for the `sql-core` and `sql-catalyst` submodules with `./build/mvn clean package -pl sql/core,sql/catalyst`
[GitHub] [spark] cloud-fan commented on a change in pull request #35132: [SPARK-37841][SQL] BasicWriteTaskStatsTracker should not try get status for a skipped file
cloud-fan commented on a change in pull request #35132: URL: https://github.com/apache/spark/pull/35132#discussion_r780459615

## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
```
@@ -305,6 +305,12 @@ private[orc] class OrcOutputWriter(
       recordWriter.close(Reporter.NULL)
     }
   }
+
+  /**
+   * If `recordWriterInstantiated` is false, the output file is not pretouched.
```

Review comment: Alternatively, can we fix the Hive ORC data source to always write the file? This seems wrong to me. We should at least write one file even if the input query is empty, to record the output schema.
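To illustrate the reviewer's point, a small sketch assuming a local SparkSession: writing an empty result in a self-describing format should still produce a schema-bearing file, otherwise the output directory cannot be read back with the right schema.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").getOrCreate()

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
// A DataFrame with a schema but zero rows, like an empty input query.
val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

empty.write.mode("overwrite").orc("/tmp/empty-orc")
// This read back only works if the writer emitted a (zero-row) file carrying the
// schema; if no file is written at all, the schema is lost.
spark.read.orc("/tmp/empty-orc").printSchema()
```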
[GitHub] [spark] cloud-fan commented on a change in pull request #35132: [SPARK-37841][SQL] BasicWriteTaskStatsTracker should not try get status for a skipped file
cloud-fan commented on a change in pull request #35132: URL: https://github.com/apache/spark/pull/35132#discussion_r780458052

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
```
@@ -142,8 +142,13 @@ class BasicWriteTaskStatsTracker(
     numSubmittedFiles += 1
   }

-  override def closeFile(filePath: String): Unit = {
-    updateFileStats(filePath)
+  override def closeFile(filePath: String, isPathCreated: Boolean): Unit = {
+    if (isPathCreated) {
+      updateFileStats(filePath)
+    } else {
+      logDebug(s"$filePath is not pre-touched by writer, skipping update file stats")
```

Review comment:
```suggestion
      logDebug(s"$filePath is not created due to no data, skip updating file stats")
```
[GitHub] [spark] cloud-fan commented on pull request #35135: [SPARK-35442][SQL] Support propagate empty relation through aggregate
cloud-fan commented on pull request #35135: URL: https://github.com/apache/spark/pull/35135#issuecomment-1007644925

can you also address https://github.com/apache/spark/pull/32602#discussion_r780377367 ?
[GitHub] [spark] cloud-fan commented on a change in pull request #35113: [SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default
cloud-fan commented on a change in pull request #35113: URL: https://github.com/apache/spark/pull/35113#discussion_r780453457

## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
```
@@ -189,8 +189,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def createDatabase(
       dbDefinition: CatalogDatabase,
-      ignoreIfExists: Boolean): Unit = withClient {
-    client.createDatabase(dbDefinition, ignoreIfExists)
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      withClient {
+        client.createDatabase(dbDefinition, ignoreIfExists)
+      }
+    } catch {
+      case e: AnalysisException if e.message.contains("already exists") =>
+        throw new DatabaseAlreadyExistsException(dbDefinition.name)
```

Review comment: You made a good point. But it's still a bit hacky that we wrap the exception multiple times. How about
```
def withClient(convertException: Throwable => AnalysisException, body: => T) ...
def withClient(body: => T) = withClient(e => throw new AnalysisException..., body)
```
Then we can call the new overload of `withClient` in `createDatabase` and translate the Hive exception directly. We can probably apply it in more places like `createTable` when we need to unify the error message further.
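A slightly fleshed-out sketch of the suggested overload; the signatures and error handling here are illustrative guesses, not the actual `HiveExternalCatalog` implementation:

```scala
import org.apache.spark.sql.AnalysisException

// Hypothetical shape of the two overloads proposed above.
private def withClient[T](convertException: Throwable => AnalysisException)(body: => T): T = {
  try body
  catch {
    // The real method would keep its existing retry logic and only translate
    // the relevant Hive exceptions rather than everything.
    case e: Exception => throw convertException(e)
  }
}

private def withClient[T](body: => T): T =
  withClient[T](e => new AnalysisException(e.getMessage))(body)

// createDatabase could then translate Hive's AlreadyExistsException directly, e.g.:
// withClient(_ => new DatabaseAlreadyExistsException(db)) {
//   client.createDatabase(dbDefinition, ignoreIfExists)
// }
```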
[GitHub] [spark] tgravescs edited a comment on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured
tgravescs edited a comment on pull request #34672: URL: https://github.com/apache/spark/pull/34672#issuecomment-1007641251

Yes, my concern is that it's a config for something that isn't public. It doesn't make sense to me to have a public config for a non-public API, and by itself, without this 3rd-party lib, the config would not apply. I don't think my question was answered (can't you just open a port on one of your remote shuffle services and ignore the messages?), and it sounds like it does work now with some hack (is that hack just setup and configs?). If so, then I would much rather prefer a real solution like the one already mentioned above, because you can make it work now.
[GitHub] [spark] tgravescs commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured
tgravescs commented on pull request #34672: URL: https://github.com/apache/spark/pull/34672#issuecomment-1007641251

Yes, my concern is that it's a config for something that isn't public. It doesn't make sense to me to have a public config for a non-public API, and by itself, without this 3rd-party lib, the config would not apply. I don't think my question was answered, and it sounds like it does work now with some hack (is that hack just setup and configs?). If so, then I would much rather prefer a real solution like the one already mentioned above, because you can make it work now.
[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
venkata91 commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r780452227

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
```
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {
```

Review comment: Agree. Let me see how we can fire an event to the DAGScheduler when a new BlockManager registers on a host that is not already part of the existing shuffleMergers.
[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
venkata91 commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r780451587

## File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
```
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {
```

Review comment: Yes, currently there are no consumers within Spark. But [[SPARK-36620][SHUFFLE] Add client side push based shuffle metrics](https://github.com/apache/spark/pull/34000) has to use and populate these metrics, which is currently ongoing. Do you think we should move this change to that PR as well, or wait for this one to get merged and then make the corresponding changes in the other PR to consume these metrics? cc @thejdeep
[GitHub] [spark] cloud-fan commented on pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys
cloud-fan commented on pull request #35138: URL: https://github.com/apache/spark/pull/35138#issuecomment-1007635990

cc @sunchao @c21
[GitHub] [spark] cloud-fan commented on a change in pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys
cloud-fan commented on a change in pull request #35138: URL: https://github.com/apache/spark/pull/35138#discussion_r780448272

## File path: sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q17.sf100/simplified.txt
```
@@ -1,90 +1,97 @@
 TakeOrderedAndProject [i_item_id,i_item_desc,s_state,store_sales_quantitycount,store_sales_quantityave,store_sales_quantitystdev,store_sales_quantitycov,as_store_returns_quantitycount,as_store_returns_quantityave,as_store_returns_quantitystdev,store_returns_quantitycov,catalog_sales_quantitycount,catalog_sales_quantityave,catalog_sales_quantitystdev,catalog_sales_quantitycov]
```

Review comment: I checked it locally. Now the plan golden files are exactly the same as the ones before #32875.
[GitHub] [spark] cloud-fan opened a new pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys
cloud-fan opened a new pull request #35138: URL: https://github.com/apache/spark/pull/35138

### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32875 . Basically https://github.com/apache/spark/pull/32875 made two improvements:
1. allow bucket join even if the bucket hash function is different from Spark's shuffle hash function
2. allow bucket join even if the hash partition keys are a subset of the join keys

The first improvement is the major target for implementing the SPIP "storage partition join". The second improvement is kind of a consequence of the framework refactor, which was not planned. This PR disables the second improvement by default, as it may introduce perf regressions if there is data skew without a shuffle (see the sketch after this description). We need more design work to enable this improvement, such as checking the NDV.

### Why are the changes needed?
Avoid perf regressions.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
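To make the skew concern concrete, a self-contained sketch with plain Scala collections and hypothetical data: with join keys `(a, b)` but partitioning on `a` alone, one hot value of `a` lands entirely in a single partition, while hashing on both keys spreads it out.

```scala
import scala.util.Random

// One heavily repeated value of join key `a`, with varied values of `b`.
val rows = Seq.fill(100000)(("hot", Random.nextInt(100)))
val numPartitions = 200

// Partition only on `a` (a subset of the join keys): every row hashes identically.
val byA  = rows.groupBy { case (a, _) => Math.floorMod(a.hashCode, numPartitions) }
// Partition on the full join keys `(a, b)`: the hot key spreads across partitions.
val byAB = rows.groupBy { case (a, b) => Math.floorMod((a, b).hashCode, numPartitions) }

println(byA.values.map(_.size).max)   // 100000 -- one giant, skewed task
println(byAB.values.map(_.size).max)  // roughly 1000-2000 -- balanced tasks
```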
[GitHub] [spark] zero323 commented on pull request #34439: [SPARK-37095][PYTHON] Inline type hints for files in python/pyspark/broadcast.py
zero323 commented on pull request #34439: URL: https://github.com/apache/spark/pull/34439#issuecomment-1007619850 Could you please resolve the conflicts @dchvn? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hiboyang commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured
hiboyang commented on pull request #34672: URL: https://github.com/apache/spark/pull/34672#issuecomment-1007611924 @tgravescs I am the author of Uber RSS. In terms of your question "I'm not familiar with the details of Uber RSS, can it just fake up a remote shuffle port and ignore what is sent? have you requested their version to support dynamic allocation?"... Uber RSS can support dynamic allocation on Kubernetes with a workaround, e.g. enabling the shuffle tracking feature and setting the shuffle timeout to zero. But it is a hack, and the code change here will make that hack unnecessary. In terms of adding a config like "spark.shuffle.registration.enabled", any concerns? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
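For reference, the workaround described above can be sketched with the standard dynamic-allocation configs (the config keys are real Spark settings; the zero timeout is what makes this a hack):

```scala
import org.apache.spark.SparkConf

// Track shuffle state so dynamic allocation works without an external shuffle
// service, but time shuffle-holding executors out immediately: the shuffle
// data actually lives in the remote shuffle service, not on the executors.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.timeout", "0s")
```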
[GitHub] [spark] imback82 commented on a change in pull request #35113: [SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default
imback82 commented on a change in pull request #35113: URL: https://github.com/apache/spark/pull/35113#discussion_r780418849 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ## @@ -189,8 +189,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def createDatabase( dbDefinition: CatalogDatabase, - ignoreIfExists: Boolean): Unit = withClient { -client.createDatabase(dbDefinition, ignoreIfExists) + ignoreIfExists: Boolean): Unit = { +try { + withClient { +client.createDatabase(dbDefinition, ignoreIfExists) + } +} catch { + case e: AnalysisException if e.message.contains("already exists") => +throw new DatabaseAlreadyExistsException(dbDefinition.name) Review comment: Two concerns if we move the logic to `withClient`: 1. How can we guarantee `org.apache.hadoop.hive.metastore.api.AlreadyExistsException` is thrown only by `createDatabase` but not for other calls like `createTable`? 2. Since db name is not available, we need to parse out the db name from the message. Are you OK with these? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
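To make the two concerns concrete, a rough sketch of what the alternative might look like (this is not the actual Spark code, and `parseDbName` is a hypothetical helper):

```scala
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException

// Concern 1: inside `withClient` there is no way to tell whether the exception
// came from createDatabase or from another call such as createTable.
// Concern 2: the database name is not in scope here, so it would have to be
// parsed out of the exception message.
def withClient[T](body: => T): T =
  try body
  catch {
    case e: AlreadyExistsException =>
      throw new DatabaseAlreadyExistsException(parseDbName(e.getMessage)) // hypothetical helper
  }
```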
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.
gaborgsomogyi commented on a change in pull request #23348: URL: https://github.com/apache/spark/pull/23348#discussion_r780426592 ## File path: core/src/main/scala/org/apache/spark/deploy/security/README.md ## @@ -0,0 +1,249 @@ +# Delegation Token Handling In Spark + +This document aims to explain and demystify delegation tokens as they are used by Spark, since +this topic is generally a huge source of confusion. + + +## What are delegation tokens and why use them? + +Delegation tokens (DTs from now on) are authentication tokens used by some services to replace +Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they +have some very desirable advantages over Kerberos tokens: + +* No need to distribute Kerberos credentials + +In a distributed application, distributing Kerberos credentials is tricky. Not all users have +keytabs, and when they do, it's generally frowned upon to distribute them over the network as +part of application data. + +DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity +can then distribute the DTs to other parts of the distributed application (e.g. Spark executors), +so they can authenticate to services. + +* A single token per service is used for authentication + +If Kerberos authentication were used, each client connection to a server would require a trip +to the KDC and generation of a service ticket. In a distributed system, the number of service +tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark +executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary +extra load on the KDC, and may even run into usage limits set up by the KDC admin. + +* DTs are only used for authentication + +DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were +issued. You cannot use an existing DT to create new DTs or to create DTs for a different service. + +So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos +authentication, or even other forms of authentication, although there is nothing (aside from +maybe implementation details) that ties them to Kerberos or any other authentication mechanism. + + +## Lifecycle of DTs + +DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact +to create a DT for a service. So, the first step needed to get a DT is being able to authenticate +to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos. + +This requires Kerberos credentials to be available somewhere for the application to use. The user +is generally responsible for providing those credentials, which is most commonly done by logging +in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT +(ticket granting ticket), which can then be used to request service tickets. + +There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process. + +Once a TGT is available, the target service's client library can then be used to authenticate +to the service using the Kerberos credentials, and request the creation of a delegation token. +This token can now be sent to other processes and used to authenticate to different daemons +belonging to that service. + +And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and +use them. 
+ +Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be +added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating +delegation tokens for an application. Spark makes the DTs available to code by stashing them in the +`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library +to agree on how to use those tokens. + +Once they are created, the semantics of how DTs operate are also service-specific. But, in general, +they try to follow the semantics of Kerberos tokens: + +* A "renewable period (equivalent to TGT's "lifetime") which is for how long the DT is valid + before it requires renewal. +* A "max lifetime" (equivalent to TGT's "renewable life") which is for how long the DT can be + renewed. + +Once the token reaches its "max lifetime", a new one needs to be created by contacting the +appropriate service, restarting the above process. + + +## DT Renewal, Renewers, and YARN + +This is the most confusing part of DT handling, and part of it is because much of the system was +designed with MapReduce, and later YARN, in mind. + +As seen above, DTs need to be renewed periodically until they finally expire for good. An example of +this is the default configuration of HDFS services: delegation tokens are valid for up to 7 days, +and need to be renewed every 24 hours. If 24 hours pass
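For orientation, a minimal sketch of the provider API the quoted README describes (the import path and method signatures follow recent Spark versions and should be treated as an assumption, not as part of this PR):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.security.HadoopDelegationTokenProvider

class MyServiceTokenProvider extends HadoopDelegationTokenProvider {
  // Name under which the provider can be enabled or disabled via configuration.
  override def serviceName: String = "my-service"

  // Whether tokens are needed at all, e.g. only in a Kerberized deployment.
  override def delegationTokensRequired(
      sparkConf: SparkConf, hadoopConf: Configuration): Boolean = true

  // Authenticate to the service, create a DT, stash it in `creds`, and return
  // the next renewal time in milliseconds (None if the token is not renewable).
  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = None
}
```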
[GitHub] [spark] viirya commented on pull request #34951: [SPARK-37686][PYTHON][SQL] Use _invoke_function helpers for all pyspark.sql.functions
viirya commented on pull request #34951: URL: https://github.com/apache/spark/pull/34951#issuecomment-1007579476 Looks okay. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maryannxue commented on a change in pull request #34062: [SPARK-36819][SQL] Don't insert redundant filters in case static partition pruning can be done
maryannxue commented on a change in pull request #34062: URL: https://github.com/apache/spark/pull/34062#discussion_r780403353 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala ## @@ -216,11 +216,53 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join } /** - * Search a filtering predicate in a given logical plan + * Returns whether an expression is likely to be selective in dynamic partition filtering. + * 1. the predicate is selective. + * 2. the filtering predicate must not be subset of join key. In case it is, then partition + * filter can be inferred statically in optimization phase, hence return false. */ - private def hasSelectivePredicate(plan: LogicalPlan): Boolean = { + private def isLikelySelectiveWithInferFiltersEnabled( + e: Expression, + joinKey: Expression): (Boolean, Boolean) = e match { +case Not(expr) => isLikelySelectiveWithInferFiltersEnabled(expr, joinKey) +case And(l, r) => + val (isSelectiveLeft, notSubsetOfJoinKeyLeft) = +isLikelySelectiveWithInferFiltersEnabled(l, joinKey) + val (isSelectiveRight, notSubsetOfJoinKeyRight) = +isLikelySelectiveWithInferFiltersEnabled(r, joinKey) + (isSelectiveLeft || isSelectiveRight, notSubsetOfJoinKeyLeft || notSubsetOfJoinKeyRight) +case Or(l, r) => + val (isSelectiveLeft, notSubsetOfJoinKeyLeft) = +isLikelySelectiveWithInferFiltersEnabled(l, joinKey) + val (isSelectiveRight, notSubsetOfJoinKeyRight) = +isLikelySelectiveWithInferFiltersEnabled(r, joinKey) + (isSelectiveLeft && isSelectiveRight && (notSubsetOfJoinKeyLeft || notSubsetOfJoinKeyRight), +notSubsetOfJoinKeyLeft || notSubsetOfJoinKeyRight) +case expr: StringRegexExpression => (true, !isSubsetOfJoinKey(expr, joinKey)) +case expr: BinaryComparison => (true, !isSubsetOfJoinKey(expr, joinKey)) +case expr: In => (true, !isSubsetOfJoinKey(expr, joinKey)) +case expr: InSet => (true, !isSubsetOfJoinKey(expr, joinKey)) +case expr: StringPredicate => (true, !isSubsetOfJoinKey(expr, joinKey)) +case expr: MultiLikeBase => (true, !isSubsetOfJoinKey(expr, joinKey)) +case _ => (false, false) + } + + private def isSubsetOfJoinKey(e: Expression, joinKey: Expression): Boolean = + e.references.subsetOf(joinKey.references) + + /** + * Search a filtering predicate in a given logical plan. + */ + private def hasSelectivePredicate(plan: LogicalPlan, joinKey: Expression): Boolean = { Review comment: Do we need to make this so complicated? This is one of the last rules applied and by this time all static filters have been implied and pushed down? Can't we simply test if there's any static equality-condition filters on the same partition column that the dynamic filter would apply on? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
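For context, the static-pruning case being discussed can be sketched as follows (assumed table names; `fact` is partitioned by `p`; assumes a `spark` session):

```scala
// With filter inference enabled, the static predicate dim.p = 1 implies
// fact.p = 1 through the equi-join condition, so the partition is pruned
// statically and a dynamic pruning filter on fact.p would be redundant.
spark.sql("SELECT * FROM fact JOIN dim ON fact.p = dim.p WHERE dim.p = 1").explain()
```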
[GitHub] [spark] dongjoon-hyun commented on pull request #35127: [SPARK-37837][INFRA] Enable black formatter in dev Python scripts
dongjoon-hyun commented on pull request #35127: URL: https://github.com/apache/spark/pull/35127#issuecomment-1007572412 Merged to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun closed pull request #35127: [SPARK-37837][INFRA] Enable black formatter in dev Python scripts
dongjoon-hyun closed pull request #35127: URL: https://github.com/apache/spark/pull/35127 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Peng-Lei commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command
Peng-Lei commented on a change in pull request #35131: URL: https://github.com/apache/spark/pull/35131#discussion_r780390548 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala ## @@ -55,7 +56,8 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table { } } - override lazy val properties: util.Map[String, String] = v1Table.properties.asJava + override lazy val properties: util.Map[String, String] = +V1Table.filterTableProperties(v1Table).asJava Review comment: Thank you very much. I will try to do it. In fact, precisely because these v2 table properties were stored into the metastore at table creation, I spent a lot of time looking for failed use cases and trying to fix them. It is indeed a breaking change to generate v2 properties when creating tables and store them. Thank you very much for the reminder, @cloud-fan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] bersprockets commented on pull request #35120: [SPARK-37832][SQL] Orc struct converter should use an array to look up field converters rather than a linked list
bersprockets commented on pull request #35120: URL: https://github.com/apache/spark/pull/35120#issuecomment-1007551297 Thanks @HyukjinKwon @dongjoon-hyun -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zero323 commented on a change in pull request #35136: [WIP][SPARK-37418][PYTHON][ML] Inline annotations for pyspark.ml.param.__init__.py
zero323 commented on a change in pull request #35136: URL: https://github.com/apache/spark/pull/35136#discussion_r780382158 ## File path: python/pyspark/ml/param/__init__.py ## @@ -76,33 +99,33 @@ class TypeConverters: """ @staticmethod -def _is_numeric(value): +def _is_numeric(value: Any) -> bool: vtype = type(value) return vtype in [int, float, np.float64, np.int64] or vtype.__name__ == "long" @staticmethod -def _is_integer(value): +def _is_integer(value: Any) -> bool: return TypeConverters._is_numeric(value) and float(value).is_integer() @staticmethod -def _can_convert_to_list(value): +def _can_convert_to_list(value: Any) -> bool: vtype = type(value) return vtype in [list, np.ndarray, tuple, range, array.array] or isinstance(value, Vector) @staticmethod -def _can_convert_to_string(value): +def _can_convert_to_string(value: Any) -> bool: vtype = type(value) return isinstance(value, str) or vtype in [np.unicode_, np.string_, np.str_] @staticmethod -def identity(value): +def identity(value: "T") -> "T": """ Dummy converter that just returns value. """ return value @staticmethod -def toList(value): +def toList(value: Any) -> List: Review comment: This and other `to*` methods could be more precise with specific overloads, i.e. ```python @staticmethod @overload def toList(value: List[T]) -> List[T]: ... @staticmethod @overload def toList(value: range) -> List[int]: ... @staticmethod @overload def toList(value: Vector) -> List[float]: ... ``` and so on, but it is probably not worth the effort. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] wzhfy opened a new pull request #35137: [TEST] get sleep interval in task
wzhfy opened a new pull request #35137: URL: https://github.com/apache/spark/pull/35137 ### What changes were proposed in this pull request? Currently in the tests, `TaskContext.getPartitionId()` is called on the driver side, so the sleep interval is always 100ms, which is not expected and makes some tests flaky. This patch gets the sleep interval inside the task instead. ### Why are the changes needed? Corrects the test logic and makes the tests stable. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By existing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
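A sketch of the fix being described, assuming a `SparkContext` named `sc` and illustrative intervals: on the driver, `TaskContext.get()` is null and `TaskContext.getPartitionId()` falls back to a constant, so the interval must be computed inside the task closure.

```scala
import org.apache.spark.TaskContext

// Evaluated inside the closure, getPartitionId() returns the real partition id
// of each running task; evaluated on the driver it would be the same value for
// every task, making the sleep interval constant.
val rdd = sc.parallelize(1 to 100, 10).map { x =>
  val interval = if (TaskContext.getPartitionId() == 0) 100L else 10L
  Thread.sleep(interval)
  x
}
```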
[GitHub] [spark] tgravescs commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.
tgravescs commented on a change in pull request #23348: URL: https://github.com/apache/spark/pull/23348#discussion_r780379362 ## File path: core/src/main/scala/org/apache/spark/deploy/security/README.md ## @@ -0,0 +1,249 @@ +# Delegation Token Handling In Spark + +This document aims to explain and demystify delegation tokens as they are used by Spark, since +this topic is generally a huge source of confusion. + + +## What are delegation tokens and why use them? + +Delegation tokens (DTs from now on) are authentication tokens used by some services to replace +Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they +have some very desirable advantages over Kerberos tokens: + +* No need to distribute Kerberos credentials + +In a distributed application, distributing Kerberos credentials is tricky. Not all users have +keytabs, and when they do, it's generally frowned upon to distribute them over the network as +part of application data. + +DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity +can then distribute the DTs to other parts of the distributed application (e.g. Spark executors), +so they can authenticate to services. + +* A single token per service is used for authentication + +If Kerberos authentication were used, each client connection to a server would require a trip +to the KDC and generation of a service ticket. In a distributed system, the number of service +tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark +executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary +extra load on the KDC, and may even run into usage limits set up by the KDC admin. + +* DTs are only used for authentication + +DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were +issued. You cannot use an existing DT to create new DTs or to create DTs for a different service. + +So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos +authentication, or even other forms of authentication, although there is nothing (aside from +maybe implementation details) that ties them to Kerberos or any other authentication mechanism. + + +## Lifecycle of DTs + +DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact +to create a DT for a service. So, the first step needed to get a DT is being able to authenticate +to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos. + +This requires Kerberos credentials to be available somewhere for the application to use. The user +is generally responsible for providing those credentials, which is most commonly done by logging +in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT +(ticket granting ticket), which can then be used to request service tickets. + +There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process. + +Once a TGT is available, the target service's client library can then be used to authenticate +to the service using the Kerberos credentials, and request the creation of a delegation token. +This token can now be sent to other processes and used to authenticate to different daemons +belonging to that service. + +And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and +use them. 
+ +Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be +added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating +delegation tokens for an application. Spark makes the DTs available to code by stashing them in the +`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library +to agree on how to use those tokens. + +Once they are created, the semantics of how DTs operate are also service-specific. But, in general, +they try to follow the semantics of Kerberos tokens: + +* A "renewable period (equivalent to TGT's "lifetime") which is for how long the DT is valid + before it requires renewal. +* A "max lifetime" (equivalent to TGT's "renewable life") which is for how long the DT can be + renewed. + +Once the token reaches its "max lifetime", a new one needs to be created by contacting the +appropriate service, restarting the above process. + + +## DT Renewal, Renewers, and YARN + +This is the most confusing part of DT handling, and part of it is because much of the system was +designed with MapReduce, and later YARN, in mind. + +As seen above, DTs need to be renewed periodically until they finally expire for good. An example of +this is the default configuration of HDFS services: delegation tokens are valid for up to 7 days, +and need to be renewed every 24 hours. If 24 hours pass wit
[GitHub] [spark] tgravescs commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.
tgravescs commented on a change in pull request #23348: URL: https://github.com/apache/spark/pull/23348#discussion_r780378105 ## File path: core/src/main/scala/org/apache/spark/deploy/security/README.md ## @@ -0,0 +1,249 @@ +# Delegation Token Handling In Spark + +This document aims to explain and demystify delegation tokens as they are used by Spark, since +this topic is generally a huge source of confusion. + + +## What are delegation tokens and why use them? + +Delegation tokens (DTs from now on) are authentication tokens used by some services to replace +Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they +have some very desirable advantages over Kerberos tokens: + +* No need to distribute Kerberos credentials + +In a distributed application, distributing Kerberos credentials is tricky. Not all users have +keytabs, and when they do, it's generally frowned upon to distribute them over the network as +part of application data. + +DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity +can then distribute the DTs to other parts of the distributed application (e.g. Spark executors), +so they can authenticate to services. + +* A single token per service is used for authentication + +If Kerberos authentication were used, each client connection to a server would require a trip +to the KDC and generation of a service ticket. In a distributed system, the number of service +tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark +executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary +extra load on the KDC, and may even run into usage limits set up by the KDC admin. + +* DTs are only used for authentication + +DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were +issued. You cannot use an existing DT to create new DTs or to create DTs for a different service. + +So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos +authentication, or even other forms of authentication, although there is nothing (aside from +maybe implementation details) that ties them to Kerberos or any other authentication mechanism. + + +## Lifecycle of DTs + +DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact +to create a DT for a service. So, the first step needed to get a DT is being able to authenticate +to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos. + +This requires Kerberos credentials to be available somewhere for the application to use. The user +is generally responsible for providing those credentials, which is most commonly done by logging +in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT +(ticket granting ticket), which can then be used to request service tickets. + +There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process. + +Once a TGT is available, the target service's client library can then be used to authenticate +to the service using the Kerberos credentials, and request the creation of a delegation token. +This token can now be sent to other processes and used to authenticate to different daemons +belonging to that service. + +And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and +use them. 
+ +Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be +added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating +delegation tokens for an application. Spark makes the DTs available to code by stashing them in the +`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library +to agree on how to use those tokens. + +Once they are created, the semantics of how DTs operate are also service-specific. But, in general, +they try to follow the semantics of Kerberos tokens: + +* A "renewable period (equivalent to TGT's "lifetime") which is for how long the DT is valid + before it requires renewal. +* A "max lifetime" (equivalent to TGT's "renewable life") which is for how long the DT can be + renewed. + +Once the token reaches its "max lifetime", a new one needs to be created by contacting the +appropriate service, restarting the above process. + + +## DT Renewal, Renewers, and YARN + +This is the most confusing part of DT handling, and part of it is because much of the system was +designed with MapReduce, and later YARN, in mind. + +As seen above, DTs need to be renewed periodically until they finally expire for good. An example of +this is the default configuration of HDFS services: delegation tokens are valid for up to 7 days, +and need to be renewed every 24 hours. If 24 hours pass wit
[GitHub] [spark] cloud-fan commented on a change in pull request #32602: [SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer
cloud-fan commented on a change in pull request #32602: URL: https://github.com/apache/spark/pull/32602#discussion_r780377367 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala ## @@ -137,3 +125,55 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit } } } + +/** + * This rule runs in the normal optimizer and optimizes more cases + * compared to [[PropagateEmptyRelationBase]]: + * 1. Higher-node Logical Plans + *- Union with all empty children. + * 2. Unary-node Logical Plans + *- Project/Filter/Sample with all empty children. + * + * The reason why we don't apply this rule at AQE optimizer side is: the benefit is not big enough + * and it may introduce extra exchanges. Review comment: After more thought, I think this is a big performance issue if we can't propagate empty relations through project/filter which are quite common. The risk of introducing new shuffles is relatively small compared to this. @ulysses-you can we move all the logic to `PropagateEmptyRelationBase`? `PropagateEmptyRelation` should not have any extra logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
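As an illustration of why the project/filter case matters (assumed table name `t`; assumes a `spark` session):

```scala
// If the subquery is known to be empty, propagating emptiness through the
// outer Project and Filter collapses the whole plan to an empty LocalRelation,
// so nothing is scanned or shuffled at all.
spark.sql("SELECT a + 1 FROM (SELECT a FROM t WHERE 1 = 0) WHERE a > 5").explain()
```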
[GitHub] [spark] tgravescs commented on a change in pull request #34326: [SPARK-37053][CORE] Add metrics to SparkHistoryServer
tgravescs commented on a change in pull request #34326: URL: https://github.com/apache/spark/pull/34326#discussion_r780369492 ## File path: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ## @@ -157,6 +161,15 @@ class HistoryServer( contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX) contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") attachHandler(contextHandler) + +// Register history server source to history server metrics system. +cacheMetrics.init() Review comment: why are we calling init here? Doesn't this end up getting called twice? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] tgravescs commented on pull request #33934: [SPARK-36691][PYTHON] PythonRunner failed should pass error message to ApplicationMaster too
tgravescs commented on pull request #33934: URL: https://github.com/apache/spark/pull/33934#issuecomment-1007523767 @holdenk who may have dealt with the python side before and have thoughts -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.
gaborgsomogyi commented on a change in pull request #23348: URL: https://github.com/apache/spark/pull/23348#discussion_r780350756 ## File path: core/src/main/scala/org/apache/spark/deploy/security/README.md ## @@ -0,0 +1,249 @@ +# Delegation Token Handling In Spark + +This document aims to explain and demystify delegation tokens as they are used by Spark, since +this topic is generally a huge source of confusion. + + +## What are delegation tokens and why use them? + +Delegation tokens (DTs from now on) are authentication tokens used by some services to replace +Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they +have some very desirable advantages over Kerberos tokens: + +* No need to distribute Kerberos credentials + +In a distributed application, distributing Kerberos credentials is tricky. Not all users have +keytabs, and when they do, it's generally frowned upon to distribute them over the network as +part of application data. + +DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity +can then distribute the DTs to other parts of the distributed application (e.g. Spark executors), +so they can authenticate to services. + +* A single token per service is used for authentication + +If Kerberos authentication were used, each client connection to a server would require a trip +to the KDC and generation of a service ticket. In a distributed system, the number of service +tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark +executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary +extra load on the KDC, and may even run into usage limits set up by the KDC admin. + +* DTs are only used for authentication + +DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were +issued. You cannot use an existing DT to create new DTs or to create DTs for a different service. + +So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos +authentication, or even other forms of authentication, although there is nothing (aside from +maybe implementation details) that ties them to Kerberos or any other authentication mechanism. + + +## Lifecycle of DTs + +DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact +to create a DT for a service. So, the first step needed to get a DT is being able to authenticate +to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos. + +This requires Kerberos credentials to be available somewhere for the application to use. The user +is generally responsible for providing those credentials, which is most commonly done by logging +in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT +(ticket granting ticket), which can then be used to request service tickets. + +There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process. + +Once a TGT is available, the target service's client library can then be used to authenticate +to the service using the Kerberos credentials, and request the creation of a delegation token. +This token can now be sent to other processes and used to authenticate to different daemons +belonging to that service. + +And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and +use them. 
+ +Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be +added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating +delegation tokens for an application. Spark makes the DTs available to code by stashing them in the +`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library +to agree on how to use those tokens. + +Once they are created, the semantics of how DTs operate are also service-specific. But, in general, +they try to follow the semantics of Kerberos tokens: + +* A "renewable period (equivalent to TGT's "lifetime") which is for how long the DT is valid + before it requires renewal. +* A "max lifetime" (equivalent to TGT's "renewable life") which is for how long the DT can be + renewed. + +Once the token reaches its "max lifetime", a new one needs to be created by contacting the +appropriate service, restarting the above process. + + +## DT Renewal, Renewers, and YARN + +This is the most confusing part of DT handling, and part of it is because much of the system was +designed with MapReduce, and later YARN, in mind. + +As seen above, DTs need to be renewed periodically until they finally expire for good. An example of +this is the default configuration of HDFS services: delegation tokens are valid for up to 7 days, +and need to be renewed every 24 hours. If 24 hours pass
[GitHub] [spark] zero323 commented on pull request #34674: [SPARK-37419][PYTHON][ML] Rewrite _shared_params_code_gen.py to inline type hints for ml/param/shared.py
zero323 commented on pull request #34674: URL: https://github.com/apache/spark/pull/34674#issuecomment-1007510055 Merged into master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zero323 closed pull request #34674: [SPARK-37419][PYTHON][ML] Rewrite _shared_params_code_gen.py to inline type hints for ml/param/shared.py
zero323 closed pull request #34674: URL: https://github.com/apache/spark/pull/34674 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] tgravescs commented on pull request #34982: [SPARK-37712][YARN] Spark request yarn cluster metrics slow cause delay
tgravescs commented on pull request #34982: URL: https://github.com/apache/spark/pull/34982#issuecomment-1007504670 I guess I'm fine with this, though again, I kind of assume this is because your YARN resource manager is overloaded or unresponsive? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] tgravescs commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured
tgravescs commented on pull request #34672: URL: https://github.com/apache/spark/pull/34672#issuecomment-1007499597 @attilapiros is there any active work on it now? I get that it is a lot more work, but at the same time, if we keep putting it off for temporary hacks it won't get done. If you look at it from just a Spark perspective, the API is not public; we would be adding either an interface that is not used internally by Spark or a config that would not be used internally, both for a private API. Neither of those is a great API choice in my mind. I do get that the shuffle manager is overridden by people, though, and that full features take time, which is why I'm a bit torn on this. There is logic right above this check for the external shuffle service as well that sets the shuffleServerID; what are you using for that port? It's used in the MapStatus and in registration, though I guess the MapStatus doesn't matter in this case. I'm not familiar with the details of Uber RSS: can it just fake up a remote shuffle port and ignore what is sent? Have you requested their version to support dynamic allocation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] tgravescs commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.
tgravescs commented on a change in pull request #23348: URL: https://github.com/apache/spark/pull/23348#discussion_r780320642 ## File path: core/src/main/scala/org/apache/spark/deploy/security/README.md ## @@ -0,0 +1,249 @@ +# Delegation Token Handling In Spark + +This document aims to explain and demystify delegation tokens as they are used by Spark, since +this topic is generally a huge source of confusion. + + +## What are delegation tokens and why use them? + +Delegation tokens (DTs from now on) are authentication tokens used by some services to replace +Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they +have some very desirable advantages over Kerberos tokens: + +* No need to distribute Kerberos credentials + +In a distributed application, distributing Kerberos credentials is tricky. Not all users have +keytabs, and when they do, it's generally frowned upon to distribute them over the network as +part of application data. + +DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity +can then distribute the DTs to other parts of the distributed application (e.g. Spark executors), +so they can authenticate to services. + +* A single token per service is used for authentication + +If Kerberos authentication were used, each client connection to a server would require a trip +to the KDC and generation of a service ticket. In a distributed system, the number of service +tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark +executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary +extra load on the KDC, and may even run into usage limits set up by the KDC admin. + +* DTs are only used for authentication + +DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were +issued. You cannot use an existing DT to create new DTs or to create DTs for a different service. + +So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos +authentication, or even other forms of authentication, although there is nothing (aside from +maybe implementation details) that ties them to Kerberos or any other authentication mechanism. + + +## Lifecycle of DTs + +DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact +to create a DT for a service. So, the first step needed to get a DT is being able to authenticate +to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos. + +This requires Kerberos credentials to be available somewhere for the application to use. The user +is generally responsible for providing those credentials, which is most commonly done by logging +in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT +(ticket granting ticket), which can then be used to request service tickets. + +There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process. + +Once a TGT is available, the target service's client library can then be used to authenticate +to the service using the Kerberos credentials, and request the creation of a delegation token. +This token can now be sent to other processes and used to authenticate to different daemons +belonging to that service. + +And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and +use them. 
+ +Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be +added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating +delegation tokens for an application. Spark makes the DTs available to code by stashing them in the +`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library +to agree on how to use those tokens. + +Once they are created, the semantics of how DTs operate are also service-specific. But, in general, +they try to follow the semantics of Kerberos tokens: + +* A "renewable period (equivalent to TGT's "lifetime") which is for how long the DT is valid + before it requires renewal. +* A "max lifetime" (equivalent to TGT's "renewable life") which is for how long the DT can be + renewed. + +Once the token reaches its "max lifetime", a new one needs to be created by contacting the +appropriate service, restarting the above process. + + +## DT Renewal, Renewers, and YARN + +This is the most confusing part of DT handling, and part of it is because much of the system was +designed with MapReduce, and later YARN, in mind. + +As seen above, DTs need to be renewed periodically until they finally expire for good. An example of +this is the default configuration of HDFS services: delegation tokens are valid for up to 7 days, +and need to be renewed every 24 hours. If 24 hours pass wit
[GitHub] [spark] ulysses-you commented on pull request #35135: [SPARK-35442][SQL] Support propagate empty relation through aggregate
ulysses-you commented on pull request #35135: URL: https://github.com/apache/spark/pull/35135#issuecomment-1007463332 ``` [info] - SPARK-37578: Update output metrics from Datasource v2 *** FAILED *** (65 milliseconds) [info] 123 did not equal 246 (SQLAppStatusListenerSuite.scala:936) ``` The failed test is unrelated; it also shows up in [https://github.com/apache/spark/pull/35127](https://github.com/HyukjinKwon/spark/runs/4737688471?check_suite_focus=true). cc @cloud-fan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon closed pull request #35126: [SPARK-37836][PYTHON][INFRA] Enable F841, E722, E305 and E226 for PEP 8 compliance
HyukjinKwon closed pull request #35126: URL: https://github.com/apache/spark/pull/35126 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #35126: [SPARK-37836][PYTHON][INFRA] Enable F841, E722, E305 and E226 for PEP 8 compliance
HyukjinKwon commented on pull request #35126: URL: https://github.com/apache/spark/pull/35126#issuecomment-1007461103 Merged to master. Thanks guys! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG
cloud-fan commented on pull request #35130: URL: https://github.com/apache/spark/pull/35130#issuecomment-1007449447 What's the new algorithm? It was simple before: if the agg funcs contain `GeneralAggregateFunc`, don't try partial pushdown. Otherwise, try complete pushdown first, then partial pushdown. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
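For readers following along, the previous flow can be sketched like this (the `Pushdown` types and `plan` are made up for the sketch; only `Aggregation` and `GeneralAggregateFunc` are real DS v2 classes):

```scala
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, GeneralAggregateFunc}

sealed trait Pushdown
case class Complete(agg: Aggregation) extends Pushdown
case class Partial(agg: Aggregation) extends Pushdown
case object NoPushdown extends Pushdown

// `sourceSupportsComplete` stands in for the data source's capability check.
def plan(agg: Aggregation, sourceSupportsComplete: Boolean): Pushdown =
  if (sourceSupportsComplete) {
    Complete(agg) // try complete pushdown first
  } else if (agg.aggregateExpressions().exists(_.isInstanceOf[GeneralAggregateFunc])) {
    NoPushdown // unknown agg buffer, so partial pushdown is impossible
  } else {
    Partial(agg) // fall back to partial pushdown
  }
```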
[GitHub] [spark] zero323 opened a new pull request #35136: [WIP][SPARK-37418][PYTHON][ML] Inline annotations for pyspark.ml.param.__init__.py
zero323 opened a new pull request #35136: URL: https://github.com/apache/spark/pull/35136 ### What changes were proposed in this pull request? Migration of type hints from `pyspark.ml.param.__init__.pyi` to `pyspark.ml.param.__init__.py`. ### Why are the changes needed? Part of ongoing migration of type stubs to inline annotations. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG
cloud-fan commented on a change in pull request #35130: URL: https://github.com/apache/spark/pull/35130#discussion_r780295489

## File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala

```diff
@@ -223,7 +225,11 @@ abstract class JdbcDialect extends Serializable with Logging {
       case f: GeneralAggregateFunc if f.name() == "AVG" =>
         assert(f.inputs().length == 1)
         val distinct = if (f.isDistinct) "DISTINCT " else ""
-        Some(s"AVG($distinct${f.inputs().head})")
+        if (supportCompletePushDown) {
+          Some(s"AVG($distinct${f.inputs().head})")
+        } else {
+          Some(s"Sum($distinct${f.inputs().head}), Count($distinct${f.inputs().head})")
+        }
```

Review comment: why do we need it? It's Spark's responsibility to rewrite avg to sum/count, not the data source. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
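The rewrite cloud-fan is referring to, done on the Spark side, amounts to replacing AVG with SUM and COUNT before anything reaches the dialect. A self-contained toy sketch of the idea (the real version operates on Catalyst expressions, not this stand-in AST):

```scala
// Toy expression AST, standing in for Catalyst aggregate expressions.
sealed trait Expr
case class Col(name: String) extends Expr
case class Avg(child: Expr) extends Expr
case class Sum(child: Expr) extends Expr
case class Count(child: Expr) extends Expr
case class Div(left: Expr, right: Expr) extends Expr

// Spark's responsibility: AVG(c) becomes SUM(c) / COUNT(c), so a data source
// only ever needs to partially aggregate SUM and COUNT, and Spark merges them.
def splitAvg(e: Expr): Expr = e match {
  case Avg(c) => Div(Sum(c), Count(c))
  case other  => other
}

// Example: splitAvg(Avg(Col("price"))) == Div(Sum(Col("price")), Count(Col("price")))
```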
[GitHub] [spark] cloud-fan commented on a change in pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG
cloud-fan commented on a change in pull request #35130: URL: https://github.com/apache/spark/pull/35130#discussion_r780294560

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

```diff
@@ -212,7 +224,10 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   private def supportPartialAggPushDown(agg: Aggregation): Boolean = {
     // We don't know the agg buffer of `GeneralAggregateFunc`, so can't do partial agg push down.
-    agg.aggregateExpressions().forall(!_.isInstanceOf[GeneralAggregateFunc])
+    agg.aggregateExpressions().forall { aggregateFunc =>
+      !aggregateFunc.isInstanceOf[GeneralAggregateFunc] ||
+        aggregateFunc.asInstanceOf[GeneralAggregateFunc].name() == "AVG"
```

Review comment: If `AVG` can be partially pushed, I'd like to create a dedicated class for it, instead of using `GeneralAggregateFunc`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on pull request #34464: [SPARK-37193][SQL] DynamicJoinSelection.shouldDemoteBroadcastHashJoin should not apply to outer joins
cloud-fan commented on pull request #34464: URL: https://github.com/apache/spark/pull/34464#issuecomment-1007438535 I'm reverting this to avoid mistakenly releasing a performance regression in Spark 3.3. Please resubmit this PR with SPARK-37753 resolved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command
cloud-fan commented on a change in pull request #35131: URL: https://github.com/apache/spark/pull/35131#discussion_r780287813

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala

```diff
@@ -55,7 +56,8 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
     }
   }

-  override lazy val properties: util.Map[String, String] = v1Table.properties.asJava
+  override lazy val properties: util.Map[String, String] =
+    V1Table.filterTableProperties(v1Table).asJava
```

Review comment: I think we should generate these v2 table properties here, instead of when creating the table. It's a breaking change to generate v2 properties when creating tables, as it changes what is stored in the metastore, which may have unknown consequences. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command
cloud-fan commented on a change in pull request #35131: URL: https://github.com/apache/spark/pull/35131#discussion_r780285665

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

```diff
@@ -384,9 +384,12 @@ case class CatalogTable(
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
     val tableProperties = properties
-      .filterKeys(!_.startsWith(VIEW_PREFIX))
+      .filter(!_._1.startsWith(VIEW_PREFIX))
+      .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
+      .filter(!_._1.startsWith(TableCatalog.OPTION_PREFIX))
+      .filter(_._1 != TableCatalog.PROP_EXTERNAL)
       .toSeq.sortBy(_._1)
-      .map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
```

Review comment: why this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command
cloud-fan commented on a change in pull request #35131: URL: https://github.com/apache/spark/pull/35131#discussion_r780285249

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

```diff
@@ -384,9 +384,12 @@ case class CatalogTable(
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
     val tableProperties = properties
-      .filterKeys(!_.startsWith(VIEW_PREFIX))
+      .filter(!_._1.startsWith(VIEW_PREFIX))
+      .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
+      .filter(!_._1.startsWith(TableCatalog.OPTION_PREFIX))
+      .filter(_._1 != TableCatalog.PROP_EXTERNAL)
```

Review comment: hmm, isn't `PROP_EXTERNAL` included in `TABLE_RESERVED_PROPERTIES`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Peng-Lei commented on a change in pull request #35107: [SPARK-37818][DOCS] Add option in the description document for show create table
Peng-Lei commented on a change in pull request #35107: URL: https://github.com/apache/spark/pull/35107#discussion_r780269024

## File path: docs/sql-ref-syntax-aux-show-create-table.md

````diff
@@ -26,7 +26,7 @@ license: |
 ### Syntax

 ```sql
-SHOW CREATE TABLE table_identifier
+SHOW CREATE TABLE table_identifier [ AS SERDE ]
````

Review comment: ok, I will do it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Peng-Lei commented on a change in pull request #35107: [SPARK-37818][DOCS] Add option in the description document for show create table
Peng-Lei commented on a change in pull request #35107: URL: https://github.com/apache/spark/pull/35107#discussion_r780268917

## File path: sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

```diff
@@ -1952,6 +1952,22 @@ class DataSourceV2SQLSuite
     }
   }

+  test("xxx") {
```

Review comment: Sorry, I will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.
gaborgsomogyi commented on a change in pull request #23348: URL: https://github.com/apache/spark/pull/23348#discussion_r780256850

## File path: core/src/main/scala/org/apache/spark/deploy/security/README.md

```diff
@@ -0,0 +1,249 @@
+# Delegation Token Handling In Spark
+
+This document aims to explain and demystify delegation tokens as they are used by Spark, since
+this topic is generally a huge source of confusion.
+
+
+## What are delegation tokens and why use them?
+
+Delegation tokens (DTs from now on) are authentication tokens used by some services to replace
+Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they
+have some very desirable advantages over Kerberos tokens:
+
+* No need to distribute Kerberos credentials
+
+In a distributed application, distributing Kerberos credentials is tricky. Not all users have
+keytabs, and when they do, it's generally frowned upon to distribute them over the network as
+part of application data.
+
+DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity
+can then distribute the DTs to other parts of the distributed application (e.g. Spark executors),
+so they can authenticate to services.
+
+* A single token per service is used for authentication
+
+If Kerberos authentication were used, each client connection to a server would require a trip
+to the KDC and generation of a service ticket. In a distributed system, the number of service
+tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark
+executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary
+extra load on the KDC, and may even run into usage limits set up by the KDC admin.
+
+* DTs are only used for authentication
+
+DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were
+issued. You cannot use an existing DT to create new DTs or to create DTs for a different service.
+
+So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos
+authentication, or even other forms of authentication, although there is nothing (aside from
+maybe implementation details) that ties them to Kerberos or any other authentication mechanism.
+
+
+## Lifecycle of DTs
+
+DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact
+to create a DT for a service. So, the first step needed to get a DT is being able to authenticate
+to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos.
+
+This requires Kerberos credentials to be available somewhere for the application to use. The user
+is generally responsible for providing those credentials, which is most commonly done by logging
+in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT
+(ticket granting ticket), which can then be used to request service tickets.
+
+There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process.
+
+Once a TGT is available, the target service's client library can then be used to authenticate
+to the service using the Kerberos credentials, and request the creation of a delegation token.
+This token can now be sent to other processes and used to authenticate to different daemons
+belonging to that service.
+
+And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and
+use them.
+
+Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be
+added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating
+delegation tokens for an application. Spark makes the DTs available to code by stashing them in the
+`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library
+to agree on how to use those tokens.
+
+Once they are created, the semantics of how DTs operate are also service-specific. But, in general,
+they try to follow the semantics of Kerberos tokens:
+
+* A "renewable period" (equivalent to TGT's "lifetime") which is for how long the DT is valid
+  before it requires renewal.
+* A "max lifetime" (equivalent to TGT's "renewable life") which is for how long the DT can be
+  renewed.
+
+Once the token reaches its "max lifetime", a new one needs to be created by contacting the
+appropriate service, restarting the above process.
+
+
+## DT Renewal, Renewers, and YARN
+
+This is the most confusing part of DT handling, and part of it is because much of the system was
+designed with MapReduce, and later YARN, in mind.
+
+As seen above, DTs need to be renewed periodically until they finally expire for good. An example of
+this is the default configuration of HDFS services: delegation tokens are valid for up to 7 days,
+and need to be renewed every 24 hours. If 24 hours pass
```
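Since the README names `HadoopDelegationTokenProvider` as the extension point, a minimal sketch of a provider may help. The service name, config key, and client calls below are hypothetical; the trait shape matches the public API in recent Spark versions (it was internal at the time of this PR), so check the version you build against:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.security.HadoopDelegationTokenProvider

// Hypothetical provider for an imaginary "acme" service.
class AcmeDelegationTokenProvider extends HadoopDelegationTokenProvider {

  override def serviceName: String = "acme"

  // Only fetch tokens when the service is actually configured for this app.
  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean =
    sparkConf.contains("spark.acme.uri") // hypothetical config key

  // Authenticate (the caller already holds Kerberos credentials), ask the
  // service for a DT, stash it in the shared Credentials, and report the
  // next renewal time (None if the provider doesn't know it).
  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // val token = AcmeClient.connect(sparkConf.get("spark.acme.uri")).fetchToken()
    // creds.addToken(token.getService, token)
    None
  }
}
```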
[GitHub] [spark] zero323 commented on pull request #35124: [WIP][SPARK-37398][PYTHON] Inline type hints for python/pyspark/ml/classification.py
zero323 commented on pull request #35124: URL: https://github.com/apache/spark/pull/35124#issuecomment-1007376991 Thanks @javierivanov. Let's revisit this after all prerequisites are done (core for starters, but there are also internal dependencies that have to be followed). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you opened a new pull request #35135: [SPARK-35442][SQL] Support propagate empty relation through aggregate
ulysses-you opened a new pull request #35135: URL: https://github.com/apache/spark/pull/35135 ### What changes were proposed in this pull request?
- Add a `LogicalQueryStage(_, agg: BaseAggregateExec)` check in `AQEPropagateEmptyRelation`.
- Add a `LeafNode` check in `PropagateEmptyRelationBase`, so we can eliminate `LogicalQueryStage` into a `LocalRelation`.

### Why are the changes needed? The Aggregate in AQE is different from the others: its `LogicalQueryStage` looks like `LogicalQueryStage(Aggregate, BaseAggregate)`, so we should handle this case specially. Logically, if the Aggregate's grouping expressions are not empty, we can eliminate it safely. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add new test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
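The grouping-expression condition exists because a global aggregate still returns one row over empty input. A runnable illustration (the `local[1]` master is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

object EmptyRelationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val empty = Seq.empty[(Int, Int)].toDF("k", "v")

    // Grouped aggregate over an empty child returns zero rows, so the whole
    // subtree can safely be replaced with an empty LocalRelation.
    empty.groupBy("k").count().show() // 0 rows

    // A global aggregate (no grouping expressions) still yields one row,
    // so it must NOT be propagated away.
    empty.agg(Map("v" -> "count")).show() // 1 row: count(v) = 0

    spark.stop()
  }
}
```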
[GitHub] [spark] Peng-Lei commented on pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command
Peng-Lei commented on pull request #35131: URL: https://github.com/apache/spark/pull/35131#issuecomment-1007364208 @cloud-fan @imback82 @huaxingao Could you take a look? Thank you very much. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError
pan3793 edited a comment on pull request #35076: URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842 Hi @otterc I got more information for this issue. Add assertion and debug log in `RemoteBlockPushResolver` (ESS side):

```java
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
  ...
  for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
    synchronized (partition) {
      try {
        // This can throw IOException which will marks this shuffle partition as not merged.
        partition.finalizePartition();
        bitmaps.add(partition.mapTracker);
        reduceIds.add(partition.reduceId);
        sizes.add(partition.getLastChunkOffset());
      } catch (IOException ioe) {
        logger.warn("Exception while finalizing shuffle partition {}_{} {} {}",
          msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
      } finally {
        partition.closeAllFilesAndDeleteIfNeeded(false);
      }
    }
+   assert partition.dataFile.length() == partition.lastChunkOffset;
+   assert partition.indexFile.file.length() == partition.indexFile.getPos();
+   assert partition.metaFile.file.length() == partition.metaFile.getPos();
+   logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
+     msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
+     partition.indexFile.getPos() / 8 - 1,
+     partition.metaFile.getPos(),
+     partition.lastChunkOffset);
  }
  mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
    bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
    Longs.toArray(sizes));
  ...
}
```

Add assertion and debug log in `IndexShuffleBlockResolver` (Reducer side):

```scala
override def getMergedBlockData(
    blockId: ShuffleMergedBlockId,
    dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
  val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
  val size = indexFile.length.toInt
  val offsets = Utils.tryWithResource {
    new DataInputStream(Files.newInputStream(indexFile.toPath))
  } { dis =>
    val buffer = ByteBuffer.allocate(size)
    dis.readFully(buffer.array)
    buffer.asLongBuffer
  }
  // Number of chunks is number of indexes - 1
  val numChunks = size / 8 - 1
+ if (numChunks == 0) {
+   val indexBackupPath = java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
+   val dataBackupPath = java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
+   val metaBackupPath = java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
+   logError(s"$blockId chunk_size is 0, " +
+     s"index_file is $indexFile, backup to $indexBackupPath, " +
+     s"data_file is $dataFile, backup to $dataBackupPath, " +
+     s"meta_file is $metaFile, backup to $metaBackupPath")
+   Files.copy(indexFile.toPath, indexBackupPath)
+   Files.copy(dataFile.toPath, dataBackupPath)
+   Files.copy(metaFile.toPath, metaBackupPath)
+   assert(false)
+ }
  for (index <- 0 until numChunks) yield {
    new FileSegmentManagedBuffer(transportConf, dataFile, offsets.get(index),
      offsets.get(index + 1) - offsets.get(index))
  }
}
```

Then I run TPCDS several rounds and reproduce the exception. The assertion failed on the reduce task side.

```log
01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 executor 562): java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:208)
  at org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
  at org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
  at org.apache.spark.util.Completi
[GitHub] [spark] zero323 commented on pull request #35126: [SPARK-37836][PYTHON][INFRA] Enable F841, E722, E305 and E226 for PEP 8 compliance
zero323 commented on pull request #35126: URL: https://github.com/apache/spark/pull/35126#issuecomment-1007350779 E305, E226 should be covered by black, if enabled for particular path, but there is no harm in adding them. Personally, I have mixed feelings about E722 ‒ using `BaseException` or `Exception` doesn't really seem to add anything. But I guess there is no harm in that either :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon edited a comment on pull request #35121: [SPARK-37833][INFRA] Add `precondition` job to skip the main GitHub Action jobs
HyukjinKwon edited a comment on pull request #35121: URL: https://github.com/apache/spark/pull/35121#issuecomment-1007292642 Oops, the test is broken in the commits, e.g., https://github.com/apache/spark/runs/4737174642?check_suite_focus=true (PRs are not affected). I made a quick followup, but it seems like we need some more fixes in the `is-changed.py` script ... let me revert this and my own followup for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon closed pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository
HyukjinKwon closed pull request #35133: URL: https://github.com/apache/spark/pull/35133 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository
HyukjinKwon commented on pull request #35133: URL: https://github.com/apache/spark/pull/35133#issuecomment-1007285278 Merged to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository
HyukjinKwon commented on pull request #35133: URL: https://github.com/apache/spark/pull/35133#issuecomment-1007284852 cc @dongjoon-hyun, let me just merge this to fix up the build. If this doesn't work, I will just revert this and #35121. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] LuciferYang opened a new pull request #35134: [SPARK-37842][SQL][MINIOR] Use `multi-catch` to simplify duplicate exception handling behavior in Java code
LuciferYang opened a new pull request #35134: URL: https://github.com/apache/spark/pull/35134 ### What changes were proposed in this pull request? Java 7 introduced multi-catch, which lets a single `catch` clause handle several exception types. This PR uses it to simplify duplicated exception handling in Java code. ### Why are the changes needed? Code simplification ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GA -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
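For reference, the Java 7 form is `catch (IOException | ReflectiveOperationException e) { ... }`. To keep the examples in this digest in one language, the sketch below shows the analogous consolidation in Scala, whose alternation pattern collapses duplicated handlers the same way; the file path and fallback value are made up for the sketch:

```scala
import java.io.IOException
import scala.io.Source

object MultiCatchSketch {
  def readConfig(path: String): String =
    try {
      val src = Source.fromFile(path)
      try src.mkString finally src.close()
    } catch {
      // One handler for several exception types instead of two identical
      // catch cases: the Scala analogue of Java 7's multi-catch.
      case e @ (_: IOException | _: SecurityException) =>
        s"<fallback config: failed to read $path: ${e.getMessage}>"
    }
}
```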
[GitHub] [spark] HyukjinKwon opened a new pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository
HyukjinKwon opened a new pull request #35133: URL: https://github.com/apache/spark/pull/35133 ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/35121. We should run "Check all modules" in the precondition job only in the forked repository because `is-changed.py` requires `APACHE_SPARK_REF` to be set: https://github.com/apache/spark/blob/master/dev/is-changed.py#L60 ### Why are the changes needed? To fix the broken build in the main branch. PRs are not affected. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Should be merged to test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org