[GitHub] [spark] zhongjingxiong opened a new pull request #35142: [SPARK-37708][K8S] Provides basic images that support centos7

2022-01-07 Thread GitBox


zhongjingxiong opened a new pull request #35142:
URL: https://github.com/apache/spark/pull/35142


   
   
   ### What changes were proposed in this pull request?
   
   To adapt to production environments, a CentOS 7 base image is provided.
   
   ### Why are the changes needed?
   
   If we build Python on CentOS, package it, and run that package on K8S, submitting it like this:
   `spark-submit \
   --archives s3a://zhongjingxiong/python3.6.9.tgz#python3.6 \
   --conf spark.pyspark.driver.python=python3.6/bin/python3 \
   --conf spark.pyspark.python=python3.6/bin/python3 \
   examples/src/main/python/pi.py 10`
   
   The job then fails with a missing-dependency error:
   `Unpacking an archive s3a://zhongjingxiong/python3.6.9.tgz#python3 from /tmp/spark-0002ed81-3a01-4e40-a9b7-826295b86ece/python3.6.9.tgz to /opt/spark/work-dir/./python3
   Traceback (most recent call last):
     File "/tmp/spark-0002ed81-3a01-4e40-a9b7-826295b86ece/pi.py", line 21, in <module>
       from pyspark.sql import SparkSession
     File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 121, in <module>
     File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/__init__.py", line 42, in <module>
     File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 27, in <module>
       async def _ag():
     File "/opt/spark/work-dir/python3/lib/python3.6/ctypes/__init__.py", line 7, in <module>
       from _ctypes import Union, Structure, Array
   ImportError: libffi.so.6: cannot open shared object file: No such file or directory
   22/01/07 22:56:11 INFO ShutdownHookManager: Shutdown hook called`
   
   But it works fine when running on YARN. Therefore, it is better to provide an image that supports CentOS for running Spark.

   ### Does this PR introduce _any_ user-facing change?
   
   Yes, this adds a new Dockerfile that is exposed to users.
   
   ### How was this patch tested?
   
   It has been deployed in the production environment.





[GitHub] [spark] zhouyejoe commented on pull request #34083: Add docs about using Shiv for packaging (similar to PEX)

2022-01-07 Thread GitBox


zhouyejoe commented on pull request #34083:
URL: https://github.com/apache/spark/pull/34083#issuecomment-1007900746


   @mridulm I think we can reopen this until we have done enough testing around 
using Shiv here. Thoughts?





[GitHub] [spark] mridulm commented on pull request #34083: Add docs about using Shiv for packaging (similar to PEX)

2022-01-07 Thread GitBox


mridulm commented on pull request #34083:
URL: https://github.com/apache/spark/pull/34083#issuecomment-1007900266


   Looks like this got dropped from our review radar, @HyukjinKwon.
   Do you think this is valid? If yes, we can reopen and review...





[GitHub] [spark] wzhfy commented on pull request #35137: [TEST] Fix test logic by getting sleep interval in task

2022-01-07 Thread GitBox


wzhfy commented on pull request #35137:
URL: https://github.com/apache/spark/pull/35137#issuecomment-1007888966


   cc @Ngone51 @agrawaldevesh, could you take a look?





[GitHub] [spark] wzhfy removed a comment on pull request #35137: [TEST] Fix test logic by getting sleep interval in task

2022-01-07 Thread GitBox


wzhfy removed a comment on pull request #35137:
URL: https://github.com/apache/spark/pull/35137#issuecomment-1007875631


   cc @Ngone51 @agrawaldevesh





[GitHub] [spark] wzhfy commented on pull request #35137: [TEST] Fix test logic by getting sleep interval in task

2022-01-07 Thread GitBox


wzhfy commented on pull request #35137:
URL: https://github.com/apache/spark/pull/35137#issuecomment-1007875631


   cc @Ngone51 @agrawaldevesh





[GitHub] [spark] dongjoon-hyun closed pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask

2022-01-07 Thread GitBox


dongjoon-hyun closed pull request #35141:
URL: https://github.com/apache/spark/pull/35141


   





[GitHub] [spark] dongjoon-hyun commented on pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask

2022-01-07 Thread GitBox


dongjoon-hyun commented on pull request #35141:
URL: https://github.com/apache/spark/pull/35141#issuecomment-1007867336


   Thank you, @viirya. The following is the reproducer with the Java 17/Maven/Apple Silicon combination.
   Java 17/SBT/Apple Silicon works fine without this issue.
   
   ```
   $ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.SparkSubmitSuite test
   ...
   SparkSubmitSuite:
   - prints usage on empty input
   - prints usage with only --help
   - prints error with unrecognized options
   - handle binary specified but not class
   - handles arguments with --key=val
   - handles arguments to user program
   - handles arguments to user program with name collision
   - print the right queue name
   - SPARK-24241: do not fail fast if executor num is 0 when dynamic allocation is enabled
   - specify deploy mode through configuration
   - handles YARN cluster mode
   - handles YARN client mode
   - SPARK-33530: handles standalone mode with archives
   - handles standalone cluster mode
   - handles legacy standalone cluster mode
   - handles standalone client mode
   - handles mesos client mode
   - handles k8s cluster mode
   - automatically sets mainClass if primary resource is S3 JAR in client mode
   - automatically sets mainClass if primary resource is S3 JAR in cluster mode
   - error informatively when mainClass isn't set and S3 JAR doesn't exist
   - handles confs with flag equivalents
   - SPARK-21568 ConsoleProgressBar should be enabled only in shells
   2022-01-07 16:54:31.508 - stderr> SLF4J: Class path contains multiple SLF4J bindings.
   2022-01-07 16:54:31.508 - stderr> SLF4J: Found binding in [jar:file:/Users/dongjoon/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   2022-01-07 16:54:31.508 - stderr> SLF4J: Found binding in [jar:file:/Users/dongjoon/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   2022-01-07 16:54:31.509 - stderr> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
   2022-01-07 16:54:31.509 - stderr> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
   2022-01-07 16:54:32.659 - stderr> Exception in thread "Executor task launch worker-0" java.lang.NoSuchFieldError: mdc
   2022-01-07 16:54:32.659 - stderr>    at org.apache.log4j.MDCFriend.fixForJava9(MDCFriend.java:11)
   2022-01-07 16:54:32.659 - stderr>    at org.slf4j.impl.Log4jMDCAdapter.(Log4jMDCAdapter.java:38)
   2022-01-07 16:54:32.659 - stderr>    at org.slf4j.impl.StaticMDCBinder.getMDCA(StaticMDCBinder.java:59)
   2022-01-07 16:54:32.659 - stderr>    at org.slf4j.MDC.bwCompatibleGetMDCAdapterFromBinder(MDC.java:99)
   2022-01-07 16:54:32.659 - stderr>    at org.slf4j.MDC.(MDC.java:108)
   2022-01-07 16:54:32.659 - stderr>    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$setMDCForTask(Executor.scala:750)
   2022-01-07 16:54:32.659 - stderr>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:441)
   2022-01-07 16:54:32.659 - stderr>    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
   2022-01-07 16:54:32.659 - stderr>    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
   2022-01-07 16:54:32.659 - stderr>    at java.base/java.lang.Thread.run(Thread.java:833)
   ```
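
   For context, a minimal sketch (not the actual patch in this PR; the method shape and the `mdc` parameter type are assumptions) of how the executor's `setMDCForTask` could swallow this error so that a bad slf4j/log4j pairing does not kill the task thread:

   ```scala
   import org.slf4j.MDC

   // Hypothetical guard around MDC setup; the real Executor.setMDCForTask may differ.
   def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
     try {
       mdc.foreach { case (key, value) => MDC.put(key, value) }
       MDC.put("taskName", taskName)
     } catch {
       // Thrown when an incompatible log4j/slf4j binding is on the classpath,
       // e.g. the slf4j-log4j12 + log4j-slf4j-impl mix seen in the log above.
       case _: NoSuchFieldError => // ignore; task logging simply loses its MDC fields
     }
   }
   ```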





[GitHub] [spark] c21 commented on a change in pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys

2022-01-07 Thread GitBox


c21 commented on a change in pull request #35138:
URL: https://github.com/apache/spark/pull/35138#discussion_r780617335



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -396,6 +396,16 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS =
+buildConf("spark.sql.join.requireAllJoinKeysAsPartitionKeys")

Review comment:
   Would this config take effect for all physical operators that have two children and require `ClusteredDistribution`? For example, `CoGroupExec`.







[GitHub] [spark] c21 commented on a change in pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys

2022-01-07 Thread GitBox


c21 commented on a change in pull request #35138:
URL: https://github.com/apache/spark/pull/35138#discussion_r780605984



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##
@@ -451,30 +468,24 @@ case class HashShuffleSpec(
   false
   }
 
-  override def canCreatePartitioning: Boolean = true
+  override def canCreatePartitioning: Boolean = {
+// To avoid potential data skew, we don't allow `HashShuffleSpec` to 
create partitioning if
+// the hash partition keys are not the full join keys (the cluster keys). 
Then the planner
+// will add shuffles with the default partitioning of 
`ClusteredDistribution`, which uses all
+// the join keys.
+if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS)) {
+  distribution.clustering.forall(x => 
partitioning.expressions.exists(_.semanticEquals(x)))

Review comment:
   Do we need to require `partitioning.expressions` to be exactly the same as `distribution.clustering` as well? E.g. for the following cases:
   
   ```
   partitioning.expressions: [a, b]
   distribution.clustering: [b, a]
   ```
   
   ```
   partitioning.expressions: [a, b, a]
   distribution.clustering: [a, b]
   ```
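
   For illustration, a standalone sketch (plain strings stand in for Catalyst expressions and `==` for `semanticEquals`; this is not code from the PR) showing that the forall/exists check in the diff accepts both of the cases above, which is exactly what the question is probing:

   ```scala
   // Simplified stand-in for:
   // distribution.clustering.forall(x => partitioning.expressions.exists(_.semanticEquals(x)))
   def coversAllClusterKeys(partitioning: Seq[String], clustering: Seq[String]): Boolean =
     clustering.forall(key => partitioning.exists(_ == key))

   coversAllClusterKeys(Seq("a", "b"), Seq("b", "a"))      // true: same keys, different order
   coversAllClusterKeys(Seq("a", "b", "a"), Seq("a", "b")) // true: duplicated partition key
   ```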







[GitHub] [spark] beliefer commented on pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG

2022-01-07 Thread GitBox


beliefer commented on pull request #35130:
URL: https://github.com/apache/spark/pull/35130#issuecomment-1007849173


   > What's the new algorithm? It's simple before: if agg funcs contain `GeneralAggregateFunc`, don't try partial pushdown. Otherwise, try complete pushdown first, then partial pushdown.
   
   Yes. But AVG is a commonly used aggregate function, so we want to push it down whenever we can. Maybe we could construct Average in DS V2, so we keep it simple.





[GitHub] [spark] beliefer commented on a change in pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG

2022-01-07 Thread GitBox


beliefer commented on a change in pull request #35130:
URL: https://github.com/apache/spark/pull/35130#discussion_r780603780



##
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##
@@ -223,7 +225,11 @@ abstract class JdbcDialect extends Serializable with 
Logging{
   case f: GeneralAggregateFunc if f.name() == "AVG" =>
 assert(f.inputs().length == 1)
 val distinct = if (f.isDistinct) "DISTINCT " else ""
-Some(s"AVG($distinct${f.inputs().head})")
+if (supportCompletePushDown) {
+  Some(s"AVG($distinct${f.inputs().head})")
+} else {
+  Some(s"Sum($distinct${f.inputs().head}), 
Count($distinct${f.inputs().head})")

Review comment:
   If Spark cannot completely push AVG down to the database, we still want partial pushdown.
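
   To make the intent concrete, here is a rough sketch (my own illustration, not the DS V2 code path) of what partial push-down of AVG amounts to: the source returns SUM and COUNT, and Spark finishes the average by combining them.

   ```scala
   // Each pushed-down partition returns a (sum, count) pair instead of AVG.
   final case class PartialAvg(sum: Double, count: Long)

   // Spark-side final aggregation: AVG = SUM(sum) / SUM(count).
   def finishAvg(partials: Seq[PartialAvg]): Option[Double] = {
     val totalCount = partials.map(_.count).sum
     if (totalCount == 0L) None else Some(partials.map(_.sum).sum / totalCount)
   }

   finishAvg(Seq(PartialAvg(10.0, 4L), PartialAvg(5.0, 1L))) // Some(3.0)
   ```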







[GitHub] [spark] aa1371 commented on pull request #35083: [WIP][SPARK-37798] PySpark Pandas API: Cross and conditional merging

2022-01-07 Thread GitBox


aa1371 commented on pull request #35083:
URL: https://github.com/apache/spark/pull/35083#issuecomment-1007846355


   @HyukjinKwon - thoughts on the PR in its current state?
   
   Also, is it possible to see the specific formatting issues caught by the black formatter in the CI? It seems to use a different black configuration than when I run dev/format-python locally.





[GitHub] [spark] AmplabJenkins commented on pull request #35124: [WIP][SPARK-37398][PYTHON] Inline type hints for python/pyspark/ml/classification.py

2022-01-07 Thread GitBox


AmplabJenkins commented on pull request #35124:
URL: https://github.com/apache/spark/pull/35124#issuecomment-1007838795


   Can one of the admins verify this patch?





[GitHub] [spark] dongjoon-hyun commented on pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask

2022-01-07 Thread GitBox


dongjoon-hyun commented on pull request #35141:
URL: https://github.com/apache/spark/pull/35141#issuecomment-1007837947


   Could you review this, @viirya and @HyukjinKwon?
   I observed this when I ran the Maven Jenkins test on our community M1 machine.





[GitHub] [spark] dongjoon-hyun opened a new pull request #35141: [SPARK-37843][CORE] Suppress NoSuchFieldError at setMDCForTask

2022-01-07 Thread GitBox


dongjoon-hyun opened a new pull request #35141:
URL: https://github.com/apache/spark/pull/35141


   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   





[GitHub] [spark] github-actions[bot] closed pull request #32365: [SPARK-35228][SQL] Add expression ToHiveString for keep consistent between hive/spark format in df.show

2022-01-07 Thread GitBox


github-actions[bot] closed pull request #32365:
URL: https://github.com/apache/spark/pull/32365


   





[GitHub] [spark] github-actions[bot] closed pull request #32289: [SPARK-33357][K8S] Support Spark application managing with SparkAppHandle on Kubernetes

2022-01-07 Thread GitBox


github-actions[bot] closed pull request #32289:
URL: https://github.com/apache/spark/pull/32289


   





[GitHub] [spark] github-actions[bot] commented on pull request #34024: [SPARK-36784][SHUFFLE][WIP] Handle DNS issues on executor to prevent shuffle nodes from getting added to exclude list

2022-01-07 Thread GitBox


github-actions[bot] commented on pull request #34024:
URL: https://github.com/apache/spark/pull/34024#issuecomment-1007834093


   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] closed pull request #33878: [SPARK-36303][SQL] Refactor fourthteenth set of 20 query execution errors to use error classes

2022-01-07 Thread GitBox


github-actions[bot] closed pull request #33878:
URL: https://github.com/apache/spark/pull/33878


   





[GitHub] [spark] github-actions[bot] closed pull request #31267: [SPARK-21195][CORE] MetricSystem should pick up dynamically registered metrics in sources

2022-01-07 Thread GitBox


github-actions[bot] closed pull request #31267:
URL: https://github.com/apache/spark/pull/31267


   





[GitHub] [spark] github-actions[bot] closed pull request #34083: Add docs about using Shiv for packaging (similar to PEX)

2022-01-07 Thread GitBox


github-actions[bot] closed pull request #34083:
URL: https://github.com/apache/spark/pull/34083


   





[GitHub] [spark] github-actions[bot] commented on pull request #34141: [SPARK-33887][SQL] Allow insert overwrite same table with static partition if using dynamic partition overwrite mode

2022-01-07 Thread GitBox


github-actions[bot] commented on pull request #34141:
URL: https://github.com/apache/spark/pull/34141#issuecomment-1007834075


   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests

2022-01-07 Thread GitBox


github-actions[bot] commented on pull request #33174:
URL: https://github.com/apache/spark/pull/33174#issuecomment-1007834106


   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] closed pull request #34108: [SPARK-36638][SQL][TEST] Generalize OptimizeSkewedJoin - correctness

2022-01-07 Thread GitBox


github-actions[bot] closed pull request #34108:
URL: https://github.com/apache/spark/pull/34108


   





[GitHub] [spark] yangwwei commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured

2022-01-07 Thread GitBox


yangwwei commented on pull request #34672:
URL: https://github.com/apache/spark/pull/34672#issuecomment-1007823318


   hi @tgravescs thanks for getting back on this one : ). 
   
   I think @attilapiros already mentioned 
   
   > But it got sidetracked and become stale.
   Based on this experience I am afraid the ideal solution is a bit far away in 
the future.
   
   @attilapiros please share your thoughts.
   
   I understand this may not be the best or most elegant solution, but it is simple and it works. I do not think hacking more things into the remote shuffle service side is better; this is a common problem that should be handled on the Spark side. If there is a better, actionable solution, I would love to explore it.
   
   





[GitHub] [spark] shardulm94 commented on a change in pull request #33446: [SPARK-36215][SHUFFLE] Add logging for slow fetches to diagnose external shuffle service issues

2022-01-07 Thread GitBox


shardulm94 commented on a change in pull request #33446:
URL: https://github.com/apache/spark/pull/33446#discussion_r780556023



##
File path: core/src/main/scala/org/apache/spark/TestUtils.scala
##
@@ -448,6 +451,34 @@ private[spark] object TestUtils {
   EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
 file.getPath
   }
+
+  /**
+   * A log4j-specific log capturing mechanism. Provided with a class name, it 
will add a new
+   * appender to the log for that class to capture the output. log4j must be 
properly configured
+   * on the classpath (e.g., with slf4j-log4j12) for this to work. This should 
be closed when it is
+   * done to remove the temporary appender.
+   */
+  class Log4jCapture(val loggerName: String) extends AutoCloseable {

Review comment:
   Using `SparkFunSuite.withLogAppender` now







[GitHub] [spark] shardulm94 commented on a change in pull request #33446: [SPARK-36215][SHUFFLE] Add logging for slow fetches to diagnose external shuffle service issues

2022-01-07 Thread GitBox


shardulm94 commented on a change in pull request #33446:
URL: https://github.com/apache/spark/pull/33446#discussion_r780519077



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1214,6 +1214,29 @@ package object config {
   .checkValue(_ > 0, "The max no. of blocks in flight cannot be 
non-positive.")
   .createWithDefault(Int.MaxValue)
 
+  private[spark] val REDUCER_SHUFFLE_FETCH_SLOW_LOG_THRESHOLD_MS =
+ConfigBuilder("spark.reducer.shuffleFetchSlowLogThreshold.time")
+  .doc("When fetching blocks from an external shuffle service is slower 
than expected, the " +
+"fetch will be logged to allow for subsequent investigation. A fetch 
is determined " +
+"to be slow if it has a total duration of at least this value, and a 
transfer rate " +
+// cannot reference val REDUCER_SHUFFLE_FETCH_SLOW_LOG_THRESHOLD_BPS 
since its uninitialized

Review comment:
   Both of these confs reference each other in the doc string, so at least one of them will have to be referenced using a string.

##
File path: docs/configuration.md
##
@@ -2064,6 +2064,28 @@ Apart from these, the following properties are also 
available, and may be useful
   
   1.2.0
 
+
+  spark.reducer.shuffleFetchSlowLogThreshold.time
+  value of 
spark.reducer.shuffleFetchSlowLogThreshold.time

Review comment:
   Fixed

##
File path: docs/configuration.md
##
@@ -2064,6 +2064,28 @@ Apart from these, the following properties are also 
available, and may be useful
   
   1.2.0
 
+
+  spark.reducer.shuffleFetchSlowLogThreshold.time
+  value of 
spark.reducer.shuffleFetchSlowLogThreshold.time
+  
+When fetching blocks from an external shuffle service is slower than 
expected, the
+fetch will be logged to allow for subsequent investigation. A fetch is 
determined
+to be slow if it has a total duration of at least this value, and a 
transfer rate
+less than 
spark.reducer.shuffleFetchSlowLogThreshold.bytesPerSec.
+  
+  3.2.0
+
+
+  spark.reducer.shuffleFetchSlowLogThreshold.bytesPerSec
+  value of 
spark.reducer.shuffleFetchSlowLogThreshold.bytesPerSec

Review comment:
   Fixed
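
   For reference, a hypothetical helper (the names and the exact rule are my reading of the doc text above, not the PR's code) showing how the two thresholds combine: a fetch is reported as slow only if it took at least the time threshold and its transfer rate fell below the bytes-per-second threshold.

   ```scala
   // durationMs >= time threshold AND observed bytes/sec < bytesPerSec threshold.
   def isSlowFetch(bytes: Long, durationMs: Long,
                   thresholdMs: Long, thresholdBytesPerSec: Long): Boolean = {
     val bytesPerSec = if (durationMs > 0) bytes * 1000.0 / durationMs else Double.MaxValue
     durationMs >= thresholdMs && bytesPerSec < thresholdBytesPerSec
   }

   // A 1 MiB fetch that took 30 s is both long and slow, so it would be logged.
   isSlowFetch(bytes = 1 << 20, durationMs = 30000,
               thresholdMs = 10000, thresholdBytesPerSec = 1 << 20) // true
   ```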







[GitHub] [spark] sathiyapk commented on a change in pull request #34729: [SPARK-37475][SQL] Add scale parameter to floor and ceil functions

2022-01-07 Thread GitBox


sathiyapk commented on a change in pull request #34729:
URL: https://github.com/apache/spark/pull/34729#discussion_r780514098



##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
##
@@ -658,6 +669,62 @@ class MathExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 val intResultsB: Seq[Int] = Seq(31400, 31420, 31416, 
314159000, 314159300,
   314159260) ++ Seq.fill(7)(314159265)
 
+def doubleResultsFloor(i: Int): Any = {
+  val results = Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3,
+3.1, 3.14, 3.141, 3.1415, 3.14159, 3.141592)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def doubleResultsCeil(i: Int): Any = {
+  val results = Seq(100.0, 10.0, 1.0, 1000.0, 100.0, 10.0,
+4L, 3.2, 3.15, 3.142, 3.1416, 3.1416, 3.141593)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def floatResultsFloor(i: Int): Any = {
+  val results = Seq(0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 3L,
+3.1d, 3.14d, 3.141d, 3.1414d, 3.14149d, 3.141499d)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def floatResultsCeil(i: Int): Any = {
+  val results = Seq(100.0f, 10.0f, 1.0f, 1000.0f, 100.0f, 
10.0f,
+4L, 3.2d, 3.15d, 3.142d, 3.1415d, 3.1415d, 3.1415d)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def shortResultsFloor(i: Int): Any = {
+  val results = Seq[Long](0L, 0L, 3L, 31000L,
+31400L, 31410L, 31415L) ++ Seq.fill[Long](7)(31415)
+  results(i)
+}
+
+def shortResultsCeil(i: Int): Any = {

Review comment:
   Sorry, I am not sure I got it right.
   
   Actually, we test `RoundCeil` and `RoundFloor` directly for a positive `scale` on big decimals; I can add long and double cases there if you prefer. But we can't test `RoundCeil` and `RoundFloor` directly for a negative `scale`, because the cast to `Long` for a negative scale happens one step earlier.
   
   Maybe if I put the cast in the `RoundCeil` and `RoundFloor` objects and let the `ExpressionBuilder` call these objects as it did before, we would be able to test `RoundCeil` and `RoundFloor` directly for both negative and positive `scale`. What do you say?
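
   To make the scale semantics under discussion concrete, a standalone sketch (not the PR's implementation; the values are mine) using BigDecimal: a non-negative scale keeps fractional digits, while a negative scale rounds at a power of ten, so the result is integral, which is why the cast to `Long` happens before `RoundFloor`/`RoundCeil` is invoked.

   ```scala
   import scala.math.BigDecimal.RoundingMode

   def floorWithScale(value: BigDecimal, scale: Int): BigDecimal =
     value.setScale(scale, RoundingMode.FLOOR)

   def ceilWithScale(value: BigDecimal, scale: Int): BigDecimal =
     value.setScale(scale, RoundingMode.CEILING)

   floorWithScale(BigDecimal("3.1415"), 3)   // 3.141
   ceilWithScale(BigDecimal("3.1415"), 3)    // 3.142
   floorWithScale(BigDecimal("31415.9"), -2) // 3.14E+4, i.e. 31400
   ceilWithScale(BigDecimal("31415.9"), -2)  // 3.15E+4, i.e. 31500
   ```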







[GitHub] [spark] sathiyapk commented on a change in pull request #34729: [SPARK-37475][SQL] Add scale parameter to floor and ceil functions

2022-01-07 Thread GitBox


sathiyapk commented on a change in pull request #34729:
URL: https://github.com/apache/spark/pull/34729#discussion_r780514098



##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
##
@@ -658,6 +669,62 @@ class MathExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 val intResultsB: Seq[Int] = Seq(31400, 31420, 31416, 
314159000, 314159300,
   314159260) ++ Seq.fill(7)(314159265)
 
+def doubleResultsFloor(i: Int): Any = {
+  val results = Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3,
+3.1, 3.14, 3.141, 3.1415, 3.14159, 3.141592)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def doubleResultsCeil(i: Int): Any = {
+  val results = Seq(100.0, 10.0, 1.0, 1000.0, 100.0, 10.0,
+4L, 3.2, 3.15, 3.142, 3.1416, 3.1416, 3.141593)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def floatResultsFloor(i: Int): Any = {
+  val results = Seq(0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 3L,
+3.1d, 3.14d, 3.141d, 3.1414d, 3.14149d, 3.141499d)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def floatResultsCeil(i: Int): Any = {
+  val results = Seq(100.0f, 10.0f, 1.0f, 1000.0f, 100.0f, 
10.0f,
+4L, 3.2d, 3.15d, 3.142d, 3.1415d, 3.1415d, 3.1415d)
+  if (i <= 6) results(i).toLong else results(i)
+}
+
+def shortResultsFloor(i: Int): Any = {
+  val results = Seq[Long](0L, 0L, 3L, 31000L,
+31400L, 31410L, 31415L) ++ Seq.fill[Long](7)(31415)
+  results(i)
+}
+
+def shortResultsCeil(i: Int): Any = {

Review comment:
   Sorry, I am not sure I got it right.
   
   Actually, we test `RoundCeil` and `RoundFloor` directly for a positive `scale` on big decimals; I can add long and double cases there if you prefer. But we can't test `RoundCeil` and `RoundFloor` directly for a negative `scale`, because the cast to `Long` for a negative scale happens one step earlier.
   
   Maybe if I put the cast in the `RoundCeil` and `RoundFloor` objects and let the `ExpressionBuilder` call these objects as it did before, we would be able to test `RoundCeil` and `RoundFloor` directly for both negative and positive `scale`. What do you say?







[GitHub] [spark] cdegroc opened a new pull request #35140: [SPARK-37829][SQL] DataFrame.joinWith should return null rows for missing values

2022-01-07 Thread GitBox


cdegroc opened a new pull request #35140:
URL: https://github.com/apache/spark/pull/35140


   ### What changes were proposed in this pull request?
   Add a unit test demonstrating the regression on `DataFrame.joinWith`.
   Revert  [commit 
cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59)
 making the test pass.
   
   ### Why are the changes needed?
   Doing an outer join using `joinWith` on DataFrames used to return missing values as null in Spark 2.4.8, but it returns them as Rows with null values in Spark 3.0.0+.
   The regression was introduced in [commit cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59).
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   A unit test was added.
   
   Ran unit tests for the `sql-core` and `sql-catalyst` submodules with 
`./build/mvn clean package -pl sql/core,cql/catalyst`
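
   For illustration, a hedged sketch (column names and values are mine, not the added test) of the behaviour difference described above:

   ```scala
   // Requires a SparkSession named `spark` and `import spark.implicits._`.
   val left  = Seq((1, "a"), (2, "b")).toDF("id", "v")
   val right = Seq((2, "x")).toDF("id", "w")

   val joined = left.joinWith(right, left("id") === right("id"), "left_outer")
   joined.collect()
   // Spark 2.4.8: the unmatched left row pairs with null            -> ([1,a], null)
   // Spark 3.0.0+ (the regression): it pairs with a Row of nulls    -> ([1,a], [null,null])
   ```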





[GitHub] [spark] imback82 commented on a change in pull request #35113: [SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default

2022-01-07 Thread GitBox


imback82 commented on a change in pull request #35113:
URL: https://github.com/apache/spark/pull/35113#discussion_r780475265



##
File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
##
@@ -189,8 +189,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 
   override def createDatabase(
   dbDefinition: CatalogDatabase,
-  ignoreIfExists: Boolean): Unit = withClient {
-client.createDatabase(dbDefinition, ignoreIfExists)
+  ignoreIfExists: Boolean): Unit = {
+try {
+  withClient {
+client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+} catch {
+  case e: AnalysisException if e.message.contains("already exists") =>
+throw new DatabaseAlreadyExistsException(dbDefinition.name)

Review comment:
   Good idea! Let me try this approach.







[GitHub] [spark] cdegroc opened a new pull request #35139: [SPARK-37829][SQL] DataFrame.joinWith should return null rows for missing values

2022-01-07 Thread GitBox


cdegroc opened a new pull request #35139:
URL: https://github.com/apache/spark/pull/35139


   ### What changes were proposed in this pull request?
   Add a unit test demonstrating the regression on `DataFrame.joinWith`.
   Update `ExpressionEncoder` to make the test pass (equivalent to reverting 
the 
[commit](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59)
 that introduced the issue).
   
   ### Why are the changes needed?
   Doing an outer join using `joinWith` on DataFrames used to return missing values as null in Spark 2.4.8, but it returns them as Rows with null values in Spark 3.0.0+.
   The regression was introduced in [this commit](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59).
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   A unit test was added.
   
   Ran unit tests for the `sql-core` and `sql-catalyst` submodules with 
`./build/mvn clean package -pl sql/core,cql/catalyst`





[GitHub] [spark] cloud-fan commented on a change in pull request #35132: [SPARK-37841][SQL] BasicWriteTaskStatsTracker should not try get status for a skipped file

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35132:
URL: https://github.com/apache/spark/pull/35132#discussion_r780459615



##
File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
##
@@ -305,6 +305,12 @@ private[orc] class OrcOutputWriter(
   recordWriter.close(Reporter.NULL)
 }
   }
+
+  /**
+   * If `recordWriterInstantiated` is false, the output file is not pretouched.

Review comment:
   Alternatively, can we fix the Hive ORC data source to always write the 
file? This seems wrong to me. We should at least write one file even if the 
input query is empty, to record the output schema. 







[GitHub] [spark] cloud-fan commented on a change in pull request #35132: [SPARK-37841][SQL] BasicWriteTaskStatsTracker should not try get status for a skipped file

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35132:
URL: https://github.com/apache/spark/pull/35132#discussion_r780458052



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##
@@ -142,8 +142,13 @@ class BasicWriteTaskStatsTracker(
 numSubmittedFiles += 1
   }
 
-  override def closeFile(filePath: String): Unit = {
-updateFileStats(filePath)
+  override def closeFile(filePath: String, isPathCreated: Boolean): Unit = {
+if (isPathCreated) {
+  updateFileStats(filePath)
+} else {
+  logDebug(s"$filePath is not pre-touched by writer, skipping update file 
stats")

Review comment:
   ```suggestion
 logDebug(s"$filePath is not created due to no data, skip updating file 
stats")
   ```







[GitHub] [spark] cloud-fan commented on pull request #35135: [SPARK-35442][SQL] Support propagate empty relation through aggregate

2022-01-07 Thread GitBox


cloud-fan commented on pull request #35135:
URL: https://github.com/apache/spark/pull/35135#issuecomment-1007644925


   can you also address 
https://github.com/apache/spark/pull/32602#discussion_r780377367 ?





[GitHub] [spark] cloud-fan commented on a change in pull request #35113: [SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35113:
URL: https://github.com/apache/spark/pull/35113#discussion_r780453457



##
File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
##
@@ -189,8 +189,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 
   override def createDatabase(
   dbDefinition: CatalogDatabase,
-  ignoreIfExists: Boolean): Unit = withClient {
-client.createDatabase(dbDefinition, ignoreIfExists)
+  ignoreIfExists: Boolean): Unit = {
+try {
+  withClient {
+client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+} catch {
+  case e: AnalysisException if e.message.contains("already exists") =>
+throw new DatabaseAlreadyExistsException(dbDefinition.name)

Review comment:
   You made a good point. But it's still a bit hacky that we wrap the 
exception multiple times. How about
   ```
   def withClient(convertException: Throwable => AnalysisException, body: => T) ...
   def withClient(body: => T) = withClient(e => throw new AnalysisException..., body)
   ```
   
   Then we can call the new overload of `withClient` in `createDatabase` and 
translate the hive exception directly. We can probably apply it in more places 
like `createTable` when we need to unify the error message further.
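
   A self-contained sketch of the proposed shape (simplified: the real `withClient` in `HiveExternalCatalog` is private, retries, and builds an `AnalysisException`; a plain `RuntimeException` stands in here so the snippet compiles on its own):

   ```scala
   import scala.util.control.NonFatal

   object WithClientSketch {
     // Overload that lets the caller translate the underlying Hive/Thrift error
     // into a specific exception type (e.g. a DatabaseAlreadyExistsException).
     def withClient[T](convertException: Throwable => Exception, body: => T): T =
       try body catch { case NonFatal(e) => throw convertException(e) }

     // Existing behaviour: wrap everything in a generic error.
     def withClient[T](body: => T): T =
       withClient[T](e => new RuntimeException(s"Hive call failed: ${e.getMessage}", e), body)
   }
   ```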







[GitHub] [spark] tgravescs edited a comment on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured

2022-01-07 Thread GitBox


tgravescs edited a comment on pull request #34672:
URL: https://github.com/apache/spark/pull/34672#issuecomment-1007641251


   Yes, my concern is that it's a config for something that isn't public. It doesn't make sense to me to have a public config for a non-public API, and by itself, without this 3rd-party lib, the config would not apply.
   
   I don't think my question was answered (can't you just open a port on one of your remote shuffle services and ignore the messages?), and it sounds like it does work now with some hack (is that hack just setup and configs)? If so, then I would much rather prefer a real solution like the one already mentioned above, because you can make it work now.





[GitHub] [spark] tgravescs commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured

2022-01-07 Thread GitBox


tgravescs commented on pull request #34672:
URL: https://github.com/apache/spark/pull/34672#issuecomment-1007641251


   Yes, my concern is that it's a config for something that isn't public. It doesn't make sense to me to have a public config for a non-public API, and by itself, without this 3rd-party lib, the config would not apply.
   
   I don't think my question was answered, and it sounds like it does work now with some hack (is that hack just setup and configs)? If so, then I would much rather prefer a real solution like the one already mentioned above, because you can make it work now.





[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-07 Thread GitBox


venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780452227



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
   executorFailureEpoch -= execId
 }
 shuffleFileLostEpoch -= execId
+
+if (pushBasedShuffleEnabled) {

Review comment:
   Agree. Let me see how we can detect when a new blockManager's host is not already part of the existing shuffleMergers and correspondingly fire an event to the DAGScheduler.







[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-07 Thread GitBox


venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780451587



##
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##
@@ -39,7 +39,9 @@ class StageInfo(
 val taskMetrics: TaskMetrics = null,
 private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = 
Seq.empty,
 private[spark] val shuffleDepId: Option[Int] = None,
-val resourceProfileId: Int) {
+val resourceProfileId: Int,
+private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
   Yes, currently there are no consumers within Spark. But [[SPARK-36620][SHUFFLE] Add client side push based shuffle metrics](https://github.com/apache/spark/pull/34000) has to use and populate these metrics, and that work is currently ongoing. Do you think we should move this change to that PR as well, or wait for this one to get merged and then make the corresponding changes in the other PR to consume these metrics? cc @thejdeep







[GitHub] [spark] cloud-fan commented on pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys

2022-01-07 Thread GitBox


cloud-fan commented on pull request #35138:
URL: https://github.com/apache/spark/pull/35138#issuecomment-1007635990


   cc @sunchao @c21





[GitHub] [spark] cloud-fan commented on a change in pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35138:
URL: https://github.com/apache/spark/pull/35138#discussion_r780448272



##
File path: 
sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q17.sf100/simplified.txt
##
@@ -1,90 +1,97 @@
 TakeOrderedAndProject 
[i_item_id,i_item_desc,s_state,store_sales_quantitycount,store_sales_quantityave,store_sales_quantitystdev,store_sales_quantitycov,as_store_returns_quantitycount,as_store_returns_quantityave,as_store_returns_quantitystdev,store_returns_quantitycov,catalog_sales_quantitycount,catalog_sales_quantityave,catalog_sales_quantitystdev,catalog_sales_quantitycov]

Review comment:
   I checked it locally. Now the plan golden files are exactly the same as before #32875.







[GitHub] [spark] cloud-fan commented on a change in pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35138:
URL: https://github.com/apache/spark/pull/35138#discussion_r780448272



##
File path: 
sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q17.sf100/simplified.txt
##
@@ -1,90 +1,97 @@
 TakeOrderedAndProject 
[i_item_id,i_item_desc,s_state,store_sales_quantitycount,store_sales_quantityave,store_sales_quantitystdev,store_sales_quantitycov,as_store_returns_quantitycount,as_store_returns_quantityave,as_store_returns_quantitystdev,store_returns_quantitycov,catalog_sales_quantitycount,catalog_sales_quantityave,catalog_sales_quantitystdev,catalog_sales_quantitycov]

Review comment:
   I checked it locally. Now the plan golden files are exactly the same as 
the ones before #32875




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan opened a new pull request #35138: [SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys

2022-01-07 Thread GitBox


cloud-fan opened a new pull request #35138:
URL: https://github.com/apache/spark/pull/35138


   
   
   ### What changes were proposed in this pull request?
   
   This is a followup of https://github.com/apache/spark/pull/32875 . Basically 
https://github.com/apache/spark/pull/32875 made two improvements:
   1. allow bucket join even if the bucket hash function is different from 
Spark's shuffle hash function
   2. allow bucket join even if the hash partition keys are a subset of the join keys.
   
   The first improvement is the major target for implementing the SPIP "storage 
partition join". The second improvement is more of a consequence of the 
framework refactor and was not planned.
   
   This PR disables the second improvement by default, since skipping the shuffle 
may introduce a perf regression when the data is skewed. We need more design work 
to enable this improvement, such as checking the ndv (number of distinct values).
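
   To make the second improvement concrete, here is a hypothetical sketch (the 
tables, bucket count, and column names are illustrative, not taken from this PR): 
both sides are bucketed by `a` only while the join is on `a` and `b`, so reusing 
the bucketing to skip the shuffle relies on `a` alone being selective enough.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Illustrative only: t1 and t2 are bucketed by `a`, but the join is on (a, b).
   val spark = SparkSession.builder().master("local[*]").appName("bucket-join").getOrCreate()
   
   spark.range(1000).selectExpr("id % 10 AS a", "id AS b")
     .write.bucketBy(8, "a").sortBy("a").saveAsTable("t1")
   spark.range(1000).selectExpr("id % 10 AS a", "id AS b")
     .write.bucketBy(8, "a").sortBy("a").saveAsTable("t2")
   
   // With the second improvement the shuffle can be skipped based on `a` alone,
   // which concentrates skewed `a` values into a few tasks.
   spark.sql("SELECT * FROM t1 JOIN t2 ON t1.a = t2.a AND t1.b = t2.b").explain()
   spark.stop()
   ```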
   
   ### Why are the changes needed?
   
   Avoid perf regression
   
   ### Does this PR introduce _any_ user-facing change?
   
   no
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zero323 commented on pull request #34439: [SPARK-37095][PYTHON] Inline type hints for files in python/pyspark/broadcast.py

2022-01-07 Thread GitBox


zero323 commented on pull request #34439:
URL: https://github.com/apache/spark/pull/34439#issuecomment-1007619850


   Could you please resolve the conflicts @dchvn?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hiboyang commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured

2022-01-07 Thread GitBox


hiboyang commented on pull request #34672:
URL: https://github.com/apache/spark/pull/34672#issuecomment-1007611924


   @tgravescs  I am the author of Uber RSS. Regarding your question "I'm not 
familiar with the details of Uber RSS, can it just fake up a remote shuffle 
port and ignore what is sent? have you requested their version to support 
dynamic allocation?": Uber RSS can support dynamic allocation on Kubernetes 
through a workaround, e.g. enabling the shuffle tracking feature and setting the 
shuffle tracking timeout to zero. But it is a hack, and the code change here 
would make that hack unnecessary.
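
   For reference, a sketch of that workaround using Spark's existing dynamic 
allocation configs (the values are illustrative; the `spark.shuffle.registration.enabled` 
flag discussed below is only a proposal and does not exist yet):
   
   ```scala
   import org.apache.spark.SparkConf
   
   // Sketch of the shuffle-tracking workaround described above.
   val conf = new SparkConf()
     .set("spark.dynamicAllocation.enabled", "true")
     // Track shuffle data on executors instead of requiring the external shuffle service.
     .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
     // A zero timeout releases executors holding shuffle data immediately, which only
     // makes sense when shuffle data actually lives in a remote shuffle service.
     .set("spark.dynamicAllocation.shuffleTracking.timeout", "0s")
   ```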
   
   In terms of adding a config like "spark.shuffle.registration.enabled", any 
concern?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] imback82 commented on a change in pull request #35113: [SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default

2022-01-07 Thread GitBox


imback82 commented on a change in pull request #35113:
URL: https://github.com/apache/spark/pull/35113#discussion_r780418849



##
File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
##
@@ -189,8 +189,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 
   override def createDatabase(
   dbDefinition: CatalogDatabase,
-  ignoreIfExists: Boolean): Unit = withClient {
-client.createDatabase(dbDefinition, ignoreIfExists)
+  ignoreIfExists: Boolean): Unit = {
+try {
+  withClient {
+client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+} catch {
+  case e: AnalysisException if e.message.contains("already exists") =>
+throw new DatabaseAlreadyExistsException(dbDefinition.name)

Review comment:
   Two concerns if we move the logic to `withClient`:
   1. How can we guarantee 
`org.apache.hadoop.hive.metastore.api.AlreadyExistsException` is thrown only by 
`createDatabase` but not for other calls like `createTable`?
   2. Since db name is not available, we need to parse out the db name from the 
message.
   
   Are you OK with these?
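
   For comparison, a rough sketch that keeps the handling in `createDatabase` 
(where the db name is in scope) but matches on the exception cause instead of the 
message. Whether the Hive `AlreadyExistsException` actually survives on the cause 
chain of the `AnalysisException` thrown by `withClient` is an assumption here, 
not something verified:
   
   ```scala
   import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
   
   import org.apache.spark.sql.AnalysisException
   import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException
   import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
   
   // Meant to sit inside HiveExternalCatalog, shown out of context.
   override def createDatabase(
       dbDefinition: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = {
     try {
       withClient {
         client.createDatabase(dbDefinition, ignoreIfExists)
       }
     } catch {
       case e: AnalysisException if e.getCause.isInstanceOf[AlreadyExistsException] =>
         // The db name is still in scope here, so no message parsing is needed.
         throw new DatabaseAlreadyExistsException(dbDefinition.name)
     }
   }
   ```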




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.

2022-01-07 Thread GitBox


gaborgsomogyi commented on a change in pull request #23348:
URL: https://github.com/apache/spark/pull/23348#discussion_r780426592



##
File path: core/src/main/scala/org/apache/spark/deploy/security/README.md
##
@@ -0,0 +1,249 @@
+# Delegation Token Handling In Spark
+
+This document aims to explain and demystify delegation tokens as they are used 
by Spark, since
+this topic is generally a huge source of confusion.
+
+
+## What are delegation tokens and why use them?
+
+Delegation tokens (DTs from now on) are authentication tokens used by some 
services to replace
+Kerberos service tokens. Many services in the Hadoop ecosystem have support 
for DTs, since they
+have some very desirable advantages over Kerberos tokens:
+
+* No need to distribute Kerberos credentials
+
+In a distributed application, distributing Kerberos credentials is tricky. Not 
all users have
+keytabs, and when they do, it's generally frowned upon to distribute them over 
the network as
+part of application data.
+
+DTs allow for a single place (e.g. the Spark driver) to require Kerberos 
credentials. That entity
+can then distribute the DTs to other parts of the distributed application 
(e.g. Spark executors),
+so they can authenticate to services.
+
+* A single token per service is used for authentication
+
+If Kerberos authentication were used, each client connection to a server would 
require a trip
+to the KDC and generation of a service ticket. In a distributed system, the 
number of service
+tickets can balloon pretty quickly when you think about the number of client 
processes (e.g. Spark
+executors) vs. the number of service processes (e.g. HDFS DataNodes). That 
generates unnecessary
+extra load on the KDC, and may even run into usage limits set up by the KDC 
admin.
+
+* DTs are only used for authentication
+
+DTs, unlike TGTs, can only be used to authenticate to the specific service for 
which they were
+issued. You cannot use an existing DT to create new DTs or to create DTs for a 
different service.
+
+So in short, DTs are *not* Kerberos tokens. They are used by many services to 
replace Kerberos
+authentication, or even other forms of authentication, although there is 
nothing (aside from
+maybe implementation details) that ties them to Kerberos or any other 
authentication mechanism.
+
+
+## Lifecycle of DTs
+
+DTs, unlike Kerberos tokens, are service-specific. There is no centralized 
location you contact
+to create a DT for a service. So, the first step needed to get a DT is being 
able to authenticate
+to the service in question. In the Hadoop ecosystem, that is generally done 
using Kerberos.
+
+This requires Kerberos credentials to be available somewhere for the 
application to use. The user
+is generally responsible for providing those credentials, which is most 
commonly done by logging
+in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" 
containing a TGT
+(ticket granting ticket), which can then be used to request service tickets.
+
+There are other ways of obtaining TGTs, but, ultimately, you need a TGT to 
bootstrap the process.
+
+Once a TGT is available, the target service's client library can then be used 
to authenticate
+to the service using the Kerberos credentials, and request the creation of a 
delegation token.
+This token can now be sent to other processes and used to authenticate to 
different daemons
+belonging to that service.
+
+And thus the first drawback of DTs becomes apparent: you need service-specific 
logic to create and
+use them.
+
+Spark implements a (somewhat) pluggable, internal DT creation API. Support for 
new services can be
+added by implementing a `HadoopDelegationTokenProvider` that is then called by 
Spark when generating
+delegation tokens for an application. Spark makes the DTs available to code by 
stashing them in the
+`UserGroupInformation` credentials, and it's up to the DT provider and the 
respective client library
+to agree on how to use those tokens.
+
+Once they are created, the semantics of how DTs operate are also 
service-specific. But, in general,
+they try to follow the semantics of Kerberos tokens:
+
+* A "renewable period (equivalent to TGT's "lifetime") which is for how long 
the DT is valid
+  before it requires renewal.
+* A "max lifetime" (equivalent to TGT's "renewable life") which is for how 
long the DT can be
+  renewed.
+
+Once the token reaches its "max lifetime", a new one needs to be created by 
contacting the
+appropriate service, restarting the above process.
+
+
+## DT Renewal, Renewers, and YARN
+
+This is the most confusing part of DT handling, and part of it is because much 
of the system was
+designed with MapReduce, and later YARN, in mind.
+
+As seen above, DTs need to be renewed periodically until they finally expire 
for good. An example of
+this is the default configuration of HDFS services: delegation tokens are 
valid for up to 7 days,
+and need to be renewed every 24 hours. If 24 hours pass

[GitHub] [spark] imback82 commented on a change in pull request #35113: [SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default

2022-01-07 Thread GitBox


imback82 commented on a change in pull request #35113:
URL: https://github.com/apache/spark/pull/35113#discussion_r780418849



##
File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
##
@@ -189,8 +189,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 
   override def createDatabase(
   dbDefinition: CatalogDatabase,
-  ignoreIfExists: Boolean): Unit = withClient {
-client.createDatabase(dbDefinition, ignoreIfExists)
+  ignoreIfExists: Boolean): Unit = {
+try {
+  withClient {
+client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+} catch {
+  case e: AnalysisException if e.message.contains("already exists") =>
+throw new DatabaseAlreadyExistsException(dbDefinition.name)

Review comment:
   Two concerns if we move the logic to `withClient`:
   1. How can we guarantee 
`org.apache.hadoop.hive.metastore.api.AlreadyExistsException` is thrown by 
`createDatabase` but not for other calls like `createTable`?
   2. Since db name is not available, we need to parse out the db name from the 
message.
   
   Are you OK with these?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on pull request #34951: [SPARK-37686][PYTHON][SQL] Use _invoke_function helpers for all pyspark.sql.functions

2022-01-07 Thread GitBox


viirya commented on pull request #34951:
URL: https://github.com/apache/spark/pull/34951#issuecomment-1007579476


   Looks okay.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #34062: [SPARK-36819][SQL] Don't insert redundant filters in case static partition pruning can be done

2022-01-07 Thread GitBox


maryannxue commented on a change in pull request #34062:
URL: https://github.com/apache/spark/pull/34062#discussion_r780403353



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
##
@@ -216,11 +216,53 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join
   }
 
   /**
-   * Search a filtering predicate in a given logical plan
+   * Returns whether an expression is likely to be selective in dynamic 
partition filtering.
+   * 1. the predicate is selective.
+   * 2. the filtering predicate must not be subset of join key. In case it is, 
then partition
+   * filter can be inferred statically in optimization phase, hence return 
false.
*/
-  private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
+  private def isLikelySelectiveWithInferFiltersEnabled(
+  e: Expression,
+  joinKey: Expression): (Boolean, Boolean) = e match {
+case Not(expr) => isLikelySelectiveWithInferFiltersEnabled(expr, joinKey)
+case And(l, r) =>
+  val (isSelectiveLeft, notSubsetOfJoinKeyLeft) =
+isLikelySelectiveWithInferFiltersEnabled(l, joinKey)
+  val (isSelectiveRight, notSubsetOfJoinKeyRight) =
+isLikelySelectiveWithInferFiltersEnabled(r, joinKey)
+  (isSelectiveLeft || isSelectiveRight, notSubsetOfJoinKeyLeft || 
notSubsetOfJoinKeyRight)
+case Or(l, r) =>
+  val (isSelectiveLeft, notSubsetOfJoinKeyLeft) =
+isLikelySelectiveWithInferFiltersEnabled(l, joinKey)
+  val (isSelectiveRight, notSubsetOfJoinKeyRight) =
+isLikelySelectiveWithInferFiltersEnabled(r, joinKey)
+  (isSelectiveLeft && isSelectiveRight && (notSubsetOfJoinKeyLeft || 
notSubsetOfJoinKeyRight),
+notSubsetOfJoinKeyLeft || notSubsetOfJoinKeyRight)
+case expr: StringRegexExpression => (true, !isSubsetOfJoinKey(expr, 
joinKey))
+case expr: BinaryComparison => (true, !isSubsetOfJoinKey(expr, joinKey))
+case expr: In => (true, !isSubsetOfJoinKey(expr, joinKey))
+case expr: InSet => (true, !isSubsetOfJoinKey(expr, joinKey))
+case expr: StringPredicate => (true, !isSubsetOfJoinKey(expr, joinKey))
+case expr: MultiLikeBase => (true, !isSubsetOfJoinKey(expr, joinKey))
+case _ => (false, false)
+  }
+
+  private def isSubsetOfJoinKey(e: Expression, joinKey: Expression): Boolean =
+  e.references.subsetOf(joinKey.references)
+
+  /**
+   * Search a filtering predicate in a given logical plan.
+   */
+  private def hasSelectivePredicate(plan: LogicalPlan, joinKey: Expression): 
Boolean = {

Review comment:
   Do we need to make this so complicated? This is one of the last rules 
applied, and by this time all static filters have been inferred and pushed down. 
Can't we simply test whether there are any static equality-condition filters on 
the same partition column that the dynamic filter would apply to?
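
   A rough sketch of that simpler check (the method name and placement are 
illustrative, not the actual `PartitionPruning` code; it assumes the surrounding 
rule mixes in `PredicateHelper`):
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression, Literal}
   import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
   
   // Returns true if the plan already has a static `col = literal` filter on the
   // partition column the dynamic filter would be applied to.
   private def hasStaticEqualityFilterOn(plan: LogicalPlan, partCol: Expression): Boolean = {
     plan.find {
       case Filter(condition, _) =>
         splitConjunctivePredicates(condition).exists {
           case EqualTo(a: Attribute, _: Literal) => partCol.references.contains(a)
           case EqualTo(_: Literal, a: Attribute) => partCol.references.contains(a)
           case _ => false
         }
       case _ => false
     }.isDefined
   }
   ```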




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #35127: [SPARK-37837][INFRA] Enable black formatter in dev Python scripts

2022-01-07 Thread GitBox


dongjoon-hyun commented on pull request #35127:
URL: https://github.com/apache/spark/pull/35127#issuecomment-1007572412


   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun closed pull request #35127: [SPARK-37837][INFRA] Enable black formatter in dev Python scripts

2022-01-07 Thread GitBox


dongjoon-hyun closed pull request #35127:
URL: https://github.com/apache/spark/pull/35127


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Peng-Lei commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command

2022-01-07 Thread GitBox


Peng-Lei commented on a change in pull request #35131:
URL: https://github.com/apache/spark/pull/35131#discussion_r780390548



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
##
@@ -55,7 +56,8 @@ private[sql] case class V1Table(v1Table: CatalogTable) 
extends Table {
 }
   }
 
-  override lazy val properties: util.Map[String, String] = 
v1Table.properties.asJava
+  override lazy val properties: util.Map[String, String] =
+V1Table.filterTableProperties(v1Table).asJava

Review comment:
   Thank you very much. I will try to do it. In fact, precisely because these v2 
table properties were stored into the metastore when creating a table, I spent a 
lot of time looking for failed use cases and trying to fix them. It is indeed a 
breaking change to generate v2 properties when creating tables and store them. 
Thank you very much for the reminder. @cloud-fan 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] bersprockets commented on pull request #35120: [SPARK-37832][SQL] Orc struct converter should use an array to look up field converters rather than a linked list

2022-01-07 Thread GitBox


bersprockets commented on pull request #35120:
URL: https://github.com/apache/spark/pull/35120#issuecomment-1007551297


   Thanks @HyukjinKwon @dongjoon-hyun 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zero323 commented on a change in pull request #35136: [WIP][SPARK-37418][PYTHON][ML] Inline annotations for pyspark.ml.param.__init__.py

2022-01-07 Thread GitBox


zero323 commented on a change in pull request #35136:
URL: https://github.com/apache/spark/pull/35136#discussion_r780382158



##
File path: python/pyspark/ml/param/__init__.py
##
@@ -76,33 +99,33 @@ class TypeConverters:
 """
 
 @staticmethod
-def _is_numeric(value):
+def _is_numeric(value: Any) -> bool:
 vtype = type(value)
 return vtype in [int, float, np.float64, np.int64] or vtype.__name__ 
== "long"
 
 @staticmethod
-def _is_integer(value):
+def _is_integer(value: Any) -> bool:
 return TypeConverters._is_numeric(value) and float(value).is_integer()
 
 @staticmethod
-def _can_convert_to_list(value):
+def _can_convert_to_list(value: Any) -> bool:
 vtype = type(value)
 return vtype in [list, np.ndarray, tuple, range, array.array] or 
isinstance(value, Vector)
 
 @staticmethod
-def _can_convert_to_string(value):
+def _can_convert_to_string(value: Any) -> bool:
 vtype = type(value)
 return isinstance(value, str) or vtype in [np.unicode_, np.string_, 
np.str_]
 
 @staticmethod
-def identity(value):
+def identity(value: "T") -> "T":
 """
 Dummy converter that just returns value.
 """
 return value
 
 @staticmethod
-def toList(value):
+def toList(value: Any) -> List:

Review comment:
   This and other `to*` methods could be more precise with specific 
overloads, i.e.
   
   ```python
   @staticmethod
   @overload
   def toList(value: List[T]) -> List[T]:
   ...
   
   @staticmethod
   @overload
   def toList(value: range) -> List[int]:
   ...
   
   @staticmethod
   @overload
   def toList(value: Vector) -> List[float]:
   ...
   
   ```
   
   and so on, but it is probably not worth the effort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wzhfy opened a new pull request #35137: [TEST] get sleep interval in task

2022-01-07 Thread GitBox


wzhfy opened a new pull request #35137:
URL: https://github.com/apache/spark/pull/35137


   
   
   ### What changes were proposed in this pull request?
   
   Currently in the tests, `TaskContext.getPartitionId()` is called on the driver 
side, so the sleep interval is always 100ms, which is not expected and makes 
some tests flaky.
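
   A minimal sketch of the difference (the RDD and the interval values are 
illustrative, not the actual test code); `TaskContext.getPartitionId()` returns 0 
when there is no active task context, i.e. on the driver:
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder().master("local[2]").appName("sleep-interval").getOrCreate()
   val rdd = spark.sparkContext.parallelize(1 to 4, 2)
   
   // Broken: evaluated on the driver, where getPartitionId() is always 0,
   // so every task ends up sleeping 100 ms.
   val driverSideInterval = if (TaskContext.getPartitionId() == 0) 100 else 1000
   rdd.foreach(_ => Thread.sleep(driverSideInterval))
   
   // Fixed: evaluated inside the task, so the interval really depends on the partition.
   rdd.foreach { _ =>
     val interval = if (TaskContext.getPartitionId() == 0) 100 else 1000
     Thread.sleep(interval)
   }
   spark.stop()
   ```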
   
   ### Why are the changes needed?
   
   Correct the test logic and make tests stable.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   By existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tgravescs commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.

2022-01-07 Thread GitBox


tgravescs commented on a change in pull request #23348:
URL: https://github.com/apache/spark/pull/23348#discussion_r780379362



##
File path: core/src/main/scala/org/apache/spark/deploy/security/README.md
##
@@ -0,0 +1,249 @@
+# Delegation Token Handling In Spark
+
+This document aims to explain and demystify delegation tokens as they are used 
by Spark, since
+this topic is generally a huge source of confusion.
+
+
+## What are delegation tokens and why use them?
+
+Delegation tokens (DTs from now on) are authentication tokens used by some 
services to replace
+Kerberos service tokens. Many services in the Hadoop ecosystem have support 
for DTs, since they
+have some very desirable advantages over Kerberos tokens:
+
+* No need to distribute Kerberos credentials
+
+In a distributed application, distributing Kerberos credentials is tricky. Not 
all users have
+keytabs, and when they do, it's generally frowned upon to distribute them over 
the network as
+part of application data.
+
+DTs allow for a single place (e.g. the Spark driver) to require Kerberos 
credentials. That entity
+can then distribute the DTs to other parts of the distributed application 
(e.g. Spark executors),
+so they can authenticate to services.
+
+* A single token per service is used for authentication
+
+If Kerberos authentication were used, each client connection to a server would 
require a trip
+to the KDC and generation of a service ticket. In a distributed system, the 
number of service
+tickets can balloon pretty quickly when you think about the number of client 
processes (e.g. Spark
+executors) vs. the number of service processes (e.g. HDFS DataNodes). That 
generates unnecessary
+extra load on the KDC, and may even run into usage limits set up by the KDC 
admin.
+
+* DTs are only used for authentication
+
+DTs, unlike TGTs, can only be used to authenticate to the specific service for 
which they were
+issued. You cannot use an existing DT to create new DTs or to create DTs for a 
different service.
+
+So in short, DTs are *not* Kerberos tokens. They are used by many services to 
replace Kerberos
+authentication, or even other forms of authentication, although there is 
nothing (aside from
+maybe implementation details) that ties them to Kerberos or any other 
authentication mechanism.
+
+
+## Lifecycle of DTs
+
+DTs, unlike Kerberos tokens, are service-specific. There is no centralized 
location you contact
+to create a DT for a service. So, the first step needed to get a DT is being 
able to authenticate
+to the service in question. In the Hadoop ecosystem, that is generally done 
using Kerberos.
+
+This requires Kerberos credentials to be available somewhere for the 
application to use. The user
+is generally responsible for providing those credentials, which is most 
commonly done by logging
+in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" 
containing a TGT
+(ticket granting ticket), which can then be used to request service tickets.
+
+There are other ways of obtaining TGTs, but, ultimately, you need a TGT to 
bootstrap the process.
+
+Once a TGT is available, the target service's client library can then be used 
to authenticate
+to the service using the Kerberos credentials, and request the creation of a 
delegation token.
+This token can now be sent to other processes and used to authenticate to 
different daemons
+belonging to that service.
+
+And thus the first drawback of DTs becomes apparent: you need service-specific 
logic to create and
+use them.
+
+Spark implements a (somewhat) pluggable, internal DT creation API. Support for 
new services can be
+added by implementing a `HadoopDelegationTokenProvider` that is then called by 
Spark when generating
+delegation tokens for an application. Spark makes the DTs available to code by 
stashing them in the
+`UserGroupInformation` credentials, and it's up to the DT provider and the 
respective client library
+to agree on how to use those tokens.
+
+Once they are created, the semantics of how DTs operate are also 
service-specific. But, in general,
+they try to follow the semantics of Kerberos tokens:
+
+* A "renewable period (equivalent to TGT's "lifetime") which is for how long 
the DT is valid
+  before it requires renewal.
+* A "max lifetime" (equivalent to TGT's "renewable life") which is for how 
long the DT can be
+  renewed.
+
+Once the token reaches its "max lifetime", a new one needs to be created by 
contacting the
+appropriate service, restarting the above process.
+
+
+## DT Renewal, Renewers, and YARN
+
+This is the most confusing part of DT handling, and part of it is because much 
of the system was
+designed with MapReduce, and later YARN, in mind.
+
+As seen above, DTs need to be renewed periodically until they finally expire 
for good. An example of
+this is the default configuration of HDFS services: delegation tokens are 
valid for up to 7 days,
+and need to be renewed every 24 hours. If 24 hours pass wit

[GitHub] [spark] tgravescs commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.

2022-01-07 Thread GitBox


tgravescs commented on a change in pull request #23348:
URL: https://github.com/apache/spark/pull/23348#discussion_r780378105



##
File path: core/src/main/scala/org/apache/spark/deploy/security/README.md
##
@@ -0,0 +1,249 @@
+# Delegation Token Handling In Spark
+
+This document aims to explain and demystify delegation tokens as they are used 
by Spark, since
+this topic is generally a huge source of confusion.
+
+
+## What are delegation tokens and why use them?
+
+Delegation tokens (DTs from now on) are authentication tokens used by some 
services to replace
+Kerberos service tokens. Many services in the Hadoop ecosystem have support 
for DTs, since they
+have some very desirable advantages over Kerberos tokens:
+
+* No need to distribute Kerberos credentials
+
+In a distributed application, distributing Kerberos credentials is tricky. Not 
all users have
+keytabs, and when they do, it's generally frowned upon to distribute them over 
the network as
+part of application data.
+
+DTs allow for a single place (e.g. the Spark driver) to require Kerberos 
credentials. That entity
+can then distribute the DTs to other parts of the distributed application 
(e.g. Spark executors),
+so they can authenticate to services.
+
+* A single token per service is used for authentication
+
+If Kerberos authentication were used, each client connection to a server would 
require a trip
+to the KDC and generation of a service ticket. In a distributed system, the 
number of service
+tickets can balloon pretty quickly when you think about the number of client 
processes (e.g. Spark
+executors) vs. the number of service processes (e.g. HDFS DataNodes). That 
generates unnecessary
+extra load on the KDC, and may even run into usage limits set up by the KDC 
admin.
+
+* DTs are only used for authentication
+
+DTs, unlike TGTs, can only be used to authenticate to the specific service for 
which they were
+issued. You cannot use an existing DT to create new DTs or to create DTs for a 
different service.
+
+So in short, DTs are *not* Kerberos tokens. They are used by many services to 
replace Kerberos
+authentication, or even other forms of authentication, although there is 
nothing (aside from
+maybe implementation details) that ties them to Kerberos or any other 
authentication mechanism.
+
+
+## Lifecycle of DTs
+
+DTs, unlike Kerberos tokens, are service-specific. There is no centralized 
location you contact
+to create a DT for a service. So, the first step needed to get a DT is being 
able to authenticate
+to the service in question. In the Hadoop ecosystem, that is generally done 
using Kerberos.
+
+This requires Kerberos credentials to be available somewhere for the 
application to use. The user
+is generally responsible for providing those credentials, which is most 
commonly done by logging
+in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" 
containing a TGT
+(ticket granting ticket), which can then be used to request service tickets.
+
+There are other ways of obtaining TGTs, but, ultimately, you need a TGT to 
bootstrap the process.
+
+Once a TGT is available, the target service's client library can then be used 
to authenticate
+to the service using the Kerberos credentials, and request the creation of a 
delegation token.
+This token can now be sent to other processes and used to authenticate to 
different daemons
+belonging to that service.
+
+And thus the first drawback of DTs becomes apparent: you need service-specific 
logic to create and
+use them.
+
+Spark implements a (somewhat) pluggable, internal DT creation API. Support for 
new services can be
+added by implementing a `HadoopDelegationTokenProvider` that is then called by 
Spark when generating
+delegation tokens for an application. Spark makes the DTs available to code by 
stashing them in the
+`UserGroupInformation` credentials, and it's up to the DT provider and the 
respective client library
+to agree on how to use those tokens.
+
+Once they are created, the semantics of how DTs operate are also 
service-specific. But, in general,
+they try to follow the semantics of Kerberos tokens:
+
+* A "renewable period (equivalent to TGT's "lifetime") which is for how long 
the DT is valid
+  before it requires renewal.
+* A "max lifetime" (equivalent to TGT's "renewable life") which is for how 
long the DT can be
+  renewed.
+
+Once the token reaches its "max lifetime", a new one needs to be created by 
contacting the
+appropriate service, restarting the above process.
+
+
+## DT Renewal, Renewers, and YARN
+
+This is the most confusing part of DT handling, and part of it is because much 
of the system was
+designed with MapReduce, and later YARN, in mind.
+
+As seen above, DTs need to be renewed periodically until they finally expire 
for good. An example of
+this is the default configuration of HDFS services: delegation tokens are 
valid for up to 7 days,
+and need to be renewed every 24 hours. If 24 hours pass wit

[GitHub] [spark] cloud-fan commented on a change in pull request #32602: [SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #32602:
URL: https://github.com/apache/spark/pull/32602#discussion_r780377367



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
##
@@ -137,3 +125,55 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] 
with PredicateHelper wit
 }
   }
 }
+
+/**
+ * This rule runs in the normal optimizer and optimizes more cases
+ * compared to [[PropagateEmptyRelationBase]]:
+ * 1. Higher-node Logical Plans
+ *- Union with all empty children.
+ * 2. Unary-node Logical Plans
+ *- Project/Filter/Sample with all empty children.
+ *
+ * The reason why we don't apply this rule at AQE optimizer side is: the 
benefit is not big enough
+ * and it may introduce extra exchanges.

Review comment:
   After more thought, I think this is a big performance issue if we can't 
propagate empty relations through Project/Filter, which are quite common. The 
risk of introducing new shuffles is relatively small compared to this.
   
   @ulysses-you can we move all the logic to `PropagateEmptyRelationBase`? 
`PropagateEmptyRelation` should not have any extra logic.
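
   For context, a small runnable sketch of the kind of plan in question (the 
DataFrame is illustrative): with propagation through Project/Filter, the normal 
optimizer folds the whole query below into an empty LocalRelation.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder().master("local[*]").appName("propagate-empty").getOrCreate()
   import spark.implicits._
   
   // An empty relation under a Filter and a Project: PropagateEmptyRelation in the
   // normal optimizer replaces the whole subtree with an empty LocalRelation.
   val empty = Seq.empty[(Int, Int)].toDF("a", "b")
   empty.filter($"a" > 1).select($"a").explain(true)
   spark.stop()
   ```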




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tgravescs commented on a change in pull request #34326: [SPARK-37053][CORE] Add metrics to SparkHistoryServer

2022-01-07 Thread GitBox


tgravescs commented on a change in pull request #34326:
URL: https://github.com/apache/spark/pull/34326#discussion_r780369492



##
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
##
@@ -157,6 +161,15 @@ class HistoryServer(
 contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
 contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
 attachHandler(contextHandler)
+
+// Register history server source to history server metrics system.
+cacheMetrics.init()

Review comment:
   why are we calling init here?  Doesn't this end up getting called twice?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tgravescs commented on pull request #33934: [SPARK-36691][PYTHON] PythonRunner failed should pass error message to ApplicationMaster too

2022-01-07 Thread GitBox


tgravescs commented on pull request #33934:
URL: https://github.com/apache/spark/pull/33934#issuecomment-1007523767


   @holdenk, who may have dealt with the Python side before, may have thoughts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.

2022-01-07 Thread GitBox


gaborgsomogyi commented on a change in pull request #23348:
URL: https://github.com/apache/spark/pull/23348#discussion_r780350756



##
File path: core/src/main/scala/org/apache/spark/deploy/security/README.md
##
@@ -0,0 +1,249 @@
+# Delegation Token Handling In Spark
+
+This document aims to explain and demystify delegation tokens as they are used 
by Spark, since
+this topic is generally a huge source of confusion.
+
+
+## What are delegation tokens and why use them?
+
+Delegation tokens (DTs from now on) are authentication tokens used by some 
services to replace
+Kerberos service tokens. Many services in the Hadoop ecosystem have support 
for DTs, since they
+have some very desirable advantages over Kerberos tokens:
+
+* No need to distribute Kerberos credentials
+
+In a distributed application, distributing Kerberos credentials is tricky. Not 
all users have
+keytabs, and when they do, it's generally frowned upon to distribute them over 
the network as
+part of application data.
+
+DTs allow for a single place (e.g. the Spark driver) to require Kerberos 
credentials. That entity
+can then distribute the DTs to other parts of the distributed application 
(e.g. Spark executors),
+so they can authenticate to services.
+
+* A single token per service is used for authentication
+
+If Kerberos authentication were used, each client connection to a server would 
require a trip
+to the KDC and generation of a service ticket. In a distributed system, the 
number of service
+tickets can balloon pretty quickly when you think about the number of client 
processes (e.g. Spark
+executors) vs. the number of service processes (e.g. HDFS DataNodes). That 
generates unnecessary
+extra load on the KDC, and may even run into usage limits set up by the KDC 
admin.
+
+* DTs are only used for authentication
+
+DTs, unlike TGTs, can only be used to authenticate to the specific service for 
which they were
+issued. You cannot use an existing DT to create new DTs or to create DTs for a 
different service.
+
+So in short, DTs are *not* Kerberos tokens. They are used by many services to 
replace Kerberos
+authentication, or even other forms of authentication, although there is 
nothing (aside from
+maybe implementation details) that ties them to Kerberos or any other 
authentication mechanism.
+
+
+## Lifecycle of DTs
+
+DTs, unlike Kerberos tokens, are service-specific. There is no centralized 
location you contact
+to create a DT for a service. So, the first step needed to get a DT is being 
able to authenticate
+to the service in question. In the Hadoop ecosystem, that is generally done 
using Kerberos.
+
+This requires Kerberos credentials to be available somewhere for the 
application to use. The user
+is generally responsible for providing those credentials, which is most 
commonly done by logging
+in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" 
containing a TGT
+(ticket granting ticket), which can then be used to request service tickets.
+
+There are other ways of obtaining TGTs, but, ultimately, you need a TGT to 
bootstrap the process.
+
+Once a TGT is available, the target service's client library can then be used 
to authenticate
+to the service using the Kerberos credentials, and request the creation of a 
delegation token.
+This token can now be sent to other processes and used to authenticate to 
different daemons
+belonging to that service.
+
+And thus the first drawback of DTs becomes apparent: you need service-specific 
logic to create and
+use them.
+
+Spark implements a (somewhat) pluggable, internal DT creation API. Support for 
new services can be
+added by implementing a `HadoopDelegationTokenProvider` that is then called by 
Spark when generating
+delegation tokens for an application. Spark makes the DTs available to code by 
stashing them in the
+`UserGroupInformation` credentials, and it's up to the DT provider and the 
respective client library
+to agree on how to use those tokens.
+
+Once they are created, the semantics of how DTs operate are also 
service-specific. But, in general,
+they try to follow the semantics of Kerberos tokens:
+
+* A "renewable period (equivalent to TGT's "lifetime") which is for how long 
the DT is valid
+  before it requires renewal.
+* A "max lifetime" (equivalent to TGT's "renewable life") which is for how 
long the DT can be
+  renewed.
+
+Once the token reaches its "max lifetime", a new one needs to be created by 
contacting the
+appropriate service, restarting the above process.
+
+
+## DT Renewal, Renewers, and YARN
+
+This is the most confusing part of DT handling, and part of it is because much 
of the system was
+designed with MapReduce, and later YARN, in mind.
+
+As seen above, DTs need to be renewed periodically until they finally expire 
for good. An example of
+this is the default configuration of HDFS services: delegation tokens are 
valid for up to 7 days,
+and need to be renewed every 24 hours. If 24 hours pass

[GitHub] [spark] zero323 commented on pull request #34674: [SPARK-37419][PYTHON][ML] Rewrite _shared_params_code_gen.py to inline type hints for ml/param/shared.py

2022-01-07 Thread GitBox


zero323 commented on pull request #34674:
URL: https://github.com/apache/spark/pull/34674#issuecomment-1007510055


   Merged into master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zero323 closed pull request #34674: [SPARK-37419][PYTHON][ML] Rewrite _shared_params_code_gen.py to inline type hints for ml/param/shared.py

2022-01-07 Thread GitBox


zero323 closed pull request #34674:
URL: https://github.com/apache/spark/pull/34674


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tgravescs commented on pull request #34982: [SPARK-37712][YARN] Spark request yarn cluster metrics slow cause delay

2022-01-07 Thread GitBox


tgravescs commented on pull request #34982:
URL: https://github.com/apache/spark/pull/34982#issuecomment-1007504670


   I guess I'm fine with this, though I assume, again, that this is because your 
YARN resource manager is overloaded or unresponsive?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tgravescs commented on pull request #34672: [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured

2022-01-07 Thread GitBox


tgravescs commented on pull request #34672:
URL: https://github.com/apache/spark/pull/34672#issuecomment-1007499597


   @attilapiros is there any active work on it now? I get that it is a lot more 
work, but at the same time, if we keep putting it off for temporary hacks it 
won't get done.
   
   If you look at it from just a Spark perspective, the API is not public: we 
would be adding either an interface that is not used internally by Spark, or a 
config that would not be used internally and exists only for a private API. 
Neither of those is a great API choice in my mind. I do get that people do 
override the shuffle manager, though, and that full features take time, which is 
why I'm a bit torn on this.
   
   There is logic right above this check for the external shuffle service as well 
that sets the shuffleServerID; what are you using for that port? It's used in 
mapStatus and register, though I guess it doesn't matter for map status in this 
case. I'm not familiar with the details of Uber RSS, can it just fake up a remote 
shuffle port and ignore what is sent? Have you requested their version to 
support dynamic allocation?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tgravescs commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.

2022-01-07 Thread GitBox


tgravescs commented on a change in pull request #23348:
URL: https://github.com/apache/spark/pull/23348#discussion_r780320642



##
File path: core/src/main/scala/org/apache/spark/deploy/security/README.md
##
@@ -0,0 +1,249 @@
+# Delegation Token Handling In Spark
+
+This document aims to explain and demystify delegation tokens as they are used 
by Spark, since
+this topic is generally a huge source of confusion.
+
+
+## What are delegation tokens and why use them?
+
+Delegation tokens (DTs from now on) are authentication tokens used by some 
services to replace
+Kerberos service tokens. Many services in the Hadoop ecosystem have support 
for DTs, since they
+have some very desirable advantages over Kerberos tokens:
+
+* No need to distribute Kerberos credentials
+
+In a distributed application, distributing Kerberos credentials is tricky. Not 
all users have
+keytabs, and when they do, it's generally frowned upon to distribute them over 
the network as
+part of application data.
+
+DTs allow for a single place (e.g. the Spark driver) to require Kerberos 
credentials. That entity
+can then distribute the DTs to other parts of the distributed application 
(e.g. Spark executors),
+so they can authenticate to services.
+
+* A single token per service is used for authentication
+
+If Kerberos authentication were used, each client connection to a server would 
require a trip
+to the KDC and generation of a service ticket. In a distributed system, the 
number of service
+tickets can balloon pretty quickly when you think about the number of client 
processes (e.g. Spark
+executors) vs. the number of service processes (e.g. HDFS DataNodes). That 
generates unnecessary
+extra load on the KDC, and may even run into usage limits set up by the KDC 
admin.
+
+* DTs are only used for authentication
+
+DTs, unlike TGTs, can only be used to authenticate to the specific service for 
which they were
+issued. You cannot use an existing DT to create new DTs or to create DTs for a 
different service.
+
+So in short, DTs are *not* Kerberos tokens. They are used by many services to 
replace Kerberos
+authentication, or even other forms of authentication, although there is 
nothing (aside from
+maybe implementation details) that ties them to Kerberos or any other 
authentication mechanism.
+
+
+## Lifecycle of DTs
+
+DTs, unlike Kerberos tokens, are service-specific. There is no centralized 
location you contact
+to create a DT for a service. So, the first step needed to get a DT is being 
able to authenticate
+to the service in question. In the Hadoop ecosystem, that is generally done 
using Kerberos.
+
+This requires Kerberos credentials to be available somewhere for the 
application to use. The user
+is generally responsible for providing those credentials, which is most 
commonly done by logging
+in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" 
containing a TGT
+(ticket granting ticket), which can then be used to request service tickets.
+
+There are other ways of obtaining TGTs, but, ultimately, you need a TGT to 
bootstrap the process.
+
+Once a TGT is available, the target service's client library can then be used 
to authenticate
+to the service using the Kerberos credentials, and request the creation of a 
delegation token.
+This token can now be sent to other processes and used to authenticate to 
different daemons
+belonging to that service.
+
+And thus the first drawback of DTs becomes apparent: you need service-specific 
logic to create and
+use them.
+
+Spark implements a (somewhat) pluggable, internal DT creation API. Support for 
new services can be
+added by implementing a `HadoopDelegationTokenProvider` that is then called by 
Spark when generating
+delegation tokens for an application. Spark makes the DTs available to code by 
stashing them in the
+`UserGroupInformation` credentials, and it's up to the DT provider and the 
respective client library
+to agree on how to use those tokens.
+
+Once they are created, the semantics of how DTs operate are also 
service-specific. But, in general,
+they try to follow the semantics of Kerberos tokens:
+
+* A "renewable period (equivalent to TGT's "lifetime") which is for how long 
the DT is valid
+  before it requires renewal.
+* A "max lifetime" (equivalent to TGT's "renewable life") which is for how 
long the DT can be
+  renewed.
+
+Once the token reaches its "max lifetime", a new one needs to be created by 
contacting the
+appropriate service, restarting the above process.
+
+
+## DT Renewal, Renewers, and YARN
+
+This is the most confusing part of DT handling, and part of it is because much 
of the system was
+designed with MapReduce, and later YARN, in mind.
+
+As seen above, DTs need to be renewed periodically until they finally expire 
for good. An example of
+this is the default configuration of HDFS services: delegation tokens are 
valid for up to 7 days,
+and need to be renewed every 24 hours. If 24 hours pass wit
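   As a concrete illustration of the provider API the quoted README describes, here is a 
minimal, hypothetical provider sketch in Scala; the service name, configuration key, and 
client call are invented for the example, and the exact package and method signatures 
should be checked against the Spark release you target.

   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.security.Credentials
   import org.apache.spark.SparkConf
   import org.apache.spark.security.HadoopDelegationTokenProvider

   // Hypothetical provider for a made-up "acme-kv" service.
   class AcmeKvDelegationTokenProvider extends HadoopDelegationTokenProvider {

     // Name under which Spark registers the provider; by convention it can be
     // disabled via spark.security.credentials.acme-kv.enabled=false.
     override def serviceName: String = "acme-kv"

     // Only request tokens when security is enabled for the service (made-up key).
     override def delegationTokensRequired(
         sparkConf: SparkConf,
         hadoopConf: Configuration): Boolean =
       hadoopConf.getBoolean("acme.kv.security.enabled", false)

     // Authenticate with the current Kerberos credentials, ask the service's client
     // library for a DT, and stash it in the supplied Credentials object. The return
     // value is the next renewal time, if one is known.
     override def obtainDelegationTokens(
         hadoopConf: Configuration,
         sparkConf: SparkConf,
         creds: Credentials): Option[Long] = {
       // val token = AcmeKvClient.fetchDelegationToken(hadoopConf) // hypothetical client call
       // creds.addToken(token.getService, token)
       None // no renewal time known in this sketch
     }
   }
   ```

   In recent Spark versions such providers are discovered through Java's `ServiceLoader`, 
so the class name would also be listed in a `META-INF/services` entry for the provider 
interface; consult the security documentation of the targeted release for the details.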

[GitHub] [spark] ulysses-you commented on pull request #35135: [SPARK-35442][SQL] Support propagate empty relation through aggregate

2022-01-07 Thread GitBox


ulysses-you commented on pull request #35135:
URL: https://github.com/apache/spark/pull/35135#issuecomment-1007463332


   ```
   [info] - SPARK-37578: Update output metrics from Datasource v2 *** FAILED 
*** (65 milliseconds)
   [info]   123 did not equal 246 (SQLAppStatusListenerSuite.scala:936)
   ```
   The failed test is unrelated; it also shows up in 
[https://github.com/apache/spark/pull/35127](https://github.com/HyukjinKwon/spark/runs/4737688471?check_suite_focus=true).
 cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #35126: [SPARK-37836][PYTHON][INFRA] Enable F841, E722, E305 and E226 for PEP 8 compliance

2022-01-07 Thread GitBox


HyukjinKwon closed pull request #35126:
URL: https://github.com/apache/spark/pull/35126


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #35126: [SPARK-37836][PYTHON][INFRA] Enable F841, E722, E305 and E226 for PEP 8 compliance

2022-01-07 Thread GitBox


HyukjinKwon commented on pull request #35126:
URL: https://github.com/apache/spark/pull/35126#issuecomment-1007461103


   Merged to master. Thanks guys!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG

2022-01-07 Thread GitBox


cloud-fan commented on pull request #35130:
URL: https://github.com/apache/spark/pull/35130#issuecomment-1007449447


   What's the new algorithm? It was simple before: if the agg funcs contain 
`GeneralAggregateFunc`, don't try partial pushdown. Otherwise, try complete 
pushdown first, then partial pushdown.
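   For readers skimming the thread, a standalone sketch of that decision order (a 
simplification for illustration, not the actual `V2ScanRelationPushDown` code):

   ```scala
   sealed trait PushDownChoice
   case object CompletePushDown extends PushDownChoice
   case object PartialPushDown extends PushDownChoice
   case object NoPushDown extends PushDownChoice

   def choosePushDown(
       hasGeneralAggFunc: Boolean,        // any GeneralAggregateFunc in the agg list?
       supportsCompletePushDown: Boolean, // source can return final aggregate values
       supportsPartialPushDown: Boolean   // source can return partial agg buffers
   ): PushDownChoice = {
     if (hasGeneralAggFunc) {
       // Spark doesn't know the agg buffer of GeneralAggregateFunc,
       // so only a complete push-down is safe.
       if (supportsCompletePushDown) CompletePushDown else NoPushDown
     } else if (supportsCompletePushDown) {
       CompletePushDown
     } else if (supportsPartialPushDown) {
       PartialPushDown
     } else {
       NoPushDown
     }
   }

   // e.g. plain MIN/MAX/SUM against a source that only supports partial results:
   // choosePushDown(false, false, true) == PartialPushDown
   ```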


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zero323 opened a new pull request #35136: [WIP][SPARK-37418][PYTHON][ML] Inline annotations for pyspark.ml.param.__init__.py

2022-01-07 Thread GitBox


zero323 opened a new pull request #35136:
URL: https://github.com/apache/spark/pull/35136


   
   
   ### What changes were proposed in this pull request?
   
   
   Migration of type hints from `pyspark.ml.param.__init__.pyi` to 
`pyspark.ml.param.__init__.py`.
   
   ### Why are the changes needed?
   
   
   Part of ongoing migration of type stubs to inline annotations.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No.
   
   ### How was this patch tested?
   
   
   
   Existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35130:
URL: https://github.com/apache/spark/pull/35130#discussion_r780295489



##
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##
@@ -223,7 +225,11 @@ abstract class JdbcDialect extends Serializable with 
Logging{
   case f: GeneralAggregateFunc if f.name() == "AVG" =>
 assert(f.inputs().length == 1)
 val distinct = if (f.isDistinct) "DISTINCT " else ""
-Some(s"AVG($distinct${f.inputs().head})")
+if (supportCompletePushDown) {
+  Some(s"AVG($distinct${f.inputs().head})")
+} else {
+  Some(s"Sum($distinct${f.inputs().head}), 
Count($distinct${f.inputs().head})")

Review comment:
   Why do we need it? It's Spark's responsibility, not the data source's, to 
rewrite AVG into SUM/COUNT.
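   As a hedged illustration of that rewrite with plain DataFrames (assuming a local 
`SparkSession`; this is not the planner code, just the identity it relies on):

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{avg, count, sum}

   val spark = SparkSession.builder().master("local[*]").appName("avg-rewrite").getOrCreate()
   import spark.implicits._

   val df = Seq(("a", 1.0), ("a", 3.0), ("b", 5.0)).toDF("k", "v")

   // What the user writes.
   df.groupBy($"k").agg(avg($"v").as("avg_v")).show()

   // What a partial push-down effectively computes: the source returns the SUM and
   // COUNT pieces, and Spark finishes the aggregation with a division.
   df.groupBy($"k").agg((sum($"v") / count($"v")).as("avg_v")).show()
   ```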




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #35130: [SPARK-37839][SQL] DS V2 supports partial aggregate push-down AVG

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35130:
URL: https://github.com/apache/spark/pull/35130#discussion_r780294560



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##
@@ -212,7 +224,10 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
 
   private def supportPartialAggPushDown(agg: Aggregation): Boolean = {
 // We don't know the agg buffer of `GeneralAggregateFunc`, so can't do 
partial agg push down.
-agg.aggregateExpressions().forall(!_.isInstanceOf[GeneralAggregateFunc])
+agg.aggregateExpressions().forall { aggregateFunc =>
+  !aggregateFunc.isInstanceOf[GeneralAggregateFunc] ||
+aggregateFunc.asInstanceOf[GeneralAggregateFunc].name() == "AVG"

Review comment:
   If `AVG` can be partially pushed down, I'd like to create a dedicated class 
for it, instead of using `GeneralAggregateFunc`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #34464: [SPARK-37193][SQL] DynamicJoinSelection.shouldDemoteBroadcastHashJoin should not apply to outer joins

2022-01-07 Thread GitBox


cloud-fan commented on pull request #34464:
URL: https://github.com/apache/spark/pull/34464#issuecomment-1007438535


   I'm reverting this to avoid mistakenly releasing a performance regression in 
Spark 3.3. Please resubmit this PR with SPARK-37753 resolved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35131:
URL: https://github.com/apache/spark/pull/35131#discussion_r780287813



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
##
@@ -55,7 +56,8 @@ private[sql] case class V1Table(v1Table: CatalogTable) 
extends Table {
 }
   }
 
-  override lazy val properties: util.Map[String, String] = 
v1Table.properties.asJava
+  override lazy val properties: util.Map[String, String] =
+V1Table.filterTableProperties(v1Table).asJava

Review comment:
   I think we should generate these v2 table properties here, instead of when 
creating the table. Generating v2 properties at table creation time is a 
breaking change, as it changes what is stored in the metastore, which may have 
unknown consequences.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35131:
URL: https://github.com/apache/spark/pull/35131#discussion_r780285665



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
##
@@ -384,9 +384,12 @@ case class CatalogTable(
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
 val map = new mutable.LinkedHashMap[String, String]()
 val tableProperties = properties
-  .filterKeys(!_.startsWith(VIEW_PREFIX))
+  .filter(!_._1.startsWith(VIEW_PREFIX))
+  .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
+  .filter(!_._1.startsWith(TableCatalog.OPTION_PREFIX))
+  .filter(_._1 != TableCatalog.PROP_EXTERNAL)
   .toSeq.sortBy(_._1)
-  .map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")

Review comment:
   why this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command

2022-01-07 Thread GitBox


cloud-fan commented on a change in pull request #35131:
URL: https://github.com/apache/spark/pull/35131#discussion_r780285249



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
##
@@ -384,9 +384,12 @@ case class CatalogTable(
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
 val map = new mutable.LinkedHashMap[String, String]()
 val tableProperties = properties
-  .filterKeys(!_.startsWith(VIEW_PREFIX))
+  .filter(!_._1.startsWith(VIEW_PREFIX))
+  .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
+  .filter(!_._1.startsWith(TableCatalog.OPTION_PREFIX))
+  .filter(_._1 != TableCatalog.PROP_EXTERNAL)

Review comment:
   hmm, isn't `PROP_EXTERNAL` included in `TABLE_RESERVED_PROPERTIES`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Peng-Lei commented on a change in pull request #35107: [SPARK-37818][DOCS] Add option in the description document for show create table

2022-01-07 Thread GitBox


Peng-Lei commented on a change in pull request #35107:
URL: https://github.com/apache/spark/pull/35107#discussion_r780269024



##
File path: docs/sql-ref-syntax-aux-show-create-table.md
##
@@ -26,7 +26,7 @@ license: |
 ### Syntax
 
 ```sql
-SHOW CREATE TABLE table_identifier
+SHOW CREATE TABLE table_identifier [ AS SERDE ]

Review comment:
   ok, I will do it.
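   For reference, a small usage sketch of the syntax being documented (assumes an active 
`SparkSession` named `spark` on a version that supports `AS SERDE`; the table name is 
made up):

   ```scala
   // Create a throwaway table for the demo.
   spark.sql("CREATE TABLE orders_demo (id INT, amount DOUBLE) USING parquet")

   // Default form: Spark DDL.
   spark.sql("SHOW CREATE TABLE orders_demo").show(truncate = false)

   // AS SERDE form: Hive-compatible DDL (serde/storage-format style), for tables
   // that can be expressed that way.
   spark.sql("SHOW CREATE TABLE orders_demo AS SERDE").show(truncate = false)
   ```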




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Peng-Lei commented on a change in pull request #35107: [SPARK-37818][DOCS] Add option in the description document for show create table

2022-01-07 Thread GitBox


Peng-Lei commented on a change in pull request #35107:
URL: https://github.com/apache/spark/pull/35107#discussion_r780268917



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
##
@@ -1952,6 +1952,22 @@ class DataSourceV2SQLSuite
 }
   }
 
+  test("xxx") {

Review comment:
   Sorry, I will remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.

2022-01-07 Thread GitBox


gaborgsomogyi commented on a change in pull request #23348:
URL: https://github.com/apache/spark/pull/23348#discussion_r780256850



##
File path: core/src/main/scala/org/apache/spark/deploy/security/README.md
##
@@ -0,0 +1,249 @@
+# Delegation Token Handling In Spark
+
+This document aims to explain and demystify delegation tokens as they are used 
by Spark, since
+this topic is generally a huge source of confusion.
+
+
+## What are delegation tokens and why use them?
+
+Delegation tokens (DTs from now on) are authentication tokens used by some 
services to replace
+Kerberos service tokens. Many services in the Hadoop ecosystem have support 
for DTs, since they
+have some very desirable advantages over Kerberos tokens:
+
+* No need to distribute Kerberos credentials
+
+In a distributed application, distributing Kerberos credentials is tricky. Not 
all users have
+keytabs, and when they do, it's generally frowned upon to distribute them over 
the network as
+part of application data.
+
+DTs allow for a single place (e.g. the Spark driver) to require Kerberos 
credentials. That entity
+can then distribute the DTs to other parts of the distributed application 
(e.g. Spark executors),
+so they can authenticate to services.
+
+* A single token per service is used for authentication
+
+If Kerberos authentication were used, each client connection to a server would 
require a trip
+to the KDC and generation of a service ticket. In a distributed system, the 
number of service
+tickets can balloon pretty quickly when you think about the number of client 
processes (e.g. Spark
+executors) vs. the number of service processes (e.g. HDFS DataNodes). That 
generates unnecessary
+extra load on the KDC, and may even run into usage limits set up by the KDC 
admin.
+
+* DTs are only used for authentication
+
+DTs, unlike TGTs, can only be used to authenticate to the specific service for 
which they were
+issued. You cannot use an existing DT to create new DTs or to create DTs for a 
different service.
+
+So in short, DTs are *not* Kerberos tokens. They are used by many services to 
replace Kerberos
+authentication, or even other forms of authentication, although there is 
nothing (aside from
+maybe implementation details) that ties them to Kerberos or any other 
authentication mechanism.
+
+
+## Lifecycle of DTs
+
+DTs, unlike Kerberos tokens, are service-specific. There is no centralized 
location you contact
+to create a DT for a service. So, the first step needed to get a DT is being 
able to authenticate
+to the service in question. In the Hadoop ecosystem, that is generally done 
using Kerberos.
+
+This requires Kerberos credentials to be available somewhere for the 
application to use. The user
+is generally responsible for providing those credentials, which is most 
commonly done by logging
+in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" 
containing a TGT
+(ticket granting ticket), which can then be used to request service tickets.
+
+There are other ways of obtaining TGTs, but, ultimately, you need a TGT to 
bootstrap the process.
+
+Once a TGT is available, the target service's client library can then be used 
to authenticate
+to the service using the Kerberos credentials, and request the creation of a 
delegation token.
+This token can now be sent to other processes and used to authenticate to 
different daemons
+belonging to that service.
+
+And thus the first drawback of DTs becomes apparent: you need service-specific 
logic to create and
+use them.
+
+Spark implements a (somewhat) pluggable, internal DT creation API. Support for 
new services can be
+added by implementing a `HadoopDelegationTokenProvider` that is then called by 
Spark when generating
+delegation tokens for an application. Spark makes the DTs available to code by 
stashing them in the
+`UserGroupInformation` credentials, and it's up to the DT provider and the 
respective client library
+to agree on how to use those tokens.
+
+Once they are created, the semantics of how DTs operate are also 
service-specific. But, in general,
+they try to follow the semantics of Kerberos tokens:
+
+* A "renewable period (equivalent to TGT's "lifetime") which is for how long 
the DT is valid
+  before it requires renewal.
+* A "max lifetime" (equivalent to TGT's "renewable life") which is for how 
long the DT can be
+  renewed.
+
+Once the token reaches its "max lifetime", a new one needs to be created by 
contacting the
+appropriate service, restarting the above process.
+
+
+## DT Renewal, Renewers, and YARN
+
+This is the most confusing part of DT handling, and part of it is because much 
of the system was
+designed with MapReduce, and later YARN, in mind.
+
+As seen above, DTs need to be renewed periodically until they finally expire 
for good. An example of
+this is the default configuration of HDFS services: delegation tokens are 
valid for up to 7 days,
+and need to be renewed every 24 hours. If 24 hours pass


[GitHub] [spark] zero323 commented on pull request #35124: [WIP][SPARK-37398][PYTHON] Inline type hints for python/pyspark/ml/classification.py

2022-01-07 Thread GitBox


zero323 commented on pull request #35124:
URL: https://github.com/apache/spark/pull/35124#issuecomment-1007376991


   Thanks @javierivanov. Let's revisit this after all prerequisites are done 
(core for starters, but there are also internal dependencies that have to be 
followed).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you opened a new pull request #35135: [SPARK-35442][SQL] Support propagate empty relation through aggregate

2022-01-07 Thread GitBox


ulysses-you opened a new pull request #35135:
URL: https://github.com/apache/spark/pull/35135


   
   
   ### What changes were proposed in this pull request?
   
   - Add a `LogicalQueryStage(_, agg: BaseAggregateExec)` check in 
`AQEPropagateEmptyRelation`
   - Add a `LeafNode` check in `PropagateEmptyRelationBase`, so we can replace 
`LogicalQueryStage` with a `LocalRelation`
   
   ### Why are the changes needed?
   
   The Aggregate in AQE is different from others: the `LogicalQueryStage` looks 
like `LogicalQueryStage(Aggregate, BaseAggregate)`. We should handle this case 
specially.
   
   Logically, if the Aggregate's grouping expressions are not empty, we can 
eliminate it safely (a global aggregate with no grouping keys still returns one 
row on empty input, so it cannot be eliminated).
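   A quick way to see this with plain DataFrames (a hedged sketch assuming an active 
`SparkSession` named `spark`):

   ```scala
   import org.apache.spark.sql.functions.count
   import spark.implicits._

   val empty = Seq.empty[(String, Int)].toDF("k", "v")

   // Non-empty grouping keys: the result is empty, so an aggregate over an empty
   // child can safely be replaced by an empty LocalRelation.
   empty.groupBy($"k").agg(count($"v")).show()   // no rows

   // No grouping keys: a global aggregate still returns one row (count = 0),
   // so it must not be propagated away.
   empty.agg(count($"v")).show()                 // one row: 0
   ```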
   
   ### Does this PR introduce _any_ user-facing change?
   
   no
   
   ### How was this patch tested?
   
   add new test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Peng-Lei commented on pull request #35131: [SPARK-37827][SQL] Put the some built-in table properties into V1Table.propertie to adapt to V2 command

2022-01-07 Thread GitBox


Peng-Lei commented on pull request #35131:
URL: https://github.com/apache/spark/pull/35131#issuecomment-1007364208


   @cloud-fan @imback82 @huaxingao Could you take a look? Thank you very much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

2022-01-07 Thread GitBox


pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I got more information about this issue.
   
   Add assertion and debug log in `RemoteBlockPushResolver`(ESS side)
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
   ...
   for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) 
{
   synchronized (partition) {
 try {
   // This can throw IOException which will marks this shuffle 
partition as not merged.
   partition.finalizePartition();
   bitmaps.add(partition.mapTracker);
   reduceIds.add(partition.reduceId);
   sizes.add(partition.getLastChunkOffset());
 } catch (IOException ioe) {
   logger.warn("Exception while finalizing shuffle partition {}_{} 
{} {}", msg.appId,
 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
 } finally {
   partition.closeAllFilesAndDeleteIfNeeded(false);
 }
   }
   +   assert partition.dataFile.length() == partition.lastChunkOffset;
   +   assert partition.indexFile.file.length() == 
partition.indexFile.getPos();
   +   assert partition.metaFile.file.length() == 
partition.metaFile.getPos();
   +   logger.info("shuffle partition {}_{} {} {}, chunk_size={}, 
meta_length={}, data_length={}",
   +  msg.appId, msg.appAttemptId, msg.shuffleId, 
partition.reduceId,
   +  partition.indexFile.getPos() / 8 - 1,
   +  partition.metaFile.getPos(),
   +  partition.lastChunkOffset);
}
 mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
   bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
   Longs.toArray(sizes));
   }
   ...
   }
   ```
   
   Add assertion and debug log in `IndexShuffleBlockResolver`(Reducer side)
   ```scala
   override def getMergedBlockData(
   blockId: ShuffleMergedBlockId,
   dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
 val indexFile =
   getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.shuffleMergeId,
 blockId.reduceId, dirs)
 val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
 val size = indexFile.length.toInt
 val offsets = Utils.tryWithResource {
   new DataInputStream(Files.newInputStream(indexFile.toPath))
 } { dis =>
   val buffer = ByteBuffer.allocate(size)
   dis.readFully(buffer.array)
   buffer.asLongBuffer
 }
 // Number of chunks is number of indexes - 1
 val numChunks = size / 8 - 1
   + if (numChunks == 0) {
   +   val indexBackupPath = 
java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   +   val dataBackupPath = 
java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   +   val metaBackupPath = 
java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   +   logError(s"$blockId chunk_size is 0, " +
   +  s"index_file is $indexFile, backup to $indexBackupPath" +
   +  s"data_file is $dataFile, backup to $dataBackupPath" +
   +  s"meta_file is $metaFile, backup to $metaBackupPath")
   +   Files.copy(indexFile.toPath, indexBackupPath)
   +   Files.copy(dataFile.toPath, dataBackupPath)
   +   Files.copy(metaFile.toPath, metaBackupPath)
   +   assert(false)
 }
 for (index <- 0 until numChunks) yield {
   new FileSegmentManagedBuffer(transportConf, dataFile,
 offsets.get(index),
 offsets.get(index + 1) - offsets.get(index))
 }
   }
   ```
   
   Then I run TPCDS several rounds and reproduce the exception.
   
   Assertion failed in reduce task side.
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: 
ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job 
aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most 
recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 
executor 562): java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
at 
org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
at 
org.apache.spark.util.Completi

[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

2022-01-07 Thread GitBox


pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I got more information about this issue.
   
   Add assertion and debug log in `RemoteBlockPushResolver`(ESS side)
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
   ...
   for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) 
{
   synchronized (partition) {
 try {
   // This can throw IOException which will marks this shuffle 
partition as not merged.
   partition.finalizePartition();
   bitmaps.add(partition.mapTracker);
   reduceIds.add(partition.reduceId);
   sizes.add(partition.getLastChunkOffset());
 } catch (IOException ioe) {
   logger.warn("Exception while finalizing shuffle partition {}_{} 
{} {}", msg.appId,
 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
 } finally {
   partition.closeAllFilesAndDeleteIfNeeded(false);
 }
   }
   +   assert partition.dataFile.length() == partition.lastChunkOffset;
   +   assert partition.indexFile.file.length() == 
partition.indexFile.getPos();
   +   assert partition.metaFile.file.length() == 
partition.metaFile.getPos();
   +   logger.info("shuffle partition {}_{} {} {}, chunk_size={}, 
meta_length={}, data_length={}",
   +  msg.appId, msg.appAttemptId, msg.shuffleId, 
partition.reduceId,
   +  partition.indexFile.getPos() / 8 - 1,
   +  partition.metaFile.getPos(),
   +  partition.lastChunkOffset);
}
 mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
   bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
   Longs.toArray(sizes));
   }
   ...
   }
   ```
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle 
partition application_1640143179334_0148_-1 126 4877, chunk_size=1, 
meta_length=18, data_length=157
   ```
   
   Add assertion and debug log in `IndexShuffleBlockResolver`(Reducer side)
   ```scala
   override def getMergedBlockData(
   blockId: ShuffleMergedBlockId,
   dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
 val indexFile =
   getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.shuffleMergeId,
 blockId.reduceId, dirs)
 val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
 val size = indexFile.length.toInt
 val offsets = Utils.tryWithResource {
   new DataInputStream(Files.newInputStream(indexFile.toPath))
 } { dis =>
   val buffer = ByteBuffer.allocate(size)
   dis.readFully(buffer.array)
   buffer.asLongBuffer
 }
 // Number of chunks is number of indexes - 1
 val numChunks = size / 8 - 1
   + if (numChunks == 0) {
   +   val indexBackupPath = 
java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   +   val dataBackupPath = 
java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   +   val metaBackupPath = 
java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   +   logError(s"$blockId chunk_size is 0, " +
   +  s"index_file is $indexFile, backup to $indexBackupPath" +
   +  s"data_file is $dataFile, backup to $dataBackupPath" +
   +  s"meta_file is $metaFile, backup to $metaBackupPath")
   +   Files.copy(indexFile.toPath, indexBackupPath)
   +   Files.copy(dataFile.toPath, dataBackupPath)
   +   Files.copy(metaFile.toPath, metaBackupPath)
   +   assert(false)
 }
 for (index <- 0 until numChunks) yield {
   new FileSegmentManagedBuffer(transportConf, dataFile,
 offsets.get(index),
 offsets.get(index + 1) - offsets.get(index))
 }
   }
   ```
   
   Then I run TPCDS several rounds and reproduce the exception.
   
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: 
ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job 
aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most 
recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 
executor 562): java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
at 
org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945

[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

2022-01-07 Thread GitBox


pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I got more information about this issue.
   
   Add assertion and debug log in `RemoteBlockPushResolver`(ESS side)
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
   ...
   for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) 
{
   synchronized (partition) {
 try {
   // This can throw IOException which will marks this shuffle 
partition as not merged.
   partition.finalizePartition();
   bitmaps.add(partition.mapTracker);
   reduceIds.add(partition.reduceId);
   sizes.add(partition.getLastChunkOffset());
 } catch (IOException ioe) {
   logger.warn("Exception while finalizing shuffle partition {}_{} 
{} {}", msg.appId,
 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
 } finally {
   partition.closeAllFilesAndDeleteIfNeeded(false);
 }
   }
   +   assert partition.dataFile.length() == partition.lastChunkOffset;
   +   assert partition.indexFile.file.length() == 
partition.indexFile.getPos();
   +   assert partition.metaFile.file.length() == 
partition.metaFile.getPos();
   +   logger.info("shuffle partition {}_{} {} {}, chunk_size={}, 
meta_length={}, data_length={}",
   +  msg.appId, msg.appAttemptId, msg.shuffleId, 
partition.reduceId,
   +  partition.indexFile.getPos() / 8 - 1,
   +  partition.metaFile.getPos(),
   +  partition.lastChunkOffset);
   +}
 mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
   bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
   Longs.toArray(sizes));
   }
   ...
   }
   ```
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle 
partition application_1640143179334_0148_-1 126 4877, chunk_size=1, 
meta_length=18, data_length=157
   ```
   
   Add assertion and debug log in `IndexShuffleBlockResolver`(Reducer side)
   ```scala
   override def getMergedBlockData(
   blockId: ShuffleMergedBlockId,
   dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
 val indexFile =
   getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.shuffleMergeId,
 blockId.reduceId, dirs)
 val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
 val size = indexFile.length.toInt
 val offsets = Utils.tryWithResource {
   new DataInputStream(Files.newInputStream(indexFile.toPath))
 } { dis =>
   val buffer = ByteBuffer.allocate(size)
   dis.readFully(buffer.array)
   buffer.asLongBuffer
 }
 // Number of chunks is number of indexes - 1
 val numChunks = size / 8 - 1
   + if (numChunks == 0) {
   +   val indexBackupPath = 
java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   +   val dataBackupPath = 
java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   +   val metaBackupPath = 
java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   +   logError(s"$blockId chunk_size is 0, " +
   +  s"index_file is $indexFile, backup to $indexBackupPath" +
   +  s"data_file is $dataFile, backup to $dataBackupPath" +
   +  s"meta_file is $metaFile, backup to $metaBackupPath")
   +   Files.copy(indexFile.toPath, indexBackupPath)
   +   Files.copy(dataFile.toPath, dataBackupPath)
   +   Files.copy(metaFile.toPath, metaBackupPath)
   +   assert(false)
 }
 for (index <- 0 until numChunks) yield {
   new FileSegmentManagedBuffer(transportConf, dataFile,
 offsets.get(index),
 offsets.get(index + 1) - offsets.get(index))
 }
   }
   ```
   
   Then I run TPCDS several rounds and reproduce the exception.
   
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: 
ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job 
aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most 
recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 
executor 562): java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
at 
org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945

[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

2022-01-07 Thread GitBox


pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I got more information about this issue.
   
   Add assertion and debug log in `RemoteBlockPushResolver`(ESS side)
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
   ...
   for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) 
{
   synchronized (partition) {
 try {
   // This can throw IOException which will marks this shuffle 
partition as not merged.
   partition.finalizePartition();
   bitmaps.add(partition.mapTracker);
   reduceIds.add(partition.reduceId);
   sizes.add(partition.getLastChunkOffset());
 } catch (IOException ioe) {
   logger.warn("Exception while finalizing shuffle partition {}_{} 
{} {}", msg.appId,
 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
 } finally {
   partition.closeAllFilesAndDeleteIfNeeded(false);
 }
   }
   assert partition.dataFile.length() == partition.lastChunkOffset;
   assert partition.indexFile.file.length() == 
partition.indexFile.getPos();
   assert partition.metaFile.file.length() == 
partition.metaFile.getPos();
   logger.info("shuffle partition {}_{} {} {}, chunk_size={}, 
meta_length={}, data_length={}",
   msg.appId, msg.appAttemptId, msg.shuffleId, 
partition.reduceId,
   partition.indexFile.getPos() / 8 - 1,
   partition.metaFile.getPos(),
   partition.lastChunkOffset);
 }
 mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
   bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
   Longs.toArray(sizes));
   }
   ...
   }
   ```
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle 
partition application_1640143179334_0148_-1 126 4877, chunk_size=1, 
meta_length=18, data_length=157
   ```
   
   Add assertion and debug log in `IndexShuffleBlockResolver`(Reducer side)
   ```scala
   override def getMergedBlockData(
   blockId: ShuffleMergedBlockId,
   dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
 val indexFile =
   getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.shuffleMergeId,
 blockId.reduceId, dirs)
 val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
   blockId.shuffleMergeId, blockId.reduceId, dirs)
 // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
 val size = indexFile.length.toInt
 val offsets = Utils.tryWithResource {
   new DataInputStream(Files.newInputStream(indexFile.toPath))
 } { dis =>
   val buffer = ByteBuffer.allocate(size)
   dis.readFully(buffer.array)
   buffer.asLongBuffer
 }
 // Number of chunks is number of indexes - 1
 val numChunks = size / 8 - 1
 if (numChunks == 0) {
   val indexBackupPath = 
java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   val dataBackupPath = 
java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   val metaBackupPath = 
java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   logError(s"$blockId chunk_size is 0, " +
 s"index_file is $indexFile, backup to $indexBackupPath" +
 s"data_file is $dataFile, backup to $dataBackupPath" +
 s"meta_file is $metaFile, backup to $metaBackupPath")
   Files.copy(indexFile.toPath, indexBackupPath)
   Files.copy(dataFile.toPath, dataBackupPath)
   Files.copy(metaFile.toPath, metaBackupPath)
   assert(false)
 }
 for (index <- 0 until numChunks) yield {
   new FileSegmentManagedBuffer(transportConf, dataFile,
 offsets.get(index),
 offsets.get(index + 1) - offsets.get(index))
 }
   }
   ```
   
   Then I run TPCDS several rounds and reproduce the exception.
   
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: 
ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job 
aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most 
recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 
executor 562): java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
at 
org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
   

[GitHub] [spark] zero323 commented on pull request #35126: [SPARK-37836][PYTHON][INFRA] Enable F841, E722, E305 and E226 for PEP 8 compliance

2022-01-07 Thread GitBox


zero323 commented on pull request #35126:
URL: https://github.com/apache/spark/pull/35126#issuecomment-1007350779


   E305 and E226 should be covered by black, if it is enabled for a particular path, but 
there is no harm in adding them.
   
   Personally, I have mixed feelings about E722 ‒ using `BaseException` or 
`Exception` doesn't really seem to add anything. But I guess there is no harm 
in that either :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon edited a comment on pull request #35121: [SPARK-37833][INFRA] Add `precondition` job to skip the main GitHub Action jobs

2022-01-07 Thread GitBox


HyukjinKwon edited a comment on pull request #35121:
URL: https://github.com/apache/spark/pull/35121#issuecomment-1007292642


   Oops, the test is broken in the commits, e.g. 
https://github.com/apache/spark/runs/4737174642?check_suite_focus=true (PRs are 
not affected). I made a quick followup, but it seems we need some more fixes 
in the `is-changed.py` script... let me revert this and my own followup for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #35121: [SPARK-37833][INFRA] Add `precondition` job to skip the main GitHub Action jobs

2022-01-07 Thread GitBox


HyukjinKwon commented on pull request #35121:
URL: https://github.com/apache/spark/pull/35121#issuecomment-1007292642


   Oops, the test is broken in the commits (PRs are not affected). I made a 
quick followup, but it seems we need some more fixes in the `is-changed.py` script... 
let me revert this and my own followup for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository

2022-01-07 Thread GitBox


HyukjinKwon closed pull request #35133:
URL: https://github.com/apache/spark/pull/35133


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository

2022-01-07 Thread GitBox


HyukjinKwon commented on pull request #35133:
URL: https://github.com/apache/spark/pull/35133#issuecomment-1007285278


   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository

2022-01-07 Thread GitBox


HyukjinKwon commented on pull request #35133:
URL: https://github.com/apache/spark/pull/35133#issuecomment-1007284852


   cc @dongjoon-hyun, let me just merge this to fix up the build. If this 
doesn't work, I will just revert this and #35121.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang opened a new pull request #35134: [SPARK-37842][SQL][MINOR] Use `multi-catch` to simplify duplicate exception handling behavior in Java code

2022-01-07 Thread GitBox


LuciferYang opened a new pull request #35134:
URL: https://github.com/apache/spark/pull/35134


   ### What changes were proposed in this pull request?
   Java 7 introduced multi-catch, i.e. catching multiple exception types in a single 
`catch` block; this PR uses it to simplify exception handling in Java code.
   
   
   
   ### Why are the changes needed?
   Code simplification
   
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Pass GA
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon opened a new pull request #35133: [SPARK-37833][INFRA][FOLLOW-UP] Run checking modules of precondition only in forked repository

2022-01-07 Thread GitBox


HyukjinKwon opened a new pull request #35133:
URL: https://github.com/apache/spark/pull/35133


   ### What changes were proposed in this pull request?
   
   This PR is a followup of https://github.com/apache/spark/pull/35121. We 
should run "Check all modules" in precondition job only in the forked 
repository because `is-changed.py` requires `APACHE_SPARK_REF` to be set: 
https://github.com/apache/spark/blob/master/dev/is-changed.py#L60
   
   ### Why are the changes needed?
   
   To fix the broken build in the main branch. PRs are not affected.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, dev-only.
   
   ### How was this patch tested?
   
   Should be merged to test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


