[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663656292 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -2225,4 +2225,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val APP_ATTEMPT_ID = +ConfigBuilder("spark.app.attempt.id") Review comment: And which version should we mark it there? I added "1.3.0" since the support for multiple attempts in Yarn was added since 1.3.0. But if we change to "spark.app.attemptNumber", we should start from "3.2.0". Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] SparkQA commented on pull request #33209: [SPARK-36013][BUILD] Upgrade Dropwizard Metrics to 4.2.2
SparkQA commented on pull request #33209: URL: https://github.com/apache/spark/pull/33209#issuecomment-873823116 **[Test build #140630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140630/testReport)** for PR 33209 at commit [`20ff0f6`](https://github.com/apache/spark/commit/20ff0f6ef895cebd0c35b8806a64f121ef362eac).
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663655645 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -2225,4 +2225,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val APP_ATTEMPT_ID = +ConfigBuilder("spark.app.attempt.id") Review comment: AppAttemptId is currently used in lots of places to indicate the attempt id from Yarn, should we keep it as "spark.app.attempt.id" to align with all other places? Or we just want to change the configuration string to "spark.app.attemptNumber"?
[GitHub] [spark] SparkQA commented on pull request #33207: [SPARK-33996][BUILD] Match SBT's plugin checkstyle version to Maven's
SparkQA commented on pull request #33207: URL: https://github.com/apache/spark/pull/33207#issuecomment-873819616 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45142/
[GitHub] [spark] sarutak commented on a change in pull request #33209: [SPARK-36013][BUILD] Upgrade Dropwizard Metrics to 4.2.2
sarutak commented on a change in pull request #33209: URL: https://github.com/apache/spark/pull/33209#discussion_r663649835 ## File path: pom.xml ## @@ -147,7 +147,7 @@ If you changes codahale.metrics.version, you also need to change the link to metrics.dropwizard.io in docs/monitoring.md. --> -4.2.0 +4.2.2 Review comment: The URL for Dropwizard Metrics is still https://metrics.dropwizard.io/4.2.0/ so the document is not changed.
[GitHub] [spark] Ngone51 commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state
Ngone51 commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663649791 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -2225,4 +2225,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val APP_ATTEMPT_ID = +ConfigBuilder("spark.app.attempt.id") Review comment: `attemptNumber` sounds better to me. If this conf can't be set by users, I think it's ok to keep it internal.
[GitHub] [spark] SparkQA commented on pull request #33208: [SPARK-36010][BUILD] Upgrade sbt-antlr4 from 0.8.2 to 0.8.3
SparkQA commented on pull request #33208: URL: https://github.com/apache/spark/pull/33208#issuecomment-873816616 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45141/
[GitHub] [spark] Ngone51 commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state
Ngone51 commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663649281 ## File path: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java ## @@ -419,4 +419,11 @@ public long mergedIndexCacheSize() { public int ioExceptionsThresholdDuringMerge() { return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4); } + + /** + * The application attemptID assigned from Hadoop YARN. + */ + public int appAttemptId() { +return conf.getInt("spark.app.attempt.id", -1); Review comment: Use the ConfigBuilder?
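For readers following the thread, the point of the comment above can be illustrated with a minimal sketch: declare the key and its default once in a typed entry, then read through that entry instead of repeating string literals at each call site. `IntConfEntry` and `AttemptConf` are hypothetical names made up for this illustration and are not Spark's `ConfigBuilder` API; only the key `spark.app.attempt.id` and the default `-1` come from the quoted diff.

```java
import java.util.Map;

/** Hypothetical typed config entry: the key and default are declared in one place. */
final class IntConfEntry {
  final String key;
  final int defaultValue;

  IntConfEntry(String key, int defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }

  /** Returns the parsed value stored under this key, or the default when the key is unset. */
  int readFrom(Map<String, String> conf) {
    String raw = conf.get(key);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }
}

/** Illustrative holder; the key and default are copied from the diff under review. */
final class AttemptConf {
  static final IntConfEntry APP_ATTEMPT_ID = new IntConfEntry("spark.app.attempt.id", -1);
}
```

A caller would then write `AttemptConf.APP_ATTEMPT_ID.readFrom(conf)` rather than spelling out the key and default again, which is the duplication the review comment appears to be asking about.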
[GitHub] [spark] sarutak opened a new pull request #33209: [SPARK-36013][BUILD] Upgrade Dropwizard Metrics to 4.2.2
sarutak opened a new pull request #33209: URL: https://github.com/apache/spark/pull/33209 ### What changes were proposed in this pull request? This PR aims to upgrade Dropwizard Metrics from `4.2.0` to `4.2.2`. ### Why are the changes needed? Dropwizard `4.2.1` fixes a bug related to `JMXReporter`, but `4.2.1` also contains a bug, so upgrading to `4.2.2` seems better. https://github.com/dropwizard/metrics/releases/tag/v4.2.1 https://github.com/dropwizard/metrics/releases/tag/v4.2.2 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI.
[GitHub] [spark] Ngone51 commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state
Ngone51 commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663649190 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -2225,4 +2225,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val APP_ATTEMPT_ID = +ConfigBuilder("spark.app.attempt.id") + .internal() + .doc("The application attempt Id assigned from Hadoop YARN. " + +"When the application runs in cluster mode on YARN, there can be " + +"multiple attempts before failing the application") + .version("1.3.0") Review comment: Do we have this configuration before? If not, I think it should be `3.2.0`.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #31948: [SPARK-34848][CORE] Add duration to TaskMetricDistributions
AngersZhuuuu commented on a change in pull request #31948: URL: https://github.com/apache/spark/pull/31948#discussion_r663648579 ## File path: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ## @@ -342,6 +342,7 @@ class ShuffleWriteMetrics private[spark]( class TaskMetricDistributions private[spark]( val quantiles: IndexedSeq[Double], +val duration: IndexedSeq[Double], Review comment: TBH, I am not clear on what happened, since I am not familiar with what MiMa is doing... How should I check this?
[GitHub] [spark] Ngone51 commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state
Ngone51 commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663606607 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java ## @@ -35,7 +35,12 @@ public final String[] localDirs; /** Number of subdirectories created within each localDir. */ public final int subDirsPerLocalDir; - /** Shuffle manager (SortShuffleManager) that the executor is using. */ + /** + * Shuffle manager (SortShuffleManager) that the executor is using. + * If this string contains semicolon, it will also include the meta information + * for push based shuffle in JSON format. Example of the string with semiColon would be: Review comment: `semiColon` -> `semicolon`? ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -403,38 +430,88 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { -logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); +logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.attemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); -// The partition should be removed after the files are written so that any new stream -// for the same reduce partition will see that the data file exists. 
-partitionsIter.remove(); } } } mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } -partitions.remove(appShuffleId); -logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); +logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " -+ "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); ++ "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), +executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); +} +String shuffleManagerMeta = executorInfo.shuffleManager; +if (shuffleManagerMeta.contains(SHUFFLE_META_DELIMITER)) { + String mergeDirInfo = + shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(SHUFFLE_META_DELIMITER) + 1); + try { +ObjectMapper mapper = new ObjectMapper(); +TypeReference> typeRef + = new TypeReference>(){}; +Map metaMap = mapper.readValue(mergeDirInfo, typeRef); +String mergeDir = metaMap.get(MERGE_DIR_KEY); +int attemptId = Integer.getInteger( + metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(UNDEFINED_ATTEMPT_ID))); +if (mergeDir == null) { + throw new IllegalArgumentException( +String.format("Failed to get the merge directory information from the " + + "shuffleManagerMeta %s in executor registration message", shuffleManagerMeta)); +} +if (attemptId == UNDEFINED_ATTEMPT_ID) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. 
+ // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> +new AppShuffleInfo( + appId, UNDEFINED_ATTEMPT_ID, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDir, executorInfo.subDirsPerLocalDir) +)); +} else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. + // The first ExecutorRegister message from the same application attempt wil register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + AtomicReference originalAppShuffleInfo = new AtomicReference<>(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { +if (appShuffleInfo == null || attemptId >
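The registration rule described in the code comments of the quoted diff (first registration wins within an attempt, a newer attempt overrides an older one) can be sketched in isolation as below. This is an illustration under stated assumptions, not the PR's code: `AttemptRegistry` and `AppInfo` are hypothetical names, JSON parsing is left out, and because the quoted diff is cut off at `attemptId >`, the comparison against the stored attempt id is an assumption. The sketch parses the attempt id with `Integer.parseInt`; in the JDK, `Integer.getInteger(String)` looks up a system property with that name rather than parsing its argument.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified model of per-application attempt registration. */
final class AttemptRegistry {
  static final int UNDEFINED_ATTEMPT_ID = -1;

  /** Stand-in for the state a shuffle service might keep per application. */
  static final class AppInfo {
    final int attemptId;
    final String mergeDir;

    AppInfo(int attemptId, String mergeDir) {
      this.attemptId = attemptId;
      this.mergeDir = mergeDir;
    }
  }

  private final ConcurrentMap<String, AppInfo> apps = new ConcurrentHashMap<>();

  /** Parses the textual attempt id; a missing value maps to UNDEFINED_ATTEMPT_ID. */
  static int parseAttemptId(String raw) {
    return raw == null ? UNDEFINED_ATTEMPT_ID : Integer.parseInt(raw);
  }

  /** Applies the registration rule and returns the info stored for appId afterwards. */
  AppInfo register(String appId, String rawAttemptId, String mergeDir) {
    int attemptId = parseAttemptId(rawAttemptId);
    if (attemptId == UNDEFINED_ATTEMPT_ID) {
      // No attempt id: only the first registration for this application is kept.
      return apps.computeIfAbsent(appId, id -> new AppInfo(UNDEFINED_ATTEMPT_ID, mergeDir));
    }
    // Assumed comparison: replace the entry only when the incoming attempt is newer.
    return apps.compute(appId, (id, current) ->
        (current == null || attemptId > current.attemptId)
            ? new AppInfo(attemptId, mergeDir)
            : current);
  }
}
```

Under these assumptions, registering attempt 1 twice keeps the first merge directory, registering attempt 2 replaces it, and a late message from attempt 1 is ignored. `ConcurrentHashMap.compute` runs the remapping function atomically per key, which is one way to keep concurrent registrations for the same application consistent.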
[GitHub] [spark] AngersZhuuuu commented on pull request #33183: [SPARK-35972][SQL] When replace ExtractValue in NestedColumnAliasing we should use semanticEquals
AngersZhuuuu commented on pull request #33183: URL: https://github.com/apache/spark/pull/33183#issuecomment-873812116 Any more suggestions?
[GitHub] [spark] cloud-fan commented on a change in pull request #33113: [SPARK-34302][SQL] Migrate ALTER TABLE ... CHANGE COLUMN command to use UnresolvedTable to resolve the identifier
cloud-fan commented on a change in pull request #33113: URL: https://github.com/apache/spark/pull/33113#discussion_r663643466 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -3527,13 +3527,35 @@ class Analyzer(override val catalogManager: CatalogManager) * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity * for alter table commands. */ - object ResolveFieldNames extends Rule[LogicalPlan] { + object ResolveAlterTableCommands extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case a: AlterTableCommand if a.table.resolved => -a.transformExpressions { +val table = a.table.asInstanceOf[ResolvedTable] +val transformed = a.transformExpressions { case u: UnresolvedFieldName => -val table = a.table.asInstanceOf[ResolvedTable] -resolveFieldNames(table.schema, u.name).map(ResolvedFieldName(_)).getOrElse(u) +resolveFieldNames(table.schema, u.name).getOrElse(u) + case u: UnresolvedFieldPosition => u.position match { +case after: After => + resolveFieldNames(table.schema, u.fieldName.init :+ after.column()) Review comment: Do we need to put `fieldName` in `UnresolvedFieldPosition`? We can easily get it via `AlterTableAlterColumn.column.name`
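The expression `u.fieldName.init :+ after.column()` in the quoted diff builds the path of the sibling column named in an `AFTER` clause: it drops the last element of the column's own path and appends the sibling's name. A small sketch of that path arithmetic follows; `FieldPaths.siblingPath` is a hypothetical helper written for this illustration and is not part of Spark.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the Scala expression `fieldPath.init :+ siblingName`. */
final class FieldPaths {
  /** Returns the path of a sibling: the parent of fieldPath followed by siblingName. */
  static List<String> siblingPath(List<String> fieldPath, String siblingName) {
    if (fieldPath.isEmpty()) {
      throw new IllegalArgumentException("fieldPath must not be empty");
    }
    // Copy everything except the last element (the column's own name) ...
    List<String> result = new ArrayList<>(fieldPath.subList(0, fieldPath.size() - 1));
    // ... then append the sibling's name so it resolves under the same parent.
    result.add(siblingName);
    return result;
  }
}
```

For a nested column path `[point, y]` and sibling name `x`, the helper yields `[point, x]`; for a top-level column the parent part is empty and the result is just the sibling name. Since only the column's own path is needed as input, this is consistent with the reviewer's suggestion that the path could be taken from the column already carried by the command.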
[GitHub] [spark] SparkQA commented on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
SparkQA commented on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873798785 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45140/
[GitHub] [spark] AmplabJenkins commented on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
AmplabJenkins commented on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873798808 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45140/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
AmplabJenkins removed a comment on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873784871 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/140625/
[GitHub] [spark] AmplabJenkins commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
AmplabJenkins commented on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873784871 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/140625/
[GitHub] [spark] SparkQA removed a comment on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
SparkQA removed a comment on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873745414 **[Test build #140625 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140625/testReport)** for PR 33174 at commit [`5d1daed`](https://github.com/apache/spark/commit/5d1daed014e9abdcc170dbe2bf0c7a97c512f7de).
[GitHub] [spark] AmplabJenkins removed a comment on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
AmplabJenkins removed a comment on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873769422 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45138/
[GitHub] [spark] SparkQA commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
SparkQA commented on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873784467 **[Test build #140625 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140625/testReport)** for PR 33174 at commit [`5d1daed`](https://github.com/apache/spark/commit/5d1daed014e9abdcc170dbe2bf0c7a97c512f7de). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `for _, _class in inspect.getmembers(module, inspect.isclass):`
[GitHub] [spark] SparkQA commented on pull request #33207: [SPARK-33996][BUILD] Match SBT's plugin checkstyle version to Maven's
SparkQA commented on pull request #33207: URL: https://github.com/apache/spark/pull/33207#issuecomment-873783924 **[Test build #140629 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140629/testReport)** for PR 33207 at commit [`055b472`](https://github.com/apache/spark/commit/055b472398b5fbdbc5d34dedf2f4a08019e38e37).
[GitHub] [spark] SparkQA commented on pull request #33208: [SPARK-36010][BUILD] Upgrade sbt-antlr4 from 0.8.2 to 0.8.3
SparkQA commented on pull request #33208: URL: https://github.com/apache/spark/pull/33208#issuecomment-873783903 **[Test build #140628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140628/testReport)** for PR 33208 at commit [`d816ee0`](https://github.com/apache/spark/commit/d816ee0b4d909ade8ae25c2c7e1d098df5d65bfe).
[GitHub] [spark] AmplabJenkins removed a comment on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
AmplabJenkins removed a comment on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873783687 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/140627/
[GitHub] [spark] AmplabJenkins commented on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
AmplabJenkins commented on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873783687 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/140627/
[GitHub] [spark] HyukjinKwon opened a new pull request #33208: [SPARK-36010][BUILD] Upgrade sbt-antlr4 from 0.8.2 to 0.8.3
HyukjinKwon opened a new pull request #33208: URL: https://github.com/apache/spark/pull/33208 ### What changes were proposed in this pull request? This PR proposes to upgrade sbt-antlr4 from 0.8.2 to 0.8.3 per the guides at https://github.com/ihji/sbt-antlr4. I can't find proper official docs for this. ### Why are the changes needed? To stick to the guides in https://github.com/ihji/sbt-antlr4, and leverage the fixes included. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI in this PR should test it out.
[GitHub] [spark] jerqi commented on pull request #33204: [SPARK-36011][SQL] Disallow altering permanent views based on temporary views or UDFs
jerqi commented on pull request #33204: URL: https://github.com/apache/spark/pull/33204#issuecomment-873782250 > @jerqi can you create a new JIRA instead of reusing [SPARK-18217](https://issues.apache.org/jira/browse/SPARK-18217)? The fixed versions would be different at least. OK, It's done
[GitHub] [spark] cloud-fan commented on a change in pull request #33113: [SPARK-34302][SQL] Migrate ALTER TABLE ... CHANGE COLUMN command to use UnresolvedTable to resolve the identifier
cloud-fan commented on a change in pull request #33113: URL: https://github.com/apache/spark/pull/33113#discussion_r663634209 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ## @@ -1088,8 +1058,51 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } alter match { - case AlterTableRenameColumn(table: ResolvedTable, ResolvedFieldName(name), newName) => -checkColumnNotExists(name.init :+ newName, table.schema) + case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) => +checkColumnNotExists(col.path :+ newName, table.schema) + case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) => +val fieldName = col.name.quoted +if (a.dataType.isDefined) { + val field = CharVarcharUtils.getRawType(col.field.metadata) +.map(dt => col.field.copy(dataType = dt)) +.getOrElse(col.field) + val newDataType = a.dataType.get + newDataType match { Review comment: Not related to this PR: I think there is a small bug here, if the data type is not changed, we shouldn't fail even if the new data type is struct/array/map. @imback82 can you help to fix it?
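The ordering issue raised in the comment above can be shown with a small sketch: compare the current and requested types first, and only apply the nested-type restriction when the type actually changes. Everything here is hypothetical and simplified for illustration (types are plain strings, and `AlterColumnTypeCheck` is not a Spark class); the quoted diff is cut off before the match body, so this reflects only what the reviewer describes.

```java
import java.util.Locale;

/** Hypothetical, string-based model of the check order suggested in the review comment. */
final class AlterColumnTypeCheck {
  /** Treats a type as nested when its name starts with struct, array, or map. */
  static boolean isNested(String typeName) {
    String t = typeName.trim().toLowerCase(Locale.ROOT);
    return t.startsWith("struct") || t.startsWith("array") || t.startsWith("map");
  }

  /** Throws only when the type actually changes and the requested type is nested. */
  static void checkTypeChange(String currentType, String requestedType) {
    if (currentType.trim().equalsIgnoreCase(requestedType.trim())) {
      return; // unchanged type: nothing to validate, even for struct/array/map
    }
    if (isNested(requestedType)) {
      throw new IllegalArgumentException(
          "Cannot change column type to nested type: " + requestedType);
    }
  }
}
```

With this order, re-stating a column's existing `struct` type is a no-op, while changing `int` to an `array` type is still rejected.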
[GitHub] [spark] HyukjinKwon commented on pull request #33207: [SPARK-33996][BUILD] Match SBT's plugin checkstyle version to Maven's
HyukjinKwon commented on pull request #33207: URL: https://github.com/apache/spark/pull/33207#issuecomment-873780373 cc @williamhyun can you review this please?
[GitHub] [spark] HyukjinKwon opened a new pull request #33207: [SPARK-33996][BUILD] Match SBT's plugin checkstyle version to Maven's
HyukjinKwon opened a new pull request #33207: URL: https://github.com/apache/spark/pull/33207
### What changes were proposed in this pull request?
This PR is a followup of https://github.com/apache/spark/pull/31019, which did not update SBT's checkstyle plugin version to match.
### Why are the changes needed?
To use the same version in both Maven and SBT.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI should test them.
[GitHub] [spark] SparkQA commented on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
SparkQA commented on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873779084 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45140/
[GitHub] [spark] cloud-fan closed pull request #33186: [SPARK-35987][SQL] The ANSI flags of Sum and Avg should be kept after being copied
cloud-fan closed pull request #33186: URL: https://github.com/apache/spark/pull/33186
[GitHub] [spark] cloud-fan commented on pull request #33186: [SPARK-35987][SQL] The ANSI flags of Sum and Avg should be kept after being copied
cloud-fan commented on pull request #33186: URL: https://github.com/apache/spark/pull/33186#issuecomment-873778416 thanks, merging to master/3.2!
[GitHub] [spark] cloud-fan commented on a change in pull request #33186: [SPARK-35987][SQL] The ANSI flags of Sum and Avg should be kept after being copied
cloud-fan commented on a change in pull request #33186: URL: https://github.com/apache/spark/pull/33186#discussion_r663631415
## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -151,9 +157,12 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   override lazy val evaluateExpression: Expression = resultType match {
     case d: DecimalType =>
       If(isEmpty, Literal.create(null, resultType),
-        CheckOverflowInSum(sum, d, !SQLConf.get.ansiEnabled))
+        CheckOverflowInSum(sum, d, !failOnError))
     case _ => sum
   }

   override protected def withNewChildInternal(newChild: Expression): Sum = copy(child = newChild)
+
+  // The flag `failOnError` won't be shown in the `toString` or `toAggString` methods

Review comment: The basic math operations (Add, Subtract, etc.) just print something like `a + b` without showing the ANSI flag. This makes sense to me.
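Why replacing `SQLConf.get.ansiEnabled` with a `failOnError` field keeps the flag across copies can be illustrated with a small sketch. Everything here is a hypothetical stand-in (`Conf`, `SumReadingConf`, `SumWithFlag`); in particular, passing the flag as a constructor parameter with a config-derived default is an assumption about how such a field might be wired, not a quote from the PR.

```scala
// Hypothetical stand-in for a session-level configuration such as SQLConf.
object Conf {
  @volatile var ansiEnabled: Boolean = false
}

// Variant 1: consults the global setting every time it is evaluated.
final case class SumReadingConf(child: String) {
  def failsOnOverflow: Boolean = Conf.ansiEnabled
}

// Variant 2: captures the setting once, at construction.
// `copy` reuses the existing field value, so the flag survives copying.
final case class SumWithFlag(child: String, failOnError: Boolean = Conf.ansiEnabled) {
  def failsOnOverflow: Boolean = failOnError
  def withNewChild(newChild: String): SumWithFlag = copy(child = newChild)
}

object AnsiFlagDemo {
  def main(args: Array[String]): Unit = {
    Conf.ansiEnabled = true
    val reading = SumReadingConf("a")
    val captured = SumWithFlag("a")

    // Simulate the expression being copied later, when the setting differs.
    Conf.ansiEnabled = false
    val copied = captured.withNewChild("b")

    println(reading.failsOnOverflow) // false: follows the current global setting
    println(copied.failsOnOverflow)  // true: keeps the value captured at creation
  }
}
```

The second variant behaves the same no matter which thread or session later copies or evaluates the expression, which is the property the PR title asks for.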
[GitHub] [spark] SparkQA removed a comment on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
SparkQA removed a comment on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873763852 **[Test build #140627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140627/testReport)** for PR 33206 at commit [`a37dede`](https://github.com/apache/spark/commit/a37deded20e21989f390b82a31b1953d3bc35bc5).
[GitHub] [spark] SparkQA commented on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
SparkQA commented on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873773583 **[Test build #140627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140627/testReport)** for PR 33206 at commit [`a37dede`](https://github.com/apache/spark/commit/a37deded20e21989f390b82a31b1953d3bc35bc5).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873769611 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45137/
[GitHub] [spark] AmplabJenkins commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
AmplabJenkins commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873769630 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45137/
[GitHub] [spark] SparkQA commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
SparkQA commented on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873769395 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45138/
[GitHub] [spark] AmplabJenkins commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
AmplabJenkins commented on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873769422 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45138/
[GitHub] [spark] AmplabJenkins commented on pull request #32959: [SPARK-35780][SQL] Support DATE/TIMESTAMP literals across the full range
AmplabJenkins commented on pull request #32959: URL: https://github.com/apache/spark/pull/32959#issuecomment-873769162 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45139/
[GitHub] [spark] SparkQA commented on pull request #32959: [SPARK-35780][SQL] Support DATE/TIMESTAMP literals across the full range
SparkQA commented on pull request #32959: URL: https://github.com/apache/spark/pull/32959#issuecomment-873769148 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45139/
[GitHub] [spark] SparkQA commented on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
SparkQA commented on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873763852 **[Test build #140627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140627/testReport)** for PR 33206 at commit [`a37dede`](https://github.com/apache/spark/commit/a37deded20e21989f390b82a31b1953d3bc35bc5).
[GitHub] [spark] AmplabJenkins removed a comment on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
AmplabJenkins removed a comment on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873763579 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45135/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
AmplabJenkins removed a comment on pull request #33078: URL: https://github.com/apache/spark/pull/33078#issuecomment-873763580
[GitHub] [spark] AmplabJenkins commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
AmplabJenkins commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873763579 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45135/
[GitHub] [spark] AmplabJenkins commented on pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a
AmplabJenkins commented on pull request #33078: URL: https://github.com/apache/spark/pull/33078#issuecomment-873763581
[GitHub] [spark] SparkQA removed a comment on pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in
SparkQA removed a comment on pull request #33078: URL: https://github.com/apache/spark/pull/33078#issuecomment-873725122 **[Test build #140623 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140623/testReport)** for PR 33078 at commit [`83eb3e8`](https://github.com/apache/spark/commit/83eb3e86c21f8ce76a9b9eb1deb02e4c925cf512).
[GitHub] [spark] SparkQA commented on pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better
SparkQA commented on pull request #33078: URL: https://github.com/apache/spark/pull/33078#issuecomment-873763026 **[Test build #140623 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140623/testReport)** for PR 33078 at commit [`83eb3e8`](https://github.com/apache/spark/commit/83eb3e86c21f8ce76a9b9eb1deb02e4c925cf512).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] [spark] dgd-contributor commented on pull request #32951: [SPARK-33603][SQL] Grouping execution/command
dgd-contributor commented on pull request #32951: URL: https://github.com/apache/spark/pull/32951#issuecomment-873760231 @beliefer maybe @allisonwang-db is a little busy. Is there anyone else who can review this?
[GitHub] [spark] SparkQA commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873759530 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45137/
[GitHub] [spark] SparkQA commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
SparkQA commented on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873759443 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45138/
[GitHub] [spark] SparkQA commented on pull request #32959: [SPARK-35780][SQL] Support DATE/TIMESTAMP literals across the full range
SparkQA commented on pull request #32959: URL: https://github.com/apache/spark/pull/32959#issuecomment-873759338 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45139/
[GitHub] [spark] HyukjinKwon commented on pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
HyukjinKwon commented on pull request #33206: URL: https://github.com/apache/spark/pull/33206#issuecomment-873753536 cc @xinrong-databricks @ueshin FYI
[GitHub] [spark] SparkQA commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873751653 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45135/
[GitHub] [spark] Yikun opened a new pull request #33206: [SPARK-36002][PYTHON] Consolidate tests for data-type-based operations of decimal Series
Yikun opened a new pull request #33206: URL: https://github.com/apache/spark/pull/33206
### What changes were proposed in this pull request?
Merge test_decimal_ops into test_num_ops:
- merge test_isnull() into test_num_ops.test_isnull()
- remove test_datatype_ops(), which is already covered in https://github.com/apache/spark/blob/11fcbc73cbcbb1bdf5ba5d90eba0aba1edebb15d/python/pyspark/pandas/tests/data_type_ops/test_base.py#L58-L59
### Why are the changes needed?
Tests for data-type-based operations of decimal Series are in two places:
- python/pyspark/pandas/tests/data_type_ops/test_decimal_ops.py
- python/pyspark/pandas/tests/data_type_ops/test_num_ops.py

We'd better merge test_decimal_ops into test_num_ops. See also [SPARK-36002](https://issues.apache.org/jira/browse/SPARK-36002).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
unittests passed
[GitHub] [spark] SparkQA commented on pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better
SparkQA commented on pull request #33078: URL: https://github.com/apache/spark/pull/33078#issuecomment-873751580 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45136/
[GitHub] [spark] HeartSaVioR commented on a change in pull request #33187: [WIP][SPARK-35988][SS] The implementation for RocksDBStateStoreProvider
HeartSaVioR commented on a change in pull request #33187: URL: https://github.com/apache/spark/pull/33187#discussion_r663608311 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io._ + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.Platform +import org.apache.spark.util.Utils + +private[state] class RocksDBStateStoreProvider + extends StateStoreProvider with Logging with Closeable { + import RocksDBStateStoreProvider._ + + class RocksDBStateStore(val lastVersion: Long) extends StateStore { +/** Trait and classes representing the internal state of the store */ +trait STATE +case object UPDATING extends STATE +case object COMMITTED extends STATE +case object ABORTED extends STATE + +@volatile private var state: STATE = UPDATING +@volatile private var isValidated = false + +override def id: StateStoreId = RocksDBStateStoreProvider.this.stateStoreId + +override def version: Long = lastVersion + +override def get(key: UnsafeRow): UnsafeRow = { + verify(key != null, "Key cannot be null") + val value = encoder.decodeValue(rocksDB.get(encoder.encode(key))) + if (!isValidated && value != null) { +StateStoreProvider.validateStateRowFormat( + key, keySchema, value, valueSchema, storeConf) +isValidated = true + } + value +} + +override def put(key: UnsafeRow, value: UnsafeRow): Unit = { + verify(state == UPDATING, "Cannot put after already committed or aborted") + verify(key != null, "Key cannot be null") + verify(value != null, "Value cannot be null") + logDebug(s"Storing $key => $value") + rocksDB.put(encoder.encode(key), encoder.encode(value)) +} + +override def remove(key: UnsafeRow): Unit = { + verify(state == UPDATING, "Cannot remove after already committed or aborted") + verify(key != null, "Key cannot be null") + rocksDB.remove(encoder.encode(key)) +} + +override def getRange( +start: Option[UnsafeRow], +end: Option[UnsafeRow]): 
Iterator[UnsafeRowPair] = { + verify(state == UPDATING, "Cannot call getRange() after already committed or aborted") + iterator() +} + +override def iterator(): Iterator[UnsafeRowPair] = { + rocksDB.iterator().map { kv => +val rowPair = encoder.decode(kv) +if (!isValidated && rowPair.value != null) { + StateStoreProvider.validateStateRowFormat( +rowPair.key, keySchema, rowPair.value, valueSchema, storeConf) + isValidated = true +} +rowPair + } +} + +override def commit(): Long = synchronized { + verify(state == UPDATING, "Cannot commit after already committed or aborted") + val newVersion = rocksDB.commit() + state = COMMITTED + logInfo(s"Committed $newVersion for $id") + newVersion +} + +override def abort(): Unit = { + verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") + logInfo(s"Aborting ${version + 1} for $id") + rocksDB.rollback() + state = ABORTED +} + +override def metrics: StateStoreMetrics = { + val rocksDBMetrics = rocksDB.metrics + def commitLatencyMs(typ: String): Long = rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L) + def avgNativeOpsLatencyMs(typ: String): Long = { + rocksDBMetrics.nativeOpsLatencyMicros.get(typ).map(_.avg).getOrElse(0.0).toLong + } + + val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long]( +CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes, +CUSTOM_METRIC_GET_TIME -> avgNativeOpsLatencyMs("get"), +CUSTOM_METRIC_PUT_TIME -> avgNativeOpsLatencyMs("put"), +CUSTOM_METRIC_WRITEBATCH_TIME ->
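The lifecycle enforced by the `verify(state == UPDATING, ...)` guards in the quoted `RocksDBStateStore` can be sketched in isolation as follows. This is a simplified illustration, not the provider under review: `InMemoryStore` and its mutable map are hypothetical stand-ins for the RocksDB-backed store, and returning `lastVersion + 1` from `commit()` is an assumption (the quoted code returns whatever `rocksDB.commit()` reports).

```scala
import scala.collection.mutable

object StoreLifecycle {
  sealed trait State
  case object Updating extends State
  case object Committed extends State
  case object Aborted extends State

  // Hypothetical in-memory stand-in for the RocksDB-backed store.
  final class InMemoryStore(val lastVersion: Long) {
    private var state: State = Updating
    private val data = mutable.Map.empty[String, String]

    private def verify(condition: Boolean, msg: => String): Unit =
      if (!condition) throw new IllegalStateException(msg)

    def put(key: String, value: String): Unit = {
      verify(state == Updating, "Cannot put after already committed or aborted")
      verify(key != null, "Key cannot be null")
      verify(value != null, "Value cannot be null")
      data.put(key, value)
    }

    def commit(): Long = synchronized {
      verify(state == Updating, "Cannot commit after already committed or aborted")
      state = Committed
      lastVersion + 1 // assumption: the new version is the previous one plus one
    }

    def abort(): Unit = {
      // Aborting twice is allowed; aborting after a commit is not.
      verify(state == Updating || state == Aborted, "Cannot abort after already committed")
      data.clear()
      state = Aborted
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore(lastVersion = 4L)
    store.put("k", "v")
    println(store.commit()) // 5
  }
}
```

Once the store leaves the updating state, every mutating call fails fast, which is what makes a committed or aborted version safe to hand back to the provider.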
[GitHub] [spark] SparkQA commented on pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
SparkQA commented on pull request #33174: URL: https://github.com/apache/spark/pull/33174#issuecomment-873745414 **[Test build #140625 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140625/testReport)** for PR 33174 at commit [`5d1daed`](https://github.com/apache/spark/commit/5d1daed014e9abdcc170dbe2bf0c7a97c512f7de).
[GitHub] [spark] SparkQA commented on pull request #32959: [SPARK-35780][SQL] Support DATE/TIMESTAMP literals across the full range
SparkQA commented on pull request #32959: URL: https://github.com/apache/spark/pull/32959#issuecomment-873745442 **[Test build #140626 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140626/testReport)** for PR 32959 at commit [`e9558cb`](https://github.com/apache/spark/commit/e9558cb0f7ba921064d43efcbce2fe8d6095d16f).
[GitHub] [spark] SparkQA commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873745387 **[Test build #140624 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140624/testReport)** for PR 33188 at commit [`7b668bb`](https://github.com/apache/spark/commit/7b668bbd9cef4ca2f0cfa84d2e16870464db4f1b).
[GitHub] [spark] linhongliu-db commented on a change in pull request #32959: [SPARK-35780][SQL] Support DATE/TIMESTAMP literals across the full range
linhongliu-db commented on a change in pull request #32959: URL: https://github.com/apache/spark/pull/32959#discussion_r663606917 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala ## @@ -224,12 +224,12 @@ object DateTimeUtils { * value. The return type is [[Option]] in order to distinguish between 0L and null. The following * formats are allowed: * - * `` - * `-[m]m` - * `-[m]m-[d]d` - * `-[m]m-[d]d ` - * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` - * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` + * `[+-]y*` + * `[+-]y*-[m]m` + * `[+-]y*-[m]m-[d]d` + * `[+-]y*-[m]m-[d]d ` + * `[+-]y*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` + * `[+-]y*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` Review comment: Actually, we do respect the `-` sign, for example: ``` scala> sql("select date'-12-12-12'").show() +--+ |DATE '-0012-12-12'| +--+ | -0012-12-12| +--+ scala> sql("select date'+12-12-12'").show() +-+ |DATE '0012-12-12'| +-+ | 0012-12-12| +-+ ```
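To make the sign handling in the comment above concrete, here is a minimal Python sketch. It is not Spark's parser (that lives in `DateTimeUtils` and is written in Scala); the regex and the helpers `parse_signed_date` and `format_signed_date` are hypothetical names, and the sketch assumes only the `[+-]y*-[m]m-[d]d` shape quoted in the diff.

```python
import re

# Hypothetical pattern for the `[+-]y*-[m]m-[d]d` shape quoted in the diff.
# Illustration only; this is not the parsing code used by Spark.
_SIGNED_DATE = re.compile(r"^([+-]?)(\d+)-(\d{1,2})-(\d{1,2})$")


def parse_signed_date(text):
    """Return (year, month, day) with the sign applied to the year, or None."""
    match = _SIGNED_DATE.match(text.strip())
    if match is None:
        return None
    sign, year, month, day = match.groups()
    year_value = int(year)
    if sign == "-":
        year_value = -year_value
    return year_value, int(month), int(day)


def format_signed_date(year, month, day):
    """Zero-pad the year to four digits and keep a leading '-' for negative years."""
    prefix = "-" if year < 0 else ""
    return f"{prefix}{abs(year):04d}-{month:02d}-{day:02d}"
```

With these helpers, `format_signed_date(*parse_signed_date("-12-12-12"))` gives `-0012-12-12` and the `+12-12-12` input gives `0012-12-12`, which mirrors the two `show()` results quoted in the comment.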
[GitHub] [spark] Yikun commented on a change in pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
Yikun commented on a change in pull request #33174: URL: https://github.com/apache/spark/pull/33174#discussion_r663592689 ## File path: python/run-tests.py ## @@ -40,6 +44,111 @@ from sparktestsupport.shellutils import which, subprocess_check_output # noqa from sparktestsupport.modules import all_modules, pyspark_sql # noqa +# Make sure logging config before any possible logging print +logging.basicConfig(stream=sys.stdout, format="%(message)s") +LOGGER = logging.getLogger() + + +def _contain_unittests_class(module_name, slow=False): +""" +Check if the module with specific module has classes are derived from unittest.TestCase. Review comment: Will reword as "Check if the module contains a unittest class or not."
[GitHub] [spark] sunpe commented on a change in pull request #33154: [SPARK-35949][CORE]Add `is-server` arg for to prevent closing spark context when starting as a server.
sunpe commented on a change in pull request #33154: URL: https://github.com/apache/spark/pull/33154#discussion_r663604636 ## File path: python/pyspark/pandas/tests/test_stats.py ## @@ -283,7 +283,7 @@ def test_cov_corr_meta(self): index=pd.Index([1, 2, 3], name="myindex"), ) psdf = ps.from_pandas(pdf) -self.assert_eq(psdf.corr(), pdf.corr()) +self.assert_eq(psdf.corr(), pdf.corr(), check_exact=False) Review comment: Thanks for the reply.
[GitHub] [spark] SparkQA commented on pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better
SparkQA commented on pull request #33078: URL: https://github.com/apache/spark/pull/33078#issuecomment-873739858 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45136/
[GitHub] [spark] SparkQA commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873739660 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45135/
[GitHub] [spark] ulysses-you commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
ulysses-you commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873735573 retest this please
[GitHub] [spark] venkata91 commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
venkata91 commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663595592 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -2225,4 +2225,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val APP_ATTEMPT_ID = +ConfigBuilder("spark.app.attempt.id") Review comment: nit: This feels like it should be `spark.app.attemptNumber` rather than `spark.app.attempt.id`. Also, this is currently specific to push-based shuffle, right? Should we mention that in the documentation? ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -210,76 +236,54 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI } } - /** - * The logic here is consistent with - * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile( - * org.apache.spark.storage.BlockId, scala.Option)]] - */ - private File getFile(String appId, String filename) { -// TODO: [SPARK-33236] Change the message when this service is able to handle NM restart -AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); -File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, - appPathsInfo.subDirsPerLocalDir, filename); -logger.debug("Get merged file {}", targetFile.getAbsolutePath()); -return targetFile; - } - - private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) { -String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId)); -return getFile(appShuffleId.appId, fileName); - } - - private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) { -String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId)); -return getFile(appShuffleId.appId, indexName); - } - - private File 
getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) { -String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId)); -return getFile(appShuffleId.appId, metaName); - } - @Override public String[] getMergedBlockDirs(String appId) { -AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); -String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs, - "application " + appId - + " active local dirs list has not been updated by any executor registration"); +AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); +String[] activeLocalDirs = + Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs, +"application " + appId + " active local dirs list has not been updated " + +"by any executor registration"); return activeLocalDirs; } @Override public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); -// TODO: [SPARK-33236] Change the message when this service is able to handle NM restart -AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId), - "application " + appId + " is not registered or NM was restarted."); -Iterator>> iterator = - partitions.entrySet().iterator(); -while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - AppShuffleId appShuffleId = entry.getKey(); - if (appId.equals(appShuffleId.appId)) { -iterator.remove(); -for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) { +AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId); +if (null != appShuffleInfo) { + mergedShuffleCleanerExecutor.execute( +() -> cleanupMergedShuffle(appShuffleInfo, cleanupLocalDirs)); +} + } + + + /** + * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo. 
+ * If cleanupLocalDirs is true, the merged shuffle files will also be deleted. + * The cleanup will be executed in a separate thread. + */ + private void cleanupMergedShuffle( Review comment: nit: `closeAndDeletePartitionFilesIfNeeded`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail:
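The new `applicationRemoved` in the diff above removes the application entry first and then hands the cleanup to `mergedShuffleCleanerExecutor`, so the caller is not blocked on file deletion. A minimal Python sketch of that pattern follows; `MergedShuffleRegistry` and its fields are hypothetical stand-ins for the Java state, and a plain `dict` is assumed to be good enough here only because this is a toy example.

```python
from concurrent.futures import ThreadPoolExecutor


class MergedShuffleRegistry:
    """Toy registry mapping an app id to its partition-file names (all hypothetical)."""

    def __init__(self):
        self._apps = {}
        # Single background worker, standing in for the cleaner executor in the diff.
        self._cleaner = ThreadPoolExecutor(max_workers=1)
        self.deleted = []

    def register(self, app_id, partition_files):
        self._apps[app_id] = list(partition_files)

    def application_removed(self, app_id, cleanup_local_dirs):
        # Remove the entry first so that no later request can find it.
        partition_files = self._apps.pop(app_id, None)
        if partition_files is None:
            return None  # unknown app: nothing to clean up, and no exception
        # Hand the slow part to the background thread and return its future.
        return self._cleaner.submit(self._cleanup, partition_files, cleanup_local_dirs)

    def _cleanup(self, partition_files, cleanup_local_dirs):
        for name in partition_files:
            # A real service would close open file handles here.
            if cleanup_local_dirs:
                self.deleted.append(name)
```

Returning the future is a choice made for this sketch so that a caller or a test can wait for the cleanup; the Java method in the diff returns `void`.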
[GitHub] [spark] SparkQA removed a comment on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA removed a comment on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873725056 **[Test build #140622 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140622/testReport)** for PR 33188 at commit [`7b668bb`](https://github.com/apache/spark/commit/7b668bbd9cef4ca2f0cfa84d2e16870464db4f1b).
[GitHub] [spark] AmplabJenkins removed a comment on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
AmplabJenkins removed a comment on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873726735 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/140622/
[GitHub] [spark] AmplabJenkins commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
AmplabJenkins commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873726735 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/140622/
[GitHub] [spark] SparkQA commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873726704 **[Test build #140622 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140622/testReport)** for PR 33188 at commit [`7b668bb`](https://github.com/apache/spark/commit/7b668bbd9cef4ca2f0cfa84d2e16870464db4f1b). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better
SparkQA commented on pull request #33078: URL: https://github.com/apache/spark/pull/33078#issuecomment-873725122 **[Test build #140623 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140623/testReport)** for PR 33078 at commit [`83eb3e8`](https://github.com/apache/spark/commit/83eb3e86c21f8ce76a9b9eb1deb02e4c925cf512).
[GitHub] [spark] SparkQA commented on pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
SparkQA commented on pull request #33188: URL: https://github.com/apache/spark/pull/33188#issuecomment-873725056 **[Test build #140622 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/140622/testReport)** for PR 33188 at commit [`7b668bb`](https://github.com/apache/spark/commit/7b668bbd9cef4ca2f0cfa84d2e16870464db4f1b).
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663593359 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -403,38 +394,78 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { -logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); +logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.attemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); -// The partition should be removed after the files are written so that any new stream -// for the same reduce partition will see that the data file exists. -partitionsIter.remove(); } } } mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } -partitions.remove(appShuffleId); -logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); +logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " -+ "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); ++ "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), +executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); +} +String shuffleManagerMeta = executorInfo.shuffleManager; +if (shuffleManagerMeta.contains(":")) { + String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1); + 
try { +ObjectMapper mapper = new ObjectMapper(); +MergeDirectoryMeta mergeDirectoryMeta = + mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class); +if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. + // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> +new AppShuffleInfo( + appId, mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir) +)); +} else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. + // The first ExecutorRegister message from the same application attempt wil register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + ConcurrentMap appShuffleInfoToBeCleanedUp = +Maps.newConcurrentMap(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { +if (appShuffleInfo == null || (appShuffleInfo != null + && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) { + appShuffleInfoToBeCleanedUp.putIfAbsent(appShuffleInfo.attemptId, appShuffleInfo); + appShuffleInfo = +new AppShuffleInfo( + appId, mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)); +} +return appShuffleInfo; + }); + for (AppShuffleInfo appShuffleInfo: appShuffleInfoToBeCleanedUp.values()) { +logger.info("Remove shuffle info for {}_{} as new application attempt registered", + appId, appShuffleInfo.attemptId); +appShuffleInfo.cleanupShufflePartitionInfo(); Review comment: Updated to clean up asynchronously. 
## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -403,38 +394,78 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
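The registration rule described in the diff comments above (first registration wins within an attempt, a newer attempt replaces an older one, and the replaced attempt is handed off for cleanup) can be sketched in Python as follows. `AttemptRegistry` and its tuple layout are hypothetical. One deliberate difference from the quoted diff: the sketch records a superseded entry only when one actually exists, whereas the quoted `compute` callback reads `appShuffleInfo.attemptId` in a branch that is also reached when `appShuffleInfo == null`.

```python
import threading

ATTEMPT_ID_UNDEFINED = -1


class AttemptRegistry:
    """Toy model of per-application attempt registration (names are hypothetical)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._apps = {}  # app_id -> (attempt_id, merge_dir)

    def register(self, app_id, attempt_id, merge_dir):
        """Register merge dirs; return the superseded (attempt_id, merge_dir) or None."""
        with self._lock:
            current = self._apps.get(app_id)
            if attempt_id == ATTEMPT_ID_UNDEFINED:
                # No attempt id available: only the first registration takes effect.
                self._apps.setdefault(app_id, (attempt_id, merge_dir))
                return None
            if current is None:
                # First registration for this application: nothing to clean up.
                self._apps[app_id] = (attempt_id, merge_dir)
                return None
            if attempt_id > current[0]:
                # A newer attempt replaces the old one; the caller cleans up the old entry.
                self._apps[app_id] = (attempt_id, merge_dir)
                return current
            # Same or older attempt: keep what is already registered.
            return None

    def current(self, app_id):
        with self._lock:
            return self._apps.get(app_id)
```

The caller would then schedule cleanup for any non-`None` return value, ideally off the registration thread, in line with the review comment about cleaning up asynchronously.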
[GitHub] [spark] Yikun commented on a change in pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
Yikun commented on a change in pull request #33174: URL: https://github.com/apache/spark/pull/33174#discussion_r663592689 ## File path: python/run-tests.py ## @@ -40,6 +44,111 @@ from sparktestsupport.shellutils import which, subprocess_check_output # noqa from sparktestsupport.modules import all_modules, pyspark_sql # noqa +# Make sure logging config before any possible logging print +logging.basicConfig(stream=sys.stdout, format="%(message)s") +LOGGER = logging.getLogger() + + +def _contain_unittests_class(module_name, slow=False): +""" +Check if the module with specific module has classes are derived from unittest.TestCase. Review comment: Will reword as "Check if the module contains a unittest.TestCase-based class or not."
[GitHub] [spark] Yikun commented on a change in pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
Yikun commented on a change in pull request #33174: URL: https://github.com/apache/spark/pull/33174#discussion_r663592493 ## File path: python/run-tests.py ## @@ -40,6 +44,111 @@ from sparktestsupport.shellutils import which, subprocess_check_output # noqa from sparktestsupport.modules import all_modules, pyspark_sql # noqa +# Make sure logging config before any possible logging print +logging.basicConfig(stream=sys.stdout, format="%(message)s") +LOGGER = logging.getLogger() + + +def _contain_unittests_class(module_name, slow=False): +""" +Check if the module with specific module has classes are derived from unittest.TestCase. +Such as: +pyspark.tests.test_appsubmit, it will return True, because there is SparkSubmitTests which is +included under the module of pyspark.tests.test_appsubmit, inherits from unittest.TestCase. + +Parameters +-- +module_name : str +The module name to be check +slow : bool +Return True if module contains unittests and is_slow_test is marked as True. + +Returns +--- +True if contains unittest classes otherwise False. An ``ModuleNotFoundError`` will raise if the +module is not found. + +>>> _contain_unittests_class("pyspark.tests.test_appsubmit") +True +>>> _contain_unittests_class("pyspark.conf") +False +>>> _contain_unittests_class("pyspark.pandas.tests.test_dataframe", slow=True) +True +>>> _contain_unittests_class("pyspark.pandas.tests.test_dataframe") +False +""" +module = import_module(module_name) +for _, _class in inspect.getmembers(module, inspect.isclass): +if issubclass(_class, unittest.TestCase): +if slow and hasattr(module, 'is_slow_test'): +return True +if not slow and not hasattr(module, 'is_slow_test'): +return True +return False + + +def _discover_python_unittests(paths): +"""Discover the python module which contains unittests under paths. 
+ +Such as: +['pyspark/tests'], it will return the set of module name under the path of pyspark/tests, like +{'pyspark.tests.test_appsubmit', 'pyspark.tests.test_broadcast', ...} + +Parameters +-- +paths : list +Paths of modules to be discovered. + +Returns +--- +A set of complete test module name discovered under specified paths + +>>> sorted([x for x in _discover_python_unittests(['pyspark/tests'])]) +... # doctest: +NORMALIZE_WHITESPACE +['pyspark.tests.test_appsubmit', 'pyspark.tests.test_broadcast', 'pyspark.tests.test_conf', +'pyspark.tests.test_context', 'pyspark.tests.test_daemon', 'pyspark.tests.test_install_spark', +'pyspark.tests.test_join', 'pyspark.tests.test_pin_thread', 'pyspark.tests.test_profiler', +'pyspark.tests.test_rdd', 'pyspark.tests.test_rddbarrier', 'pyspark.tests.test_readwrite', +'pyspark.tests.test_serializers', 'pyspark.tests.test_shuffle', +'pyspark.tests.test_taskcontext', 'pyspark.tests.test_util', 'pyspark.tests.test_worker'] +>>> sorted([x for x in _discover_python_unittests([("pyspark/pandas/tests", "slow")])]) +... # doctest: +NORMALIZE_WHITESPACE +['pyspark.pandas.tests.indexes.test_base', 'pyspark.pandas.tests.indexes.test_datetime', +'pyspark.pandas.tests.test_dataframe', 'pyspark.pandas.tests.test_groupby', +'pyspark.pandas.tests.test_indexing', 'pyspark.pandas.tests.test_ops_on_diff_frames', +'pyspark.pandas.tests.test_ops_on_diff_frames_groupby', 'pyspark.pandas.tests.test_series', +'pyspark.pandas.tests.test_stats'] +>>> sorted([x for x in _discover_python_unittests([('pyspark/tests', 'slow')])]) +[] +""" +if not paths: +return [] +modules = set() +pyspark_path = os.path.join(SPARK_HOME, "python") +for path in paths: +slow_only = False +if isinstance(path, tuple) and len(path) == 2 and path[1] == "slow": Review comment: Sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
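The check performed by `_contain_unittests_class` in the diff above can be tried outside a Spark checkout with a small stand-alone sketch. It takes a module object instead of a module name (the original calls `import_module` on the name), and `make_module` is a hypothetical helper that builds throwaway modules for the demonstration.

```python
import inspect
import types
import unittest


def contains_unittest_class(module, slow=False):
    """True if `module` has a unittest.TestCase subclass and its slow marker matches `slow`."""
    is_slow = hasattr(module, "is_slow_test")
    if slow != is_slow:
        # Slow modules are reported only when slow=True, and fast ones only when slow=False.
        return False
    return any(
        issubclass(cls, unittest.TestCase)
        for _, cls in inspect.getmembers(module, inspect.isclass)
    )


def make_module(name, with_tests, slow=False):
    """Build a throwaway module object for the demonstration (hypothetical helper)."""
    module = types.ModuleType(name)
    if with_tests:
        module.SampleTests = type("SampleTests", (unittest.TestCase,), {})
    if slow:
        module.is_slow_test = True
    return module
```

As in the original, only the presence of the `is_slow_test` attribute is checked, not its value.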
[GitHub] [spark] Yikun commented on a change in pull request #33174: [SPARK-35721][PYTHON] Path level discover for python unittests
Yikun commented on a change in pull request #33174: URL: https://github.com/apache/spark/pull/33174#discussion_r663592408 ## File path: python/run-tests.py ## @@ -40,6 +44,111 @@ from sparktestsupport.shellutils import which, subprocess_check_output # noqa from sparktestsupport.modules import all_modules, pyspark_sql # noqa +# Make sure logging config before any possible logging print +logging.basicConfig(stream=sys.stdout, format="%(message)s") +LOGGER = logging.getLogger() + + +def _contain_unittests_class(module_name, slow=False): +""" +Check if the module with specific module has classes are derived from unittest.TestCase. +Such as: +pyspark.tests.test_appsubmit, it will return True, because there is SparkSubmitTests which is +included under the module of pyspark.tests.test_appsubmit, inherits from unittest.TestCase. + +Parameters +-- +module_name : str +The module name to be check +slow : bool +Return True if module contains unittests and is_slow_test is marked as True. + +Returns +--- +True if contains unittest classes otherwise False. An ``ModuleNotFoundError`` will raise if the +module is not found. + +>>> _contain_unittests_class("pyspark.tests.test_appsubmit") +True +>>> _contain_unittests_class("pyspark.conf") +False +>>> _contain_unittests_class("pyspark.pandas.tests.test_dataframe", slow=True) +True +>>> _contain_unittests_class("pyspark.pandas.tests.test_dataframe") +False +""" +module = import_module(module_name) +for _, _class in inspect.getmembers(module, inspect.isclass): +if issubclass(_class, unittest.TestCase): +if slow and hasattr(module, 'is_slow_test'): +return True +if not slow and not hasattr(module, 'is_slow_test'): +return True +return False + + +def _discover_python_unittests(paths): +"""Discover the python module which contains unittests under paths. 
+ +Such as: +['pyspark/tests'], it will return the set of module name under the path of pyspark/tests, like +{'pyspark.tests.test_appsubmit', 'pyspark.tests.test_broadcast', ...} + +Parameters +-- +paths : list +Paths of modules to be discovered. + +Returns +--- +A set of complete test module name discovered under specified paths + +>>> sorted([x for x in _discover_python_unittests(['pyspark/tests'])]) Review comment: Yes, this doctest makes sure that `_discover_python_unittests` works in a real environment. For example, if we forgot to add `pyspark.pandas.tests.test_series`, the CI would fail with an error like: ```Python ** File "./python/run-tests.py", line 116, in __main__._discover_python_unittests Failed example: sorted([x for x in _discover_python_unittests([("pyspark/pandas/tests", "slow")])]) # doctest: +NORMALIZE_WHITESPACE Expected: ['pyspark.pandas.tests.indexes.test_base', 'pyspark.pandas.tests.indexes.test_datetime', 'pyspark.pandas.tests.test_dataframe', 'pyspark.pandas.tests.test_groupby', 'pyspark.pandas.tests.test_indexing', 'pyspark.pandas.tests.test_ops_on_diff_frames', 'pyspark.pandas.tests.test_ops_on_diff_frames_groupby', 'pyspark.pandas.tests.test_series'] Got: ['pyspark.pandas.tests.indexes.test_base', 'pyspark.pandas.tests.indexes.test_datetime', 'pyspark.pandas.tests.test_dataframe', 'pyspark.pandas.tests.test_groupby', 'pyspark.pandas.tests.test_indexing', 'pyspark.pandas.tests.test_ops_on_diff_frames', 'pyspark.pandas.tests.test_ops_on_diff_frames_groupby', 'pyspark.pandas.tests.test_series', 'pyspark.pandas.tests.test_series'] ** ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
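The idea in the comment above, using a doctest with `+NORMALIZE_WHITESPACE` as a guard on a discovered list, can be shown with a self-contained sketch. The `discover` function here is a toy stand-in (it only sorts and de-duplicates the names it is given), and `run_sketch_doctest` is a hypothetical helper that runs the docstring example through the standard `doctest` module.

```python
import doctest


def discover(names):
    """Return sorted, de-duplicated module names (toy stand-in for discovery).

    >>> discover(["b.test_two", "a.test_one", "a.test_one"])  # doctest: +NORMALIZE_WHITESPACE
    ['a.test_one',
     'b.test_two']
    """
    return sorted(set(names))


def run_sketch_doctest():
    """Run the docstring example above and return doctest's results object."""
    globs = {"discover": discover}
    # Parse the docstring directly so the sketch does not depend on module lookup.
    test = doctest.DocTestParser().get_doctest(
        discover.__doc__, globs, "discover", "<sketch>", 0
    )
    runner = doctest.DocTestRunner(verbose=False)
    return runner.run(test)
```

Because of `+NORMALIZE_WHITESPACE`, the expected list may be wrapped across lines in the docstring, as in the `run-tests.py` doctests; any change in the returned names still makes the example fail.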
[GitHub] [spark] HeartSaVioR commented on a change in pull request #32934: [SPARK-35788][SS] Metrics support for RocksDB instance
HeartSaVioR commented on a change in pull request #32934: URL: https://github.com/apache/spark/pull/32934#discussion_r663587422 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala ## @@ -134,6 +135,22 @@ class RocksDBFileManager( override def accept(path: Path): Boolean = path.toString.endsWith(".zip") } + /** + * Metrics for loading checkpoint from DFS. Every loadCheckpointFromDFS call will update this + * metrics, so this effectively records the latest metrics. + */ + @volatile private var loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS + + /** + * Metrics for saving checkpoint to DFS. Every saveCheckpointToDFS call will update this + * metrics, so this effectively records the latest metrics. + */ + @volatile private var saveCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS + + def latestloadCheckpointMetrics: RocksDBFileManagerMetrics = loadCheckpointMetrics Review comment: nit: latest`L`oadCheckpointMetrics ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala ## @@ -442,6 +476,42 @@ object RocksDBConf { def apply(): RocksDBConf = apply(new StateStoreConf()) } +/** Class to represent stats from each commit. 
*/ +case class RocksDBMetrics( + numCommittedKeys: Long, + numUncommittedKeys: Long, + memUsageBytes: Long, + totalSSTFilesBytes: Long, + nativeOpsLatencyMicros: Map[String, RocksDBNativeHistogram], + lastCommitLatencyMs: Map[String, Long], + filesCopied: Long, + bytesCopied: Long, + filesReused: Long, + zipFileBytesUncompressed: Option[Long]) { + def json: String = Serialization.write(this)(RocksDBMetrics.format) +} + +object RocksDBMetrics { + val format = Serialization.formats(NoTypeHints) +} + +/** Class to wrap RocksDB's native histogram */ +case class RocksDBNativeHistogram( + avg: Double, stddev: Double, median: Double, p95: Double, p99: Double) { Review comment: nit: indent ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala ## @@ -486,6 +516,23 @@ class RocksDBFileManager( } } +/** + * Metrics regarding RocksDB file sync between local and DFS. + */ +case class RocksDBFileManagerMetrics( + filesCopied: Long, Review comment: nit: indent ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala ## @@ -442,6 +476,42 @@ object RocksDBConf { def apply(): RocksDBConf = apply(new StateStoreConf()) } +/** Class to represent stats from each commit. */ +case class RocksDBMetrics( + numCommittedKeys: Long, Review comment: nit: indent -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663589883 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -417,24 +476,75 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } -partitions.remove(appShuffleId); -logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); +appShuffleInfo.partitions.remove(msg.shuffleId); +logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " -+ "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); ++ "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), +executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); +} +String shuffleManagerMeta = executorInfo.shuffleManager; +if (shuffleManagerMeta.contains(":")) { + String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1); + try { +ObjectMapper mapper = new ObjectMapper(); +MergeDirectoryMeta mergeDirectoryMeta = + mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class); +if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. 
+ // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> +new AppShuffleInfo( + mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir) +)); +} else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. + // The first ExecutorRegister message from the same application attempt wil register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + AtomicReference originalAppShuffleInfo = new AtomicReference<>(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { +if (appShuffleInfo == null || (appShuffleInfo != null + && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) { + originalAppShuffleInfo.set(appShuffleInfo); + appShuffleInfo = +new AppShuffleInfo( + mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)); +} +return appShuffleInfo; + }); + if (originalAppShuffleInfo.get() != null) { +AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get(); +logger.warn("Remove shuffle info for {}_{} as new application attempt registered", + appId, appShuffleInfo.attemptId); +cleanupShufflePartitionInfo(appShuffleInfo); Review comment: Updated to clean up asynchronously.
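For readers following this thread, the register-then-clean-up-asynchronously pattern discussed in the comment can be sketched roughly as below. This is only an illustration under stated assumptions, not the code in the PR: `AttemptRegistry`, `AttemptInfo`, `register`, `shutdown`, and the `cleaned` list are hypothetical names, and the single-thread executor stands in for whichever executor the shuffle service would actually use.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: keeps one record per application and replaces it
// only when a newer attempt registers.
final class AttemptRegistry {
  // Hypothetical stand-in for the per-application shuffle state.
  static final class AttemptInfo {
    final String appId;
    final int attemptId;
    AttemptInfo(String appId, int attemptId) {
      this.appId = appId;
      this.attemptId = attemptId;
    }
  }

  private final ConcurrentHashMap<String, AttemptInfo> apps = new ConcurrentHashMap<>();
  private final ExecutorService cleaner = Executors.newSingleThreadExecutor();
  // Attempt ids whose state was cleaned up; kept only so the demo can observe it.
  final List<Integer> cleaned = new CopyOnWriteArrayList<>();

  /** Returns true if this call installed a new record for the application. */
  boolean register(String appId, int attemptId) {
    AttemptInfo candidate = new AttemptInfo(appId, attemptId);
    AtomicReference<AttemptInfo> replaced = new AtomicReference<>();
    AttemptInfo current = apps.compute(appId, (id, existing) -> {
      if (existing == null) {
        return candidate;            // first registration for this application
      }
      if (attemptId > existing.attemptId) {
        replaced.set(existing);      // remember the older attempt for cleanup
        return candidate;
      }
      return existing;               // same or older attempt: keep what is there
    });
    AttemptInfo old = replaced.get();
    if (old != null) {
      // Cleanup runs outside compute(), so the map lock is not held while it runs.
      cleaner.execute(() -> cleaned.add(old.attemptId));
    }
    return current == candidate;
  }

  /** Stops the cleanup executor and waits briefly for queued cleanups to finish. */
  void shutdown() {
    cleaner.shutdown();
    try {
      cleaner.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    AttemptRegistry registry = new AttemptRegistry();
    System.out.println("attempt 1 installed: " + registry.register("app", 1));
    System.out.println("attempt 2 installed: " + registry.register("app", 2));
    registry.shutdown();
    System.out.println("cleaned attempts: " + registry.cleaned);
  }
}
```

Handing the replaced record to an executor keeps the remapping function short, which matters because `ConcurrentHashMap.compute` blocks other updates to the same bin while the function runs.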
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663589670 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -403,38 +394,78 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { -logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); +logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.attemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); -// The partition should be removed after the files are written so that any new stream -// for the same reduce partition will see that the data file exists. -partitionsIter.remove(); } } } mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } -partitions.remove(appShuffleId); -logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); +logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " -+ "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); ++ "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), +executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); +} +String shuffleManagerMeta = executorInfo.shuffleManager; +if (shuffleManagerMeta.contains(":")) { + String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1); + 
try { +ObjectMapper mapper = new ObjectMapper(); +MergeDirectoryMeta mergeDirectoryMeta = + mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class); Review comment: Updated this part to encode/decode as a Map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663589606 ## File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ## @@ -530,10 +530,17 @@ private[spark] class BlockManager( private def registerWithExternalShuffleServer(): Unit = { logInfo("Registering executor with local external shuffle service.") +val shuffleManagerMeta = + if (Utils.isPushBasedShuffleEnabled(conf)) { +s"${shuffleManager.getClass.getName}:" + + s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}" + } else { +shuffleManager.getClass.getName + } Review comment: Added UT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663589577 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -567,7 +598,8 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { // memory, while still providing the necessary guarantee. synchronized (partitionInfo) { Map shufflePartitions = - mergeManager.partitions.get(partitionInfo.appShuffleId); + mergeManager.appsShuffleInfo.get(partitionInfo.appId).partitions Review comment: Added AppShuffleInfo as a final field in the AppShufflePartitionInfo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663589389 ## File path: core/src/main/scala/org/apache/spark/SparkContext.scala ## @@ -583,6 +583,7 @@ class SparkContext(config: SparkConf) extends Logging { _applicationId = _taskScheduler.applicationId() _applicationAttemptId = _taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) +_applicationAttemptId.foreach(attemptId => _conf.set("spark.app.attempt.id", attemptId)) Review comment: Updated with a conf in conf/package.scala -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
ulysses-you commented on a change in pull request #33188: URL: https://github.com/apache/spark/pull/33188#discussion_r663587709 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala ## @@ -133,4 +134,27 @@ class EnsureRequirementsSuite extends SharedSparkSession { }.size == 2) } } + + test("SPARK-35989: Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled") { +import testImplicits._ +Seq(true, false).foreach { enableAqe => + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAqe.toString, +SQLConf.SHUFFLE_PARTITIONS.key -> "3", +SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { Review comment: I see the reason; that makes sense.
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663582230 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -403,38 +394,78 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { -logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); +logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.attemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); -// The partition should be removed after the files are written so that any new stream -// for the same reduce partition will see that the data file exists. -partitionsIter.remove(); } } } mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } -partitions.remove(appShuffleId); -logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); +logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " -+ "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); ++ "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), +executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); +} +String shuffleManagerMeta = executorInfo.shuffleManager; +if (shuffleManagerMeta.contains(":")) { + String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1); + 
try { +ObjectMapper mapper = new ObjectMapper(); +MergeDirectoryMeta mergeDirectoryMeta = + mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class); +if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. + // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> +new AppShuffleInfo( + appId, mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir) +)); +} else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. + // The first ExecutorRegister message from the same application attempt wil register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + ConcurrentMap appShuffleInfoToBeCleanedUp = +Maps.newConcurrentMap(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { +if (appShuffleInfo == null || (appShuffleInfo != null + && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) { + appShuffleInfoToBeCleanedUp.putIfAbsent(appShuffleInfo.attemptId, appShuffleInfo); + appShuffleInfo = +new AppShuffleInfo( + appId, mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)); +} +return appShuffleInfo; + }); + for (AppShuffleInfo appShuffleInfo: appShuffleInfoToBeCleanedUp.values()) { Review comment: I updated with ConcurrentHashMap anyway as the atomic and locking is required for our logic. 
Even though the current implementation of Maps.newConcurrentMap actually returns a ConcurrentHashMap, it is still better not to rely on a third-party library to create the ConcurrentHashMap.
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32973: [SPARK-36009][GRAPHX] Add missing GraphX classes to registerKryoClasses util method
AmplabJenkins removed a comment on pull request #32973: URL: https://github.com/apache/spark/pull/32973#issuecomment-873703870 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45134/
[GitHub] [spark] AmplabJenkins commented on pull request #33205: [SPARK-20384][SQL] Support value class in nested schema for Dataset
AmplabJenkins commented on pull request #33205: URL: https://github.com/apache/spark/pull/33205#issuecomment-873704056 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #32973: [SPARK-36009][GRAPHX] Add missing GraphX classes to registerKryoClasses util method
AmplabJenkins commented on pull request #32973: URL: https://github.com/apache/spark/pull/32973#issuecomment-873703870 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45134/
[GitHub] [spark] zhouyejoe commented on a change in pull request #33078: [SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the sta
zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663581955 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ## @@ -403,38 +394,78 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { -logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); +logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.attemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); -// The partition should be removed after the files are written so that any new stream -// for the same reduce partition will see that the data file exists. -partitionsIter.remove(); } } } mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } -partitions.remove(appShuffleId); -logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); +logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " -+ "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); ++ "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), +executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); +} +String shuffleManagerMeta = executorInfo.shuffleManager; +if (shuffleManagerMeta.contains(":")) { + String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1); + 
try { +ObjectMapper mapper = new ObjectMapper(); +MergeDirectoryMeta mergeDirectoryMeta = + mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class); +if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. + // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> +new AppShuffleInfo( + appId, mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir) +)); +} else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. + // The first ExecutorRegister message from the same application attempt wil register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + ConcurrentMap appShuffleInfoToBeCleanedUp = +Maps.newConcurrentMap(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { +if (appShuffleInfo == null || (appShuffleInfo != null + && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) { + appShuffleInfoToBeCleanedUp.putIfAbsent(appShuffleInfo.attemptId, appShuffleInfo); + appShuffleInfo = +new AppShuffleInfo( + appId, mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, +mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)); +} +return appShuffleInfo; + }); + for (AppShuffleInfo appShuffleInfo: appShuffleInfoToBeCleanedUp.values()) { Review comment: It is related to the description of the [compute](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentMap.html#compute-K-java.util.function.BiFunction-) method for ConcurrentMap. 
ConcurrentMap is an interface, and if we take a look at how the [compute](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#compute-K-java.util.function.BiFunction-) in ConcurrentSkipListMap is implemented, we can find that it is implemented with a for(;;)
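The point about `compute` can be illustrated with a small experiment. The sketch below is an assumption-laden demo rather than anything from the PR: `ComputeRetryDemo` and `countInvocations` are hypothetical names. It counts how many times the remapping function runs under contention. The Java documentation states that `ConcurrentHashMap.compute` is performed atomically, so the function runs once per call, while the `ConcurrentMap` default and `ConcurrentSkipListMap` may retry and call the function more than once. Side effects inside the function (such as recording the entry to clean up) are therefore only safe to rely on with `ConcurrentHashMap`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

final class ComputeRetryDemo {
  /**
   * Issues threads * perThread compute() calls against a single key and
   * returns how many times the remapping function was actually invoked.
   */
  static long countInvocations(ConcurrentMap<String, Integer> map, int threads, int perThread) {
    AtomicLong invocations = new AtomicLong();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          map.compute("key", (k, v) -> {
            invocations.incrementAndGet();   // side effect inside the remapping function
            return v == null ? 1 : v + 1;
          });
        }
      });
      workers[t].start();
    }
    for (Thread w : workers) {
      try {
        w.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return invocations.get();
  }

  public static void main(String[] args) {
    int threads = 4;
    int perThread = 10_000;
    // For ConcurrentHashMap this equals threads * perThread.
    System.out.println("ConcurrentHashMap invocations: "
        + countInvocations(new ConcurrentHashMap<>(), threads, perThread));
    // For ConcurrentSkipListMap the count can be higher because of retries.
    System.out.println("ConcurrentSkipListMap invocations: "
        + countInvocations(new ConcurrentSkipListMap<>(), threads, perThread));
  }
}
```

In both cases the final value stored under the key is the same; only the number of times the function body runs differs, which is why declaring the field as `ConcurrentHashMap` rather than `ConcurrentMap` documents the guarantee the code depends on.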
[GitHub] [spark] HyukjinKwon commented on pull request #33204: [SPARK-18217][SQL][FOLLOWUP] Disallow altering permanent views based on temporary views or UDFs
HyukjinKwon commented on pull request #33204: URL: https://github.com/apache/spark/pull/33204#issuecomment-873703278 @jerqi can you create a new JIRA instead of reusing SPARK-18217? The fixed versions would be different at least.
[GitHub] [spark] HyukjinKwon commented on a change in pull request #33186: [SPARK-35987][SQL] The ANSI flags of Sum and Avg should be kept after being copied
HyukjinKwon commented on a change in pull request #33186: URL: https://github.com/apache/spark/pull/33186#discussion_r663578972 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ## @@ -151,9 +157,12 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast override lazy val evaluateExpression: Expression = resultType match { case d: DecimalType => If(isEmpty, Literal.create(null, resultType), -CheckOverflowInSum(sum, d, !SQLConf.get.ansiEnabled)) +CheckOverflowInSum(sum, d, !failOnError)) case _ => sum } override protected def withNewChildInternal(newChild: Expression): Sum = copy(child = newChild) + + // The flag `failOnError` won't be shown in the `toString` or `toAggString` methods Review comment: Just to make sure, are these same with other expressions? other places might have to be fixed (in a separate PR) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a change in pull request #32944: [SPARK-35794][SQL] Allow custom plugin for AQE cost evaluator
HyukjinKwon commented on a change in pull request #32944: URL: https://github.com/apache/spark/pull/32944#discussion_r663578635 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -678,6 +678,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS = +buildConf("spark.sql.adaptive.customCostEvaluatorClass") + .doc("The custom cost evaluator class to be used for adaptive execution. If not being set," + +" Spark will use its own SimpleCostEvaluator by default.") + .version("3.2.0") Review comment: The only thing is that the version has to be 3.3.0, since the branch has now been cut. Since this PR is unlikely to affect anything in the main code, I am also okay with merging it to 3.2.0, though. I will leave it to @cloud-fan and you.
[GitHub] [spark] HyukjinKwon commented on a change in pull request #32944: [SPARK-35794][SQL] Allow custom plugin for AQE cost evaluator
HyukjinKwon commented on a change in pull request #32944: URL: https://github.com/apache/spark/pull/32944#discussion_r663578635 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -678,6 +678,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS = +buildConf("spark.sql.adaptive.customCostEvaluatorClass") + .doc("The custom cost evaluator class to be used for adaptive execution. If not being set," + +" Spark will use its own SimpleCostEvaluator by default.") + .version("3.2.0") Review comment: The only thing is that the version has to be 3.3.0, since the branch has now been cut. Since this PR is unlikely to affect anything in the main code, I am also okay with merging it to 3.2.0, though. I will leave it to @cloud-fan.
[GitHub] [spark] HyukjinKwon commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
HyukjinKwon commented on a change in pull request #33188: URL: https://github.com/apache/spark/pull/33188#discussion_r663577786 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala ## @@ -133,4 +134,27 @@ class EnsureRequirementsSuite extends SharedSparkSession { }.size == 2) } } + + test("SPARK-35989: Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled") { +import testImplicits._ +Seq(true, false).foreach { enableAqe => + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAqe.toString, +SQLConf.SHUFFLE_PARTITIONS.key -> "3", +SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { Review comment: I usually follow the method-definition style here (https://github.com/databricks/scala-style-guide#spacing-and-indentation), because the parameters and body end up at the same indentation, which makes it a bit less readable. But I'm fine with it as is, too.
[GitHub] [spark] HyukjinKwon commented on pull request #33203: [SPARK-36007][INFRA] Failed to run benchmark in GA
HyukjinKwon commented on pull request #33203: URL: https://github.com/apache/spark/pull/33203#issuecomment-873693628 I merged to branch-3.2 too
[GitHub] [spark] HyukjinKwon closed pull request #33203: [SPARK-36007][INFRA] Failed to run benchmark in GA
HyukjinKwon closed pull request #33203: URL: https://github.com/apache/spark/pull/33203
[GitHub] [spark] HyukjinKwon commented on pull request #33203: [SPARK-36007][INFRA] Failed to run benchmark in GA
HyukjinKwon commented on pull request #33203: URL: https://github.com/apache/spark/pull/33203#issuecomment-873693412 Merged to master.
[GitHub] [spark] mickjermsurawong-stripe commented on a change in pull request #33205: [SPARK-20384][SQL] Support value class in nested schema for Dataset
mickjermsurawong-stripe commented on a change in pull request #33205:
URL: https://github.com/apache/spark/pull/33205#discussion_r663576570

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
## @@ -358,17 +417,17 @@ object ScalaReflection extends ScalaReflection {
 Invoke(obj, "deserialize", ObjectType(udt.userClass), path :: Nil)
 case t if definedByConstructorParams(t) =>
-val params = getConstructorParameters(t)
-
-val cls = getClassFromType(tpe)
+val cls = ScalaReflection.getClassFromType(tpe)
+val isTypeTuple = isTupleType(tpe)
+val unwrappedParams = getConstructorUnwrappedParameters(t, isTypeTuple)
-val arguments = params.zipWithIndex.map { case ((fieldName, fieldType), i) =>
+val arguments = unwrappedParams.zipWithIndex.map { case ((fieldName, fieldType), i) =>

Review comment:
Deserializer expression

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
## @@ -584,8 +643,8 @@ object ScalaReflection extends ScalaReflection {
 throw QueryExecutionErrors.cannotHaveCircularReferencesInClassError(t.toString)
 }
-val params = getConstructorParameters(t)
-val fields = params.map { case (fieldName, fieldType) =>
+val unwrappedParams = getConstructorUnwrappedParameters(t, tpe)
+val fields = unwrappedParams.map { case (fieldName, fieldType) =>

Review comment:
Serializer expression

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
## @@ -788,9 +847,9 @@ object ScalaReflection extends ScalaReflection {
 case t if isSubtype(t, definitions.ByteTpe) => Schema(ByteType, nullable = false)
 case t if isSubtype(t, definitions.BooleanTpe) => Schema(BooleanType, nullable = false)
 case t if definedByConstructorParams(t) =>
-val params = getConstructorParameters(t)
+val unwrappedParams = getConstructorUnwrappedParameters(t, tpe)
 Schema(StructType(
-params.map { case (fieldName, fieldType) =>
+unwrappedParams.map { case (fieldName, fieldType) =>

Review comment:
Get schema type
[GitHub] [spark] mickjermsurawong-stripe commented on pull request #27153: [SPARK-20384][SQL] Support value class in schema of Dataset (without breaking existing current projection)
mickjermsurawong-stripe commented on pull request #27153: URL: https://github.com/apache/spark/pull/27153#issuecomment-873692759 Hi @eejbyfeldt, I made another PR to address the tuple issue that was raised: https://github.com/apache/spark/pull/33205. I would appreciate your review as well.