[GitHub] [spark] SparkQA removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
SparkQA removed a comment on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-858882404 **[Test build #139656 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139656/testReport)** for PR 32849 at commit [`410242e`](https://github.com/apache/spark/commit/410242eba1caff00f86c0355dc97fdb198cdbc70). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
SparkQA commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859148736 **[Test build #139656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139656/testReport)** for PR 32849 at commit [`410242e`](https://github.com/apache/spark/commit/410242eba1caff00f86c0355dc97fdb198cdbc70). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] viirya opened a new pull request #32870: [[SPARK-35439][SQL]][FOLLOWUO] ExpressionContainmentOrdering should not sort unrelated expressions
viirya opened a new pull request #32870: URL: https://github.com/apache/spark/pull/32870 ### What changes were proposed in this pull request? This is a followup of #32586. We introduced `ExpressionContainmentOrdering` to sort common expressions according to their parent-child relations. For unrelated expressions, the ordering previously returned -1. This is incorrect (for two unrelated expressions `a` and `b`, both `compare(a, b)` and `compare(b, a)` return -1) and can lead to a transitivity issue. ### Why are the changes needed? To fix the possible transitivity issue of `ExpressionContainmentOrdering`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test.
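The PR description says that returning -1 for unrelated expressions can break transitivity. The sketch below shows why, using a hypothetical `Expr` case class rather than Spark's `Expression`, and a `containmentBuggy` ordering that mimics the described behavior. It also shows one safe alternative, ordering by tree height, which still places a child before any parent that contains it. The height-based ordering is an illustrative assumption, not necessarily what this PR does.

```scala
// Hypothetical expression tree; NOT Spark's Expression class.
final case class Expr(name: String, children: Seq[Expr] = Nil) {
  // True if `other` occurs anywhere in this tree, including this node.
  def contains(other: Expr): Boolean =
    this == other || children.exists(_.contains(other))
}

object ContainmentOrderingSketch {
  // Mimics the described bug: unrelated expressions always compare as -1,
  // so compare(a, b) and compare(b, a) can both be -1.
  val containmentBuggy: Ordering[Expr] = new Ordering[Expr] {
    def compare(x: Expr, y: Expr): Int =
      if (x == y) 0
      else if (y.contains(x)) -1 // child sorts before its parent
      else if (x.contains(y)) 1  // parent sorts after its child
      else -1                    // unrelated: inconsistent with compare(y, x)
  }

  // Height of a tree: leaves have height 1.
  def height(e: Expr): Int =
    if (e.children.isEmpty) 1 else 1 + e.children.map(height).max

  // A total preorder: a proper sub-expression always has a strictly smaller
  // height than any expression containing it, so children still come first.
  val byHeight: Ordering[Expr] = Ordering.by[Expr, Int](height)
}
```

With `a = Expr("a")`, `b = Expr("b")` and `c = Expr("c", Seq(a))`, the buggy ordering claims `c < b` and `b < a`, yet also `c > a`, so no consistent sort order exists; `byHeight` sorts `Seq(c, b, a)` into `b, a, c`.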
[GitHub] [spark] dongjoon-hyun closed pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
dongjoon-hyun closed pull request #32730: URL: https://github.com/apache/spark/pull/32730
[GitHub] [spark] dongjoon-hyun commented on pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
dongjoon-hyun commented on pull request #32730: URL: https://github.com/apache/spark/pull/32730#issuecomment-859142586 Thank you so much, @viirya ! Merged to master.
[GitHub] [spark] SparkQA removed a comment on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
SparkQA removed a comment on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859052129 **[Test build #139660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139660/testReport)** for PR 32868 at commit [`3b38ae8`](https://github.com/apache/spark/commit/3b38ae861b0bf8127a2f8c621630932affb49b03).
[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
SparkQA commented on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859138782 **[Test build #139660 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139660/testReport)** for PR 32868 at commit [`3b38ae8`](https://github.com/apache/spark/commit/3b38ae861b0bf8127a2f8c621630932affb49b03). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] dongjoon-hyun edited a comment on pull request #32826: [SPARK-35670][BUILD] Upgrade ZSTD-JNI to 1.5.0-1
dongjoon-hyun edited a comment on pull request #32826: URL: https://github.com/apache/spark/pull/32826#issuecomment-859136503 It's just a historical fact. IMO, we need to replace it with `zstd-jni`. > Does it make sense to use aircompressor for ZSTD in ORC, rather than the zstd-jni? Yes. `aircompressor` is behind and also has a ZSTD bug. That's why the community (not only Apache ORC but also Presto) has been asking for a new version of aircompressor. - https://github.com/airlift/aircompressor/issues/122 BTW, please note that your PR was merged into Apache ORC 1.7, which has no release plan yet. The situation is the same for the other communities. Apache Kafka with ZSTD 1.5? Apache Avro with ZSTD 1.5? Apache Parquet with ZSTD 1.5? Apache Spark should move together with those Apache projects because our customers can use them together in a single app.
[GitHub] [spark] dongjoon-hyun commented on pull request #32826: [SPARK-35670][BUILD] Upgrade ZSTD-JNI to 1.5.0-1
dongjoon-hyun commented on pull request #32826: URL: https://github.com/apache/spark/pull/32826#issuecomment-859136503 It's just a historical fact. IMO, we need to replace it with `zstd-jni`. > Does it make sense to use aircompressor for ZSTD in ORC, rather than the zstd-jni? Yes. `aircompressor` is behind and also has a ZSTD bug. That's why the community (not only Apache ORC but also Presto) has been asking for a new version of aircompressor. - https://github.com/airlift/aircompressor/issues/122 BTW, please note that your PR was merged into Apache ORC 1.7, which has no release plan yet. The situation is the same for the other communities. Apache Kafka with ZSTD 1.5? Apache Avro with ZSTD 1.5? Apache Parquet with ZSTD 1.5? Apache Spark should move together with those Apache projects because our customers can use them together in a single app.
[GitHub] [spark] Tonix517 commented on a change in pull request #29000: [SPARK-27194][SPARK-29302][SQL] Fix commit collision in dynamic partition overwrite mode
Tonix517 commented on a change in pull request #29000: URL: https://github.com/apache/spark/pull/29000#discussion_r649580767 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala ## @@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol( // The specified output committer is a FileOutputCommitter. // So, we will use the FileOutputCommitter-specified constructor. val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) -committer = ctor.newInstance(new Path(path), context) +val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path) +committer = ctor.newInstance(committerOutputPath, context) Review comment: Hey @WinkerDu, thank you for the PR. One question: when dynamicPartitionOverwrite is on, this code block only executes when `clazz` is non-null, which means SQLConf.OUTPUT_COMMITTER_CLASS is set. That works for Parquet files, since that SQL property is set at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L97. What about other file formats, like ORC? There seems to be no such property-setting logic for other file formats, at least in the Spark repo. So is dynamicPartitionOverwrite supposed to be for Parquet only? Am I missing something here? Thanks. @cloud-fan @Ngone51 @agrawaldevesh
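The review question hinges on the reflective constructor lookup running only when a committer class is configured. The sketch below models that control flow under stated assumptions: `DemoCommitter` is a hypothetical committer, plain `String`s stand in for Hadoop's `Path` and `TaskAttemptContext`, and an `Option[Class[_]]` stands in for the nullable `clazz`. It is not Spark's `SQLHadoopMapReduceCommitProtocol`.

```scala
// Hypothetical committer with a (path, context)-shaped constructor.
// Strings stand in for Hadoop's Path and TaskAttemptContext.
class DemoCommitter(val outputPath: String, val attemptId: String)

object CommitterSetupSketch {
  /** Returns Some(committer) only when a committer class is configured,
    * mirroring the `clazz != null` guard the review comment points at. */
  def setupCommitter(
      configuredClass: Option[Class[_]],
      dynamicPartitionOverwrite: Boolean,
      stagingDir: String,
      path: String,
      attemptId: String): Option[AnyRef] =
    configuredClass.map { clazz =>
      // Look up the two-argument constructor reflectively.
      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
      // The change under review: write into the staging dir under dynamic overwrite.
      val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else path
      ctor.newInstance(committerOutputPath, attemptId).asInstanceOf[AnyRef]
    }
}
```

In this model, a `None` result corresponds to the case the reviewer asks about: no committer class is configured, so the staging-directory redirection in this branch never applies. The paths used with it (for example `/tmp/out/.staging`) are illustrative only.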
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
dongjoon-hyun commented on a change in pull request #32730: URL: https://github.com/apache/spark/pull/32730#discussion_r649578467 ## File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle + +import scala.concurrent.duration._ + +import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} + +import org.apache.spark.{LocalSparkContext, MapOutputTrackerMaster, SparkConf, SparkContext, SparkFunSuite, TestUtils} +import org.apache.spark.LocalSparkContext.withSpark +import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC +import org.apache.spark.internal.config._ +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend + +class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalSparkContext { + + val conf = new SparkConf() +.setAppName("ShuffleExecutorComponentsSuite") +.setMaster("local-cluster[1,1,1024]") +.set(UI.UI_ENABLED, false) +.set(DYN_ALLOCATION_ENABLED, true) +.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true) +.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 1) +.set(DYN_ALLOCATION_MIN_EXECUTORS, 1) +.set(IO_ENCRYPTION_ENABLED, false) +.set(KUBERNETES_DRIVER_REUSE_PVC, true) +.set(SHUFFLE_IO_PLUGIN_CLASS, classOf[KubernetesLocalDiskShuffleDataIO].getName) + + test("recompute is not blocked by the recovery") { +sc = new SparkContext(conf) +withSpark(sc) { sc => + val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + assert(master.shuffleStatuses.isEmpty) + + val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3) +.groupByKey() + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + val loc1 = master.shuffleStatuses(0).mapStatuses(0).location + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + // Reuse the existing shuffle data + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + // Decommission all executors + val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] + sc.getExecutorIds().foreach { id => +sched.killExecutor(id) + } + 
TestUtils.waitUntilExecutorsUp(sc, 1, 6) + // Shuffle status are removed + eventually(timeout(60.second), interval(1.seconds)) { +assert(master.shuffleStatuses.keys.toSet == Set(0)) +assert(master.shuffleStatuses(0).mapStatuses.forall(_ == null)) + } + + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(9, 10, 11)) +} + } + + test("Partial recompute shuffle data") { +sc = new SparkContext(conf) +withSpark(sc) { sc => + val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + assert(master.shuffleStatuses.isEmpty) + + val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3).groupByKey() + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + val loc1 = master.shuffleStatuses(0).mapStatuses(0).location + assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + // Reuse the existing shuffle data + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + val rdd2 = sc.parallelize(Seq((4, "four"), (5, "five"), (6, "six"), (7, "seven")), 4) +.groupByKey() + rdd2.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0, 1)) + assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1)) +
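The final assertion in the first test expects the recomputed map outputs to carry map IDs `Set(9, 10, 11)`. One consistent reading of that number, sketched below, rests on several assumptions: the reported map ID equals the map task's attempt ID (Spark 3.x behavior when `spark.shuffle.useOldFetchProtocol` is false), task IDs come from a single counter starting at 0, the `groupByKey` result stage has 3 partitions (inheriting the parent's 3, since `spark.default.parallelism` is not set in this conf), and there are no retries or speculative attempts. `assignIds` is a hypothetical helper that simulates that counter.

```scala
object MapIdArithmeticSketch {
  /** Simulates a single global task-id counter: each stage run consumes
    * `numTasks` consecutive ids. Returns the id range used by each run. */
  def assignIds(runs: Seq[(String, Int)]): Seq[(String, Range)] = {
    // The starting id of each run is the number of tasks launched before it.
    val starts = runs.map(_._2).scanLeft(0)(_ + _)
    runs.zip(starts).map { case ((label, numTasks), start) =>
      label -> (start until start + numTasks)
    }
  }
}
```

Under these assumptions, the first `collect()` runs 3 map tasks (IDs 0 to 2) and 3 result tasks (3 to 5), the second `collect()` reuses the shuffle and runs only 3 result tasks (6 to 8), so the map tasks recomputed after the executors are killed receive IDs 9, 10 and 11.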
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
dongjoon-hyun commented on a change in pull request #32730: URL: https://github.com/apache/spark/pull/32730#discussion_r649577655 ## File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle + +import scala.concurrent.duration._ + +import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} + +import org.apache.spark.{LocalSparkContext, MapOutputTrackerMaster, SparkConf, SparkContext, SparkFunSuite, TestUtils} +import org.apache.spark.LocalSparkContext.withSpark +import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC +import org.apache.spark.internal.config._ +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend + +class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalSparkContext { + + val conf = new SparkConf() +.setAppName("ShuffleExecutorComponentsSuite") +.setMaster("local-cluster[1,1,1024]") +.set(UI.UI_ENABLED, false) +.set(DYN_ALLOCATION_ENABLED, true) +.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true) +.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 1) +.set(DYN_ALLOCATION_MIN_EXECUTORS, 1) +.set(IO_ENCRYPTION_ENABLED, false) +.set(KUBERNETES_DRIVER_REUSE_PVC, true) +.set(SHUFFLE_IO_PLUGIN_CLASS, classOf[KubernetesLocalDiskShuffleDataIO].getName) + + test("recompute is not blocked by the recovery") { +sc = new SparkContext(conf) +withSpark(sc) { sc => + val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + assert(master.shuffleStatuses.isEmpty) + + val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3) +.groupByKey() + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + val loc1 = master.shuffleStatuses(0).mapStatuses(0).location Review comment: Yes, `loc1` is not used in this test case. I'll remove it.
[GitHub] [spark] viirya commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
viirya commented on a change in pull request #32730: URL: https://github.com/apache/spark/pull/32730#discussion_r649576592 ## File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle + +import scala.concurrent.duration._ + +import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} + +import org.apache.spark.{LocalSparkContext, MapOutputTrackerMaster, SparkConf, SparkContext, SparkFunSuite, TestUtils} +import org.apache.spark.LocalSparkContext.withSpark +import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC +import org.apache.spark.internal.config._ +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend + +class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalSparkContext { + + val conf = new SparkConf() +.setAppName("ShuffleExecutorComponentsSuite") +.setMaster("local-cluster[1,1,1024]") +.set(UI.UI_ENABLED, false) +.set(DYN_ALLOCATION_ENABLED, true) +.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true) +.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 1) +.set(DYN_ALLOCATION_MIN_EXECUTORS, 1) +.set(IO_ENCRYPTION_ENABLED, false) +.set(KUBERNETES_DRIVER_REUSE_PVC, true) +.set(SHUFFLE_IO_PLUGIN_CLASS, classOf[KubernetesLocalDiskShuffleDataIO].getName) + + test("recompute is not blocked by the recovery") { +sc = new SparkContext(conf) +withSpark(sc) { sc => + val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + assert(master.shuffleStatuses.isEmpty) + + val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3) +.groupByKey() + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + val loc1 = master.shuffleStatuses(0).mapStatuses(0).location + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + // Reuse the existing shuffle data + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + // Decommission all executors + val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] + sc.getExecutorIds().foreach { id => +sched.killExecutor(id) + } + 
TestUtils.waitUntilExecutorsUp(sc, 1, 6) + // Shuffle status are removed + eventually(timeout(60.second), interval(1.seconds)) { +assert(master.shuffleStatuses.keys.toSet == Set(0)) +assert(master.shuffleStatuses(0).mapStatuses.forall(_ == null)) + } + + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(9, 10, 11)) +} + } + + test("Partial recompute shuffle data") { +sc = new SparkContext(conf) +withSpark(sc) { sc => + val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + assert(master.shuffleStatuses.isEmpty) + + val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3).groupByKey() + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + val loc1 = master.shuffleStatuses(0).mapStatuses(0).location + assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + // Reuse the existing shuffle data + rdd.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0)) + assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1)) + assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2)) + + val rdd2 = sc.parallelize(Seq((4, "four"), (5, "five"), (6, "six"), (7, "seven")), 4) +.groupByKey() + rdd2.collect() + assert(master.shuffleStatuses.keys.toSet == Set(0, 1)) + assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1)) +
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
AmplabJenkins removed a comment on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859126609 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44188/
[GitHub] [spark] AmplabJenkins commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
AmplabJenkins commented on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859126609 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44188/
[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
SparkQA commented on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859126580 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44188/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
AmplabJenkins removed a comment on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859124767 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44187/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
AmplabJenkins removed a comment on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-859124766 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139653/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree
AmplabJenkins removed a comment on pull request #32862: URL: https://github.com/apache/spark/pull/32862#issuecomment-859124769 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139654/
[GitHub] [spark] AmplabJenkins commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree
AmplabJenkins commented on pull request #32862: URL: https://github.com/apache/spark/pull/32862#issuecomment-859124769 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139654/
[GitHub] [spark] AmplabJenkins commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
AmplabJenkins commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859124767 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44187/
[GitHub] [spark] AmplabJenkins commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
AmplabJenkins commented on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-859124766 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139653/
[GitHub] [spark] SparkQA removed a comment on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
SparkQA removed a comment on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-858818300 **[Test build #139653 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139653/testReport)** for PR 32869 at commit [`dde4cd8`](https://github.com/apache/spark/commit/dde4cd800a3d2e9aac55d05b5d99cc00fba0a2e6).
[GitHub] [spark] SparkQA commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
SparkQA commented on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-859119484 **[Test build #139653 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139653/testReport)** for PR 32869 at commit [`dde4cd8`](https://github.com/apache/spark/commit/dde4cd800a3d2e9aac55d05b5d99cc00fba0a2e6). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
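For context on #32869: a TIMESTAMP WITHOUT TIME ZONE value is a wall-clock date-time with no zone attached, so casting it to DATE should only drop the time of day, with no time-zone conversion involved. The Scala sketch below assumes the value is encoded as microseconds since 1970-01-01T00:00:00 on that wall clock (an assumption for illustration, not taken from the PR); `NtzToDateSketch` and its methods are hypothetical names, not Spark's `Cast` code.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical helper, not Spark's implementation of the cast.
object NtzToDateSketch {
  private val MicrosPerDay: Long = 24L * 60 * 60 * 1000 * 1000

  // Assumed encoding: microseconds since 1970-01-01T00:00:00 local wall clock.
  // floorDiv (rather than /) keeps pre-1970 values on the correct calendar day.
  def microsToDays(micros: Long): Int =
    Math.toIntExact(Math.floorDiv(micros, MicrosPerDay))

  def castToDate(micros: Long): LocalDate =
    LocalDate.ofEpochDay(microsToDays(micros).toLong)

  // The same semantics with java.time: keep the date, drop the time of day.
  def castToDate(ts: LocalDateTime): LocalDate = ts.toLocalDate
}
```

Because no zone is involved, both overloads should agree for any value; the `floorDiv` detail only matters for timestamps before the epoch.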
[GitHub] [spark] dchristle commented on pull request #32826: [SPARK-35670][BUILD] Upgrade ZSTD-JNI to 1.5.0-1
dchristle commented on pull request #32826: URL: https://github.com/apache/spark/pull/32826#issuecomment-859115935 > Thank you for your efforts. BTW, @dchristle . Please note that your ORC PR is not about ZSTD-JNI. It's native ZSTD library only. I commented on your ORC PR about the difference. > > For the following, I saw Kafka failures. > > > They appear to pass their respective CIs. > > https://user-images.githubusercontent.com/9700541/121303167-9afc7380-c8af-11eb-9e2e-7500a3467325.png > > No worry~ For Apache Avro, they have a dependency bot. I guess they will catch up soon. Let's wait and see their activity. > > > I have less familiarity with Avro's build chains/codebase, so I did not attempt to test it yet. > > In addition, all libraries should be synced inside Apache Spark because Apache Spark is using everything. Yes, for ORC it's the native C library and not Java. I have a tangential question for you: Does it make sense to use `aircompressor` for ZSTD in ORC, rather than the `zstd-jni`? It does not seem to keep up with the latest `zstd`, and the implementation seems to lack support for many of the strategies employed at different compression levels, if I understand the code here https://github.com/airlift/aircompressor/blob/495bae80ac7487d2efa1bba437d04e8a2a42bb7b/src/main/java/io/airlift/compress/zstd/CompressionParameters.java#L143 correctly. The reason I ask is because it is conceivable that `zstd` in the future makes an incompatible change that propagates to `zstd-jni` but not `aircompressor`.
[GitHub] [spark] SparkQA removed a comment on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree
SparkQA removed a comment on pull request #32862: URL: https://github.com/apache/spark/pull/32862#issuecomment-858818317 **[Test build #139654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139654/testReport)** for PR 32862 at commit [`9902e03`](https://github.com/apache/spark/commit/9902e03b2cce528f91b1e4423252d77f689e5767).
[GitHub] [spark] SparkQA commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree
SparkQA commented on pull request #32862: URL: https://github.com/apache/spark/pull/32862#issuecomment-859115485 **[Test build #139654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139654/testReport)** for PR 32862 at commit [`9902e03`](https://github.com/apache/spark/commit/9902e03b2cce528f91b1e4423252d77f689e5767). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class BooleanExtensionOps(BooleanOps):` * `class NullOps(DataTypeOps):` * `class UDTOps(DataTypeOps):` * `public final class CompositeReadLimit implements ReadLimit ` * `public final class ReadMinRows implements ReadLimit ` * `trait InvokeLike extends Expression with NonSQLExpression with ImplicitCastInputTypes ` * `case class LateralSubquery(` * `case class LateralJoin(` * `case class CommandResultExec(` * `class RocksDBFileManager(` * ` sealed trait SchemaReader ` * ` class SchemaV1Reader extends SchemaReader ` * ` class SchemaV2Reader extends SchemaReader ` * ` trait SchemaWriter ` * ` class SchemaV1Writer extends SchemaWriter ` * ` class SchemaV2Writer extends SchemaWriter ` * `case class CommandResult(`
[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
SparkQA commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859115229 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44187/
[GitHub] [spark] mridulm commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
mridulm commented on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-859108418 Thanks for working on this @zhouyejoe Thanks for all the reviews @Ngone51, @Victsm, @otterc !
[GitHub] [spark] asfgit closed pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
asfgit closed pull request #32007: URL: https://github.com/apache/spark/pull/32007
[GitHub] [spark] mridulm commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
mridulm commented on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-859105499 The spark UI test which failed in jenkins is unrelated to this PR. Merging to master.
[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
SparkQA commented on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859104452 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44188/
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649548771 ## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ## @@ -1124,4 +1394,54 @@ object ShuffleBlockFetcherIterator { */ private[storage] case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends FetchResult + + /** + * Result of an un-successful fetch of either of these: + * 1) Remote shuffle block chunk. + * 2) Local merged block data. + * + * Instead of treating this as a FailureFetchResult, we ignore this failure + * and fallback to fetch the original unmerged blocks. + * @param blockId block id + * @param address BlockManager that the merged block was attempted to be fetched from + * @param size size of the block, used to update bytesInFlight. + * @param isNetworkReqDone Is this the last network request for this host in this fetch + * request. Used to update reqsInFlight. + */ + private[storage] case class IgnoreFetchResult(blockId: BlockId, Review comment: Will `Retriable` be confusing that this request itself is retried? Can call it `FallbackOnFailureFetchResult`. Let me know what you think?
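The `IgnoreFetchResult` doc comment describes a fallback rather than a failure: if a merged block or chunk cannot be fetched, the iterator fetches the original unmerged blocks instead. A minimal sketch of that control flow follows, using the `FallbackOnFailureFetchResult` name proposed in the review. The simplified result types, the string block ids, and the `originalBlocksOf` lookup are all hypothetical and much thinner than Spark's real classes (which also carry the address, size and `isNetworkReqDone`).

```scala
// Hypothetical, simplified result types for illustration only.
sealed trait FetchResult { def blockId: String }
final case class SuccessFetchResult(blockId: String, data: Array[Byte]) extends FetchResult
final case class FailureFetchResult(blockId: String, cause: Throwable) extends FetchResult
final case class FallbackOnFailureFetchResult(blockId: String) extends FetchResult

object FallbackSketch {
  /**
   * Returns the delivered data plus the original (unmerged) block ids that must be
   * fetched again. `originalBlocksOf` maps a merged block id to the unmerged blocks
   * it was built from (an assumed lookup). A FailureFetchResult stays fatal.
   */
  def handle(
      results: Seq[FetchResult],
      originalBlocksOf: String => Seq[String]): (Seq[Array[Byte]], Seq[String]) = {
    val delivered = Seq.newBuilder[Array[Byte]]
    val refetch = Seq.newBuilder[String]
    results.foreach {
      case SuccessFetchResult(_, data) => delivered += data
      case FailureFetchResult(id, cause) =>
        throw new RuntimeException(s"Fetch of $id failed", cause)
      case FallbackOnFailureFetchResult(id) => refetch ++= originalBlocksOf(id)
    }
    (delivered.result(), refetch.result())
  }
}
```

The naming question in the review maps onto the last case: the failed merged request is not retried; different (unmerged) blocks are requested in its place.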
[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
SparkQA commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859091608 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44187/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier
AmplabJenkins removed a comment on pull request #32854: URL: https://github.com/apache/spark/pull/32854#issuecomment-859087811 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139651/
[GitHub] [spark] AmplabJenkins commented on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier
AmplabJenkins commented on pull request #32854: URL: https://github.com/apache/spark/pull/32854#issuecomment-859087811 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139651/
[GitHub] [spark] SparkQA removed a comment on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier
SparkQA removed a comment on pull request #32854: URL: https://github.com/apache/spark/pull/32854#issuecomment-858769118 **[Test build #139651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139651/testReport)** for PR 32854 at commit [`bd96508`](https://github.com/apache/spark/commit/bd9650841fff48c7456e226d564ac04c4ed5e217).
[GitHub] [spark] SparkQA commented on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier
SparkQA commented on pull request #32854: URL: https://github.com/apache/spark/pull/32854#issuecomment-859075631 **[Test build #139651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139651/testReport)** for PR 32854 at commit [`bd96508`](https://github.com/apache/spark/commit/bd9650841fff48c7456e226d564ac04c4ed5e217). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] sunchao commented on pull request #30135: [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1
sunchao commented on pull request #30135: URL: https://github.com/apache/spark/pull/30135#issuecomment-859061487 > maybe the build is picking up the old RC Yes that's exactly what happened :) For the regression, don't know the full context behind the original change but seems like a good thing to do, although a boolean flag returned might be less disruptive IMO :). Either way it is a simple fix from Spark side.
[GitHub] [spark] dongjoon-hyun commented on pull request #32750: [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9
dongjoon-hyun commented on pull request #32750: URL: https://github.com/apache/spark/pull/32750#issuecomment-859055684 Hi, @wangyum . @sunchao uploaded Apache Hive 2.3.9 to the Maven Central. Could you revise this PR? - https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/
[GitHub] [spark] dongjoon-hyun commented on pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
dongjoon-hyun commented on pull request #32730: URL: https://github.com/apache/spark/pull/32730#issuecomment-859054062 Please let me know if there are any other comments, @viirya and @mridulm ?
[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
SparkQA commented on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859052129 **[Test build #139660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139660/testReport)** for PR 32868 at commit [`3b38ae8`](https://github.com/apache/spark/commit/3b38ae861b0bf8127a2f8c621630932affb49b03).
[GitHub] [spark] dongjoon-hyun commented on pull request #32868: [SPARK-35714]Bug fix for deadlock during the executor shutdown
dongjoon-hyun commented on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859051087 cc @mridulm
[GitHub] [spark] dongjoon-hyun commented on pull request #32868: [SPARK-35714]Bug fix for deadlock during the executor shutdown
dongjoon-hyun commented on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-859050639 ok to test
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32868: [SPARK-35714]Bug fix for deadlock during the executor shutdown
AmplabJenkins removed a comment on pull request #32868: URL: https://github.com/apache/spark/pull/32868#issuecomment-858682923 Can one of the admins verify this patch?
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
dongjoon-hyun commented on a change in pull request #32730: URL: https://github.com/apache/spark/pull/32730#discussion_r649519438 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle + +import java.io.File +import java.util.Optional + +import scala.reflect.ClassTag + +import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter} +import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents +import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, UnrecognizedBlockId} +import org.apache.spark.util.Utils + +class KubernetesLocalDiskShuffleExecutorComponents(sparkConf: SparkConf) + extends ShuffleExecutorComponents with Logging { + + private val delegate = new LocalDiskShuffleExecutorComponents(sparkConf) + private var blockManager: BlockManager = _ + + override def initializeExecutor( + appId: String, execId: String, extraConfigs: java.util.Map[String, String]): Unit = { +delegate.initializeExecutor(appId, execId, extraConfigs) +blockManager = SparkEnv.get.blockManager +if (sparkConf.getBoolean("spark.kubernetes.driver.reusePersistentVolumeClaim", false)) { + // Turn off the deletion of the shuffle data in order to reuse + blockManager.diskBlockManager.deleteFilesOnStop = false + Utils.tryLogNonFatalError { + KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, blockManager) + } +} + } + + override def createMapOutputWriter(shuffleId: Int, mapTaskId: Long, numPartitions: Int) +: ShuffleMapOutputWriter = { +delegate.createMapOutputWriter(shuffleId, mapTaskId, numPartitions) + } + + override def createSingleFileMapOutputWriter(shuffleId: Int, mapId: Long) +: Optional[SingleSpillShuffleMapOutputWriter] = { +delegate.createSingleFileMapOutputWriter(shuffleId, mapId) + } +} + +object KubernetesLocalDiskShuffleExecutorComponents extends Logging { + /** + * This tries to recover shuffle data of dead executors' local dirs if exists. + * Since the executors are already dead, we cannot use `getHostLocalDirs`. 
+ * This is enabled only when spark.kubernetes.driver.reusePersistentVolumeClaim is true. + */ + def recoverDiskStore(conf: SparkConf, bm: BlockManager): Unit = { +// Find All files +val files = Utils.getConfiguredLocalDirs(conf) + .filter(_ != null) + .map(s => new File(new File(new File(s).getParent).getParent)) + .flatMap { dir => +val oldDirs = dir.listFiles().filter { f => + f.isDirectory && f.getName.startsWith("spark-") +} +val files = oldDirs + .flatMap(_.listFiles).filter(_.isDirectory) // executor-xxx + .flatMap(_.listFiles).filter(_.isDirectory) // blockmgr-xxx + .flatMap(_.listFiles).filter(_.isDirectory) // 00 Review comment: Sure, we can check the pattern. I will do later~
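The walk in `recoverDiskStore` descends from `spark-*` to the executor directory, then `blockmgr-*`, then the sub-directory, using only `isDirectory` checks. The reviewer agreed to check the names too. Here is a sketch of what that could look like, starting from the directory two levels above a configured local dir (the `getParent.getParent` step in the diff). The `executor-*` and two-hex-digit sub-directory patterns are assumptions inferred from the `// executor-xxx`, `// blockmgr-xxx` and `// 00` comments, and `RecoverWalkSketch` is a hypothetical name, not code from the PR.

```scala
import java.io.File
import scala.util.matching.Regex

// Hypothetical sketch: the same four-level walk, but each level must match a name pattern.
object RecoverWalkSketch {
  private val SparkDir = "spark-.*".r
  private val ExecutorDir = "executor-.*".r   // assumed from "// executor-xxx"
  private val BlockMgrDir = "blockmgr-.*".r
  private val HexSubDir = "[0-9a-f]{2}".r     // assumed from "// 00"

  // File.listFiles() returns null on I/O errors, so wrap it instead of calling .filter on it.
  private def list(dir: File): Seq[File] =
    Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)

  private def childDirs(dir: File, name: Regex): Seq[File] =
    list(dir).filter(f => f.isDirectory && name.pattern.matcher(f.getName).matches())

  /** Returns the block files found under `root` (the grandparent of a local dir). */
  def findBlockFiles(root: File): Seq[File] =
    childDirs(root, SparkDir)
      .flatMap(childDirs(_, ExecutorDir))
      .flatMap(childDirs(_, BlockMgrDir))
      .flatMap(childDirs(_, HexSubDir))
      .flatMap(d => list(d).filter(_.isFile))
}
```

Unrelated directories that happen to sit under a `spark-*` directory are then skipped instead of being treated as shuffle data.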
[GitHub] [spark] steveloughran commented on pull request #30135: [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1
steveloughran commented on pull request #30135: URL: https://github.com/apache/spark/pull/30135#issuecomment-859048545 > reverted from RC3 so I reverted my earlier change of handling it in Spark code. Let me check later. maybe the build is picking up the old RC. FWIW I'm surprised about this regression...add as much as you can to the test, as in "what is the test trying to do?". Gabor's patch is trying to stop "hadoop fs" copy command from copying a file onto itself, which can be, well, lossy, depending on the OS and things.
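Why copying a file onto itself can be lossy: a copier that opens the destination with truncation empties the source before it has been read. Below is a local-filesystem analogue of such a guard, sketched with `java.nio.file`. It is not Hadoop's `FileSystem` or `hadoop fs -cp` code, and `SafeCopySketch` is a hypothetical name.

```scala
import java.io.IOException
import java.nio.file.{Files, Path, StandardCopyOption}

// Illustrative only: a java.nio analogue of a "don't copy a file onto itself" check.
object SafeCopySketch {
  def copy(src: Path, dst: Path): Unit = {
    // Different path strings (e.g. "dir/a.txt" vs "dir/./a.txt") can name the same file,
    // so compare the files themselves rather than the paths.
    if (Files.exists(dst) && Files.isSameFile(src, dst)) {
      throw new IOException(s"$src and $dst are the same file")
    }
    Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING)
  }
}
```

Failing loudly, rather than silently doing nothing, is one design choice among several. The other option mentioned in the thread, returning a boolean flag, would also avoid data loss.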
[GitHub] [spark] dongjoon-hyun closed pull request #32837: [SPARK-35692][K8S] Use AtomicInteger for executor id generating
dongjoon-hyun closed pull request #32837: URL: https://github.com/apache/spark/pull/32837
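#32837 switches executor id generation to `AtomicInteger`. A likely motivation is that `incrementAndGet` is a single atomic read-modify-write. By contrast, incrementing a plain `var` from several threads can hand out duplicate ids. The allocator below is a hypothetical illustration of the idea, not the code from the PR.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical allocator; the class name and the String id format are assumptions.
final class ExecutorIdAllocator(start: Int = 0) {
  private val counter = new AtomicInteger(start)

  // One atomic operation: no two callers can observe the same value,
  // unlike `counter += 1` on an unsynchronized var.
  def nextId(): String = counter.incrementAndGet().toString
}
```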
[GitHub] [spark] AmplabJenkins commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
AmplabJenkins commented on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-859045755 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139658/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
AmplabJenkins removed a comment on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-859045755 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139658/
[GitHub] [spark] steveloughran commented on a change in pull request #30135: [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1
steveloughran commented on a change in pull request #30135: URL: https://github.com/apache/spark/pull/30135#discussion_r649517273 ## File path: pom.xml ## @@ -197,6 +197,7 @@ 1.8 1.1.0 1.2 +1.52 Review comment: we rolled back the latest bouncy castle update as some internal spark builds with some extra modules were failing as the asm JAR couldn't handle the changes. Tried to fiddle with the spark settings there to use an updated asm JAR and gave up
[GitHub] [spark] SparkQA removed a comment on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
SparkQA removed a comment on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-858883870 **[Test build #139658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139658/testReport)** for PR 32007 at commit [`e630725`](https://github.com/apache/spark/commit/e630725ca5c161cea62a2afcc7668a67a3e6d72e).
[GitHub] [spark] dongjoon-hyun closed pull request #32830: [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
dongjoon-hyun closed pull request #32830: URL: https://github.com/apache/spark/pull/32830
[GitHub] [spark] SparkQA commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
SparkQA commented on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-859044030 **[Test build #139658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139658/testReport)** for PR 32007 at commit [`e630725`](https://github.com/apache/spark/commit/e630725ca5c161cea62a2afcc7668a67a3e6d72e). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] MaxGekk closed pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
MaxGekk closed pull request #32869: URL: https://github.com/apache/spark/pull/32869
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649514612

## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ##
@@ -436,24 +485,48 @@ final class ShuffleBlockFetcherIterator(
     val iterator = blockInfos.iterator
     var curRequestSize = 0L
     var curBlocks = Seq.empty[FetchBlockInfo]
-
     while (iterator.hasNext) {
       val (blockId, size, mapIndex) = iterator.next()
-      assertPositiveBlockSize(blockId, size)
       curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
       curRequestSize += size
-      // For batch fetch, the actual block in flight should count for merged block.
-      val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
-      if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-        curBlocks = createFetchRequests(curBlocks, address, isLast = false,
-          collectedRemoteRequests)
-        curRequestSize = curBlocks.map(_.size).sum
+      blockId match {
+        // Either all blocks are merged blocks, merged block chunks, or original non-merged blocks.
+        // Based on these types, we decide to do batch fetch and create FetchRequests with
+        // forMergedMetas set.
+        case ShuffleBlockChunkId(_, _, _) =>
+          if (curRequestSize >= targetRemoteRequestSize ||
+            curBlocks.size >= maxBlocksInFlightPerAddress) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = false)
+            curRequestSize = curBlocks.map(_.size).sum
+          }
+        case ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _) =>
+          if (curBlocks.size >= maxBlocksInFlightPerAddress) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
+          }
+        case _ =>
+          // For batch fetch, the actual block in flight should count for merged block.
+          val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
+          if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = doBatchFetch)
+            curRequestSize = curBlocks.map(_.size).sum
+          }
       }
     }
     // Add in the final request
     if (curBlocks.nonEmpty) {
+      val (enableBatchFetch, areMergedBlocks) = {
+        curBlocks.head.blockId match {
+          case ShuffleBlockChunkId(_, _, _) => (false, false)
+          case ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _) => (false, true)
+          case _ => (doBatchFetch, false)
+        }
+      }
       curBlocks = createFetchRequests(curBlocks, address, isLast = true,
-        collectedRemoteRequests)
+        collectedRemoteRequests, enableBatchFetch = enableBatchFetch,
+        forMergedMetas = areMergedBlocks)
       curRequestSize = curBlocks.map(_.size).sum

Review comment: We do want the sum of the sizes of all the blocks in `curBlocks`, so I think the `sum` is needed.
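The diff decides when to cut a request by block kind: chunks flush on size or count, merged-meta blocks on count only, and regular blocks as before. The review reply hinges on `createFetchRequests` possibly handing some blocks back, so `curRequestSize` must become the sum of what is still pending rather than 0. The sketch below models that under stated assumptions. All types and names are hypothetical. `flush` is only an assumed stand-in for `createFetchRequests`, returning a trailing partial group unless `isLast` is set. Block merging (`mergeContinuousShuffleBlockIdsIfNeeded`) and `numBlocksToFetch` accounting are ignored.

```scala
object FetchBatchingSketch {
  // Hypothetical block kinds mirroring the three cases in the diff.
  sealed trait Kind
  case object Chunk extends Kind      // like ShuffleBlockChunkId
  case object MergedMeta extends Kind // like ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _)
  case object Regular extends Kind    // original, non-merged shuffle blocks

  final case class Block(id: String, size: Long, kind: Kind)
  final case class Req(blocks: Seq[Block], forMergedMetas: Boolean)

  /** Groups the blocks of one address into requests; assumes all blocks share one kind. */
  def collect(blocks: Seq[Block], targetSize: Long, maxBlocks: Int,
      doBatchFetch: Boolean): Seq[Req] = {
    val out = Seq.newBuilder[Req]
    var cur = Seq.empty[Block]
    var curSize = 0L

    // Assumed stand-in for createFetchRequests: emits full groups of `maxBlocks` and,
    // unless isLast is set, hands a trailing partial group back to the caller.
    def flush(bs: Seq[Block], isLast: Boolean): Seq[Block] = {
      val forMeta = bs.headOption.exists(_.kind == MergedMeta)
      if (bs.size <= maxBlocks) {
        out += Req(bs, forMeta)
        Seq.empty
      } else {
        val groups = bs.grouped(maxBlocks).toVector
        val (full, rest) =
          if (isLast || groups.last.size == maxBlocks) (groups, Seq.empty[Block])
          else (groups.init, groups.last)
        full.foreach(g => out += Req(g, forMeta))
        rest
      }
    }

    for (b <- blocks) {
      cur :+= b
      curSize += b.size
      val shouldFlush = b.kind match {
        case Chunk      => curSize >= targetSize || cur.size >= maxBlocks
        case MergedMeta => cur.size >= maxBlocks // bounded by count only
        case Regular    => curSize >= targetSize || (!doBatchFetch && cur.size >= maxBlocks)
      }
      if (shouldFlush) {
        cur = flush(cur, isLast = false)
        // Blocks handed back stay pending, so the running size is their sum, not 0.
        curSize = cur.map(_.size).sum
      }
    }
    if (cur.nonEmpty) flush(cur, isLast = true)
    out.result()
  }
}
```

With `doBatchFetch = true`, three 40-byte regular blocks against a 100-byte target with `maxBlocks = 2` produce one request of two blocks and one of a single block. The third block was handed back, and its 40 bytes stayed counted until the final flush.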
[GitHub] [spark] MaxGekk commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
MaxGekk commented on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-859039767 +1, LGTM. Merging to master. Thank you @gengliangwang, and @cloud-fan for your review.
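SPARK-35716 concerns casting a timestamp *without* time zone to a date. Such a timestamp is a local wall-clock value, so the cast plausibly needs no session time zone: it keeps the calendar date. The sketch below assumes the value is stored as microseconds since `1970-01-01T00:00` in local terms and that dates are days since the epoch. Both encodings are assumptions here, and this is not the PR's implementation. `Math.floorDiv` rather than `/` keeps pre-1970 values on the correct day.

```scala
import java.time.{LocalDate, LocalDateTime, ZoneOffset}

object NtzToDateSketch {
  private val MicrosPerDay: Long = 24L * 60 * 60 * 1000 * 1000

  // Assumed encoding: microseconds since 1970-01-01T00:00 of a zone-less timestamp.
  // floorDiv rounds toward negative infinity, so -1 microsecond maps to day -1.
  def microsToEpochDay(micros: Long): Int = Math.floorDiv(micros, MicrosPerDay).toInt

  // Cross-check through java.time. UTC is used purely as arithmetic scaffolding;
  // no real time zone conversion happens for a local timestamp.
  def viaJavaTime(micros: Long): LocalDate =
    LocalDateTime.ofEpochSecond(
      Math.floorDiv(micros, 1000000L),
      (Math.floorMod(micros, 1000000L) * 1000).toInt,
      ZoneOffset.UTC).toLocalDate
}
```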
[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
SparkQA commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859038318 **[Test build #139659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139659/testReport)** for PR 32849 at commit [`32fae30`](https://github.com/apache/spark/commit/32fae304389bbf5f3d9ae7cfb1a2496ae9e84a35).
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
AmplabJenkins removed a comment on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-859033409 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44186/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
AmplabJenkins removed a comment on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859033402 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44184/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based
AmplabJenkins removed a comment on pull request #32847: URL: https://github.com/apache/spark/pull/32847#issuecomment-859033406 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44185/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions
AmplabJenkins removed a comment on pull request #32470: URL: https://github.com/apache/spark/pull/32470#issuecomment-859033403 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139652/
[GitHub] [spark] AmplabJenkins commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`
AmplabJenkins commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859033402 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44184/
[GitHub] [spark] AmplabJenkins commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
AmplabJenkins commented on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-859033409 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44186/
[GitHub] [spark] AmplabJenkins commented on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based
AmplabJenkins commented on pull request #32847: URL: https://github.com/apache/spark/pull/32847#issuecomment-859033406 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44185/
[GitHub] [spark] AmplabJenkins commented on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions
AmplabJenkins commented on pull request #32470: URL: https://github.com/apache/spark/pull/32470#issuecomment-859033403 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139652/
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649505327 ## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ## @@ -347,77 +355,118 @@ final class ShuffleBlockFetcherIterator( } } - private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = { + /** + * This is called from initialize and also from the fallback which is triggered from + * [[PushBasedFetchHelper]]. + */ + private[this] def partitionBlocksByFetchMode( + blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], + localBlocks: mutable.LinkedHashSet[(BlockId, Int)], + hostLocalBlocksByExecutor: mutable.LinkedHashMap[BlockManagerId, Seq[(BlockId, Long, Int)]], + mergedLocalBlocks: mutable.LinkedHashSet[BlockId]): ArrayBuffer[FetchRequest] = { logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: " + s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress: $maxBlocksInFlightPerAddress") -// Partition to local, host-local and remote blocks. Remote blocks are further split into -// FetchRequests of size at most maxBytesInFlight in order to limit the amount of data in flight +// Partition to local, host-local, merged-local, remote (includes merged-remote) blocks. 
+// Remote blocks are further split into FetchRequests of size at most maxBytesInFlight in order +// to limit the amount of data in flight val collectedRemoteRequests = new ArrayBuffer[FetchRequest] +val hostLocalBlocksCurrentIteration = mutable.LinkedHashSet[(BlockId, Int)]() var localBlockBytes = 0L var hostLocalBlockBytes = 0L +var mergedLocalBlockBytes = 0L var remoteBlockBytes = 0L +val prevNumBlocksToFetch = numBlocksToFetch val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId for ((address, blockInfos) <- blocksByAddress) { - if (Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) { -checkBlockSizes(blockInfos) + checkBlockSizes(blockInfos) + if (pushBasedFetchHelper.isMergedShuffleBlockAddress(address)) { +// These are push-based merged blocks or chunks of these merged blocks. +if (address.host == blockManager.blockManagerId.host) { + val pushMergedBlockInfos = blockInfos.map( +info => FetchBlockInfo(info._1, info._2, info._3)) + numBlocksToFetch += pushMergedBlockInfos.size + mergedLocalBlocks ++= pushMergedBlockInfos.map(info => info.blockId) + val size = pushMergedBlockInfos.map(_.size).sum + logInfo(s"Got ${pushMergedBlockInfos.size} local merged blocks " + +s"of size $size") + mergedLocalBlockBytes += size +} else { + remoteBlockBytes += blockInfos.map(_._2).sum + collectFetchRequests(address, blockInfos, collectedRemoteRequests) +} + } else if ( +Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) { val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded( blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch) numBlocksToFetch += mergedBlockInfos.size localBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex)) localBlockBytes += mergedBlockInfos.map(_.size).sum } else if (blockManager.hostLocalDirManager.isDefined && address.host == blockManager.blockManagerId.host) { -checkBlockSizes(blockInfos) val mergedBlockInfos = 
mergeContinuousShuffleBlockIdsIfNeeded( blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch) numBlocksToFetch += mergedBlockInfos.size val blocksForAddress = mergedBlockInfos.map(info => (info.blockId, info.size, info.mapIndex)) hostLocalBlocksByExecutor += address -> blocksForAddress -hostLocalBlocks ++= blocksForAddress.map(info => (info._1, info._3)) +hostLocalBlocksCurrentIteration ++= blocksForAddress.map(info => (info._1, info._3)) hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum } else { remoteBlockBytes += blockInfos.map(_._2).sum collectFetchRequests(address, blockInfos, collectedRemoteRequests) } } val numRemoteBlocks = collectedRemoteRequests.map(_.blocks.size).sum -val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes -assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size + numRemoteBlocks, - s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the number of local " + -s"blocks ${localBlocks.size} + the number of host-local blocks ${hostLocalBlocks.size} " + -s"+ the number of remote blocks ${numRemoteBlocks}.") -logInfo(s"Getting $numBlocksToFetch
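The reworked `partitionBlocksByFetchMode` sorts each address into merged-local, local, host-local, or remote (which now includes merged-remote). It checks merged addresses first and keeps the invariant that the number of blocks to fetch equals the per-category totals. The classifier below mirrors only that branch order. `Addr`, the category objects, and the two executor-id constants are hypothetical stand-ins. The real values of `SHUFFLE_MERGER_IDENTIFIER` and the fallback id are not shown in the diff.

```scala
object PartitionSketch {
  // Hypothetical stand-ins; the real identifier values are not shown in the thread.
  val MergerExecutorId = "merger"
  val FallbackExecutorId = "fallback"

  final case class Addr(executorId: String, host: String)

  sealed trait Category
  case object MergedLocal extends Category
  case object Local extends Category
  case object HostLocal extends Category
  case object Remote extends Category // includes merged-remote

  // Same branch order as the diff: merged addresses first, then own executor or
  // fallback storage, then host-local (only if enabled), otherwise remote.
  def classify(addr: Addr, self: Addr, hostLocalEnabled: Boolean): Category =
    if (addr.executorId == MergerExecutorId) {
      if (addr.host == self.host) MergedLocal else Remote
    } else if (addr.executorId == self.executorId || addr.executorId == FallbackExecutorId) {
      Local
    } else if (hostLocalEnabled && addr.host == self.host) {
      HostLocal
    } else {
      Remote
    }

  /** Sums block counts per category; the totals must add up to the overall count. */
  def countByCategory(blocks: Seq[(Addr, Int)], self: Addr,
      hostLocal: Boolean): Map[Category, Int] =
    blocks.groupBy { case (a, _) => classify(a, self, hostLocal) }
      .map { case (c, xs) => c -> xs.map(_._2).sum }
}
```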
[GitHub] [spark] MaxGekk commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
MaxGekk commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859028904 @cloud-fan I have addressed all comments except for opening a JIRA sub-task. I will do that as soon as all tests pass.
[GitHub] [spark] MaxGekk commented on a change in pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
MaxGekk commented on a change in pull request #32849: URL: https://github.com/apache/spark/pull/32849#discussion_r649502078

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala ##
@@ -405,8 +405,9 @@ case class MultiplyDTInterval(
   override def left: Expression = interval
   override def right: Expression = num
-  override def inputTypes: Seq[AbstractDataType] = Seq(DayTimeIntervalType, NumericType)
-  override def dataType: DataType = DayTimeIntervalType
+  // TODO(SPARK-X): Multiply day-time intervals with any fields
+  override def inputTypes: Seq[AbstractDataType] = Seq(DayTimeIntervalType(), NumericType)

Review comment: done
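In this diff, the singleton `DayTimeIntervalType` becomes a constructor call, `DayTimeIntervalType()`. That suggests the type now carries start and end fields, with the no-argument form standing for the widest range. The model below illustrates that idea only. `DTType`, the byte codes, and the SQL names are hypothetical and are not the API introduced by the (WIP) PR. Fields are ordered from coarsest to finest, and the start field may not be finer than the end field.

```scala
object IntervalFieldsSketch {
  // Hypothetical field codes, ordered from the coarsest to the finest unit.
  val DAY: Byte = 0
  val HOUR: Byte = 1
  val MINUTE: Byte = 2
  val SECOND: Byte = 3
  private val names = Map(DAY -> "day", HOUR -> "hour", MINUTE -> "minute", SECOND -> "second")

  final case class DTType(startField: Byte, endField: Byte) {
    require(names.contains(startField) && names.contains(endField), "unknown field")
    require(startField <= endField, "start field must not be finer than end field")

    def sqlName: String =
      if (startField == endField) s"interval ${names(startField)}"
      else s"interval ${names(startField)} to ${names(endField)}"
  }

  object DTType {
    // The no-argument form covers the full DAY TO SECOND range.
    def apply(): DTType = DTType(DAY, SECOND)
  }
}
```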
[GitHub] [spark] MaxGekk commented on a change in pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
MaxGekk commented on a change in pull request #32849: URL: https://github.com/apache/spark/pull/32849#discussion_r649497221 ## File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java ## @@ -21,10 +21,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.*; Review comment: done
[GitHub] [spark] SparkQA removed a comment on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions
SparkQA removed a comment on pull request #32470: URL: https://github.com/apache/spark/pull/32470#issuecomment-858777838 **[Test build #139652 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139652/testReport)** for PR 32470 at commit [`1e973ef`](https://github.com/apache/spark/commit/1e973ef66b445e3d70df92a52b39e99f7c491151).
[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
SparkQA commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-859006068 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44184/
[GitHub] [spark] SparkQA commented on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions
SparkQA commented on pull request #32470: URL: https://github.com/apache/spark/pull/32470#issuecomment-859005261 **[Test build #139652 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139652/testReport)** for PR 32470 at commit [`1e973ef`](https://github.com/apache/spark/commit/1e973ef66b445e3d70df92a52b39e99f7c491151). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based
SparkQA commented on pull request #32847: URL: https://github.com/apache/spark/pull/32847#issuecomment-859003646 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44185/
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649482176 ## File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import org.roaringbitmap.RoaringBitmap + +import org.apache.spark.MapOutputTracker +import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID +import org.apache.spark.internal.Logging +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} +import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER +import org.apache.spark.storage.ShuffleBlockFetcherIterator._ + +/** + * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based + * functionality to fetch merged block meta and merged shuffle block chunks. 
+ */ +private class PushBasedFetchHelper( + private val iterator: ShuffleBlockFetcherIterator, + private val shuffleClient: BlockStoreClient, + private val blockManager: BlockManager, + private val mapOutputTracker: MapOutputTracker) extends Logging { + + private[this] val startTimeNs = System.nanoTime() + + private[this] val localShuffleMergerBlockMgrId = BlockManagerId( +SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host, +blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo) + + /** A map for storing merged block shuffle chunk bitmap */ + private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]() + + /** + * Returns true if the address is for a push-merged block. + */ + def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = { +SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId) + } + + /** + * Returns true if the address is not of executor local or merged local block. false otherwise. + */ + def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = { +(isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) || + (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId) + } + + /** + * Returns true if the address if of merged local block. false otherwise. 
+ */ + def isMergedLocal(address: BlockManagerId): Boolean = { +isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host + } + + def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = { +chunksMetaMap(blockId).getCardinality + } + + def removeChunk(blockId: ShuffleBlockChunkId): Unit = { +chunksMetaMap.remove(blockId) + } + + def createChunkBlockInfosFromMetaResponse( + shuffleId: Int, + reduceId: Int, + blockSize: Long, + numChunks: Int, +bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = { +val approxChunkSize = blockSize / numChunks +val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]() +for (i <- 0 until numChunks) { + val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i) + chunksMetaMap.put(blockChunkId, bitmaps(i)) + logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize") + blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID)) +} +blocksToFetch + } + + def sendFetchMergedStatusRequest(req: FetchRequest): Unit = { +val sizeMap = req.blocks.map { + case FetchBlockInfo(blockId, size, _) => +val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] +((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap +val address = req.address +val mergedBlocksMetaListener = new MergedBlocksMetaListener { + override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = { +logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId) " + + s"from ${req.address.host}:${req.address.port}") +
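`createChunkBlockInfosFromMetaResponse` splits one merged block into `numChunks` fetchable chunks. Each chunk gets the approximate size `blockSize / numChunks` (integer division), and the helper stores a bitmap of the map outputs each chunk contains. `getNumberOfBlocksInChunk` then returns that bitmap's cardinality. The sketch reproduces the bookkeeping under stated substitutions. Scala's immutable `BitSet` stands in for `RoaringBitmap`, `ChunkId` and `PushMapId` are hypothetical stand-ins for `ShuffleBlockChunkId` and `SHUFFLE_PUSH_MAP_ID`, and `numChunks` is derived from the bitmap array instead of being passed separately.

```scala
import scala.collection.immutable.BitSet
import scala.collection.mutable

object ChunkMetaSketch {
  final case class ChunkId(shuffleId: Int, reduceId: Int, chunkId: Int) // hypothetical
  val PushMapId: Int = -1 // hypothetical stand-in for SHUFFLE_PUSH_MAP_ID

  // Chunk -> set of map indices merged into that chunk.
  val chunksMeta = mutable.HashMap.empty[ChunkId, BitSet]

  def createChunkInfos(shuffleId: Int, reduceId: Int, blockSize: Long,
      bitmaps: Array[BitSet]): Seq[(ChunkId, Long, Int)] = {
    require(bitmaps.nonEmpty, "a merged block has at least one chunk")
    val numChunks = bitmaps.length
    // Integer division: only an estimate, used for sizing fetch requests.
    val approxChunkSize = blockSize / numChunks
    (0 until numChunks).map { i =>
      val id = ChunkId(shuffleId, reduceId, i)
      chunksMeta.put(id, bitmaps(i))
      (id, approxChunkSize, PushMapId)
    }
  }

  // Number of original map outputs contained in a chunk (bitmap cardinality).
  def blocksInChunk(id: ChunkId): Int = chunksMeta(id).size
}
```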
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649479840 ## File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import org.roaringbitmap.RoaringBitmap + +import org.apache.spark.MapOutputTracker +import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID +import org.apache.spark.internal.Logging +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} +import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER +import org.apache.spark.storage.ShuffleBlockFetcherIterator._ + +/** + * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based + * functionality to fetch merged block meta and merged shuffle block chunks. 
+ */ +private class PushBasedFetchHelper( + private val iterator: ShuffleBlockFetcherIterator, + private val shuffleClient: BlockStoreClient, + private val blockManager: BlockManager, + private val mapOutputTracker: MapOutputTracker) extends Logging { + + private[this] val startTimeNs = System.nanoTime() + + private[this] val localShuffleMergerBlockMgrId = BlockManagerId( +SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host, +blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo) + + /** A map for storing merged block shuffle chunk bitmap */ + private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]() Review comment: Same as above. It is always accessed by the task thread.
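The reply argues that `chunksMetaMap`, a plain `mutable.HashMap`, needs no locking because only the task thread touches it. A common way to get that guarantee is thread confinement: network callbacks only publish results to a thread-safe queue, and the consuming thread is the sole reader and writer of the unsynchronized state. The sketch below shows that pattern with a `LinkedBlockingQueue`. `Result` and the simulated callbacks are hypothetical, and this is not the iterator's actual code.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

object ConfinementSketch {
  final case class Result(chunkId: Int, numBlocks: Int) // hypothetical callback payload

  /** Callback threads only enqueue; the calling ("task") thread owns `meta`. */
  def run(numCallbacks: Int): Map[Int, Int] = {
    val results = new LinkedBlockingQueue[Result]()
    val pool = Executors.newFixedThreadPool(4)
    (0 until numCallbacks).foreach { i =>
      // Simulated network callbacks: they never touch `meta` directly.
      pool.execute(() => results.put(Result(i, i % 3 + 1)))
    }
    // Not thread-safe by itself, but only this thread ever reads or writes it.
    val meta = mutable.HashMap.empty[Int, Int]
    var received = 0
    while (received < numCallbacks) {
      val r = results.take() // blocks until some callback has published a result
      meta.put(r.chunkId, r.numBlocks)
      received += 1
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    meta.toMap
  }
}
```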
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649479384 ## File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import org.roaringbitmap.RoaringBitmap + +import org.apache.spark.MapOutputTracker +import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID +import org.apache.spark.internal.Logging +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} +import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER +import org.apache.spark.storage.ShuffleBlockFetcherIterator._ + +/** + * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based + * functionality to fetch merged block meta and merged shuffle block chunks. 
+ */ +private class PushBasedFetchHelper( + private val iterator: ShuffleBlockFetcherIterator, + private val shuffleClient: BlockStoreClient, + private val blockManager: BlockManager, + private val mapOutputTracker: MapOutputTracker) extends Logging { + + private[this] val startTimeNs = System.nanoTime() + + private[this] val localShuffleMergerBlockMgrId = BlockManagerId( +SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host, +blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo) + + /** A map for storing merged block shuffle chunk bitmap */ + private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]() + + /** + * Returns true if the address is for a push-merged block. + */ + def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = { +SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId) + } + + /** + * Returns true if the address is not of executor local or merged local block. false otherwise. + */ + def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = { +(isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) || + (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId) + } + + /** + * Returns true if the address if of merged local block. false otherwise. 
+ */ + def isMergedLocal(address: BlockManagerId): Boolean = { +isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host + } + + def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = { +chunksMetaMap(blockId).getCardinality + } + + def removeChunk(blockId: ShuffleBlockChunkId): Unit = { +chunksMetaMap.remove(blockId) + } + + def createChunkBlockInfosFromMetaResponse( + shuffleId: Int, + reduceId: Int, + blockSize: Long, + numChunks: Int, +bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = { +val approxChunkSize = blockSize / numChunks +val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]() +for (i <- 0 until numChunks) { + val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i) + chunksMetaMap.put(blockChunkId, bitmaps(i)) + logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize") + blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID)) +} +blocksToFetch + } + + def sendFetchMergedStatusRequest(req: FetchRequest): Unit = { +val sizeMap = req.blocks.map { + case FetchBlockInfo(blockId, size, _) => +val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] +((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap +val address = req.address +val mergedBlocksMetaListener = new MergedBlocksMetaListener { + override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = { +logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId) " + + s"from ${req.address.host}:${req.address.port}") +
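The chunk bookkeeping in `createChunkBlockInfosFromMetaResponse`, `getNumberOfBlocksInChunk` and `removeChunk` can be isolated as a small sketch. It assumes hypothetical stand-ins: `ChunkId` replaces `ShuffleBlockChunkId`, and a `Set[Int]` replaces `RoaringBitmap`. The `SHUFFLE_PUSH_MAP_ID` element of each tuple is dropped. It only shows that every chunk is assigned the same approximate size and that the per-chunk block count is the bitmap's cardinality. It is not the PR's implementation.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's ShuffleBlockChunkId.
final case class ChunkId(shuffleId: Int, reduceId: Int, chunkId: Int)

object ChunkSplitSketch {
  // Plays the role of chunksMetaMap; a Set[Int] of map indices stands in for RoaringBitmap.
  val chunksMeta: mutable.HashMap[ChunkId, Set[Int]] = mutable.HashMap.empty[ChunkId, Set[Int]]

  // Same shape as createChunkBlockInfosFromMetaResponse, minus the SHUFFLE_PUSH_MAP_ID
  // field: every chunk gets the same approximate size, blockSize / numChunks.
  // This is integer division, so remainder bytes are not attributed to any chunk,
  // and numChunks == 0 would throw ArithmeticException.
  def createChunkInfos(
      shuffleId: Int,
      reduceId: Int,
      blockSize: Long,
      numChunks: Int,
      bitmaps: Array[Set[Int]]): ArrayBuffer[(ChunkId, Long)] = {
    val approxChunkSize = blockSize / numChunks
    val blocksToFetch = ArrayBuffer.empty[(ChunkId, Long)]
    for (i <- 0 until numChunks) {
      val id = ChunkId(shuffleId, reduceId, i)
      chunksMeta.put(id, bitmaps(i))
      blocksToFetch += ((id, approxChunkSize))
    }
    blocksToFetch
  }

  // Counterpart of getNumberOfBlocksInChunk: the cardinality of the chunk's bitmap.
  def numberOfBlocksInChunk(id: ChunkId): Int = chunksMeta(id).size

  // Counterpart of removeChunk.
  def removeChunk(id: ChunkId): Unit = chunksMeta.remove(id)
}
```

For example, a 1000-byte merged block split into 3 chunks yields three entries of 333 bytes each. The per-chunk count is the number of map outputs recorded in that chunk's bitmap.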
[GitHub] [spark] SparkQA commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
SparkQA commented on pull request #32007: URL: https://github.com/apache/spark/pull/32007#issuecomment-858977861 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44186/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649472914 ## File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import org.roaringbitmap.RoaringBitmap + +import org.apache.spark.MapOutputTracker +import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID +import org.apache.spark.internal.Logging +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} +import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER +import org.apache.spark.storage.ShuffleBlockFetcherIterator._ + +/** + * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based + * functionality to fetch merged block meta and merged shuffle block chunks. 
+ */ +private class PushBasedFetchHelper( + private val iterator: ShuffleBlockFetcherIterator, + private val shuffleClient: BlockStoreClient, + private val blockManager: BlockManager, + private val mapOutputTracker: MapOutputTracker) extends Logging { + + private[this] val startTimeNs = System.nanoTime() + + private[this] val localShuffleMergerBlockMgrId = BlockManagerId( +SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host, +blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo) + + /** A map for storing merged block shuffle chunk bitmap */ + private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]() + + /** + * Returns true if the address is for a push-merged block. + */ + def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = { +SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId) + } + + /** + * Returns true if the address is not of executor local or merged local block. false otherwise. + */ + def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = { +(isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) || + (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId) + } + + /** + * Returns true if the address if of merged local block. false otherwise. 
+ */ + def isMergedLocal(address: BlockManagerId): Boolean = { +isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host + } + + def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = { +chunksMetaMap(blockId).getCardinality + } + + def removeChunk(blockId: ShuffleBlockChunkId): Unit = { +chunksMetaMap.remove(blockId) + } + + def createChunkBlockInfosFromMetaResponse( + shuffleId: Int, + reduceId: Int, + blockSize: Long, + numChunks: Int, +bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = { +val approxChunkSize = blockSize / numChunks +val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]() +for (i <- 0 until numChunks) { + val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i) + chunksMetaMap.put(blockChunkId, bitmaps(i)) + logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize") + blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID)) +} +blocksToFetch + } + + def sendFetchMergedStatusRequest(req: FetchRequest): Unit = { +val sizeMap = req.blocks.map { + case FetchBlockInfo(blockId, size, _) => +val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] +((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap +val address = req.address +val mergedBlocksMetaListener = new MergedBlocksMetaListener { + override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = { +logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId) " + + s"from ${req.address.host}:${req.address.port}") +
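The three address predicates in the quoted helper (`isMergedShuffleBlockAddress`, `isMergedLocal`, `isNotExecutorOrMergedLocal`) can be checked with a small truth-table sketch. Here `Addr` is a hypothetical simplification of `BlockManagerId`. `MergerId` is an assumed placeholder for `SHUFFLE_MERGER_IDENTIFIER`, whose value does not appear in the diff. Note that the merged-local test compares only the host, while the executor-local test compares the whole address.

```scala
// Hypothetical simplification of BlockManagerId: only the fields the predicates read.
final case class Addr(executorId: String, host: String, port: Int)

object AddressSketch {
  // Assumed placeholder for SHUFFLE_MERGER_IDENTIFIER (value not shown in the diff).
  val MergerId = "shuffle-push-merger"

  // Counterpart of isMergedShuffleBlockAddress.
  def isMerged(a: Addr): Boolean = a.executorId == MergerId

  // Counterpart of isMergedLocal: a merged block on this executor's host.
  def isMergedLocal(a: Addr, local: Addr): Boolean =
    isMerged(a) && a.host == local.host

  // Same shape as isNotExecutorOrMergedLocal: a merged block on another host,
  // or a regular block from any other executor.
  def isNotExecutorOrMergedLocal(a: Addr, local: Addr): Boolean =
    (isMerged(a) && a.host != local.host) || (!isMerged(a) && a != local)
}
```

As long as the local executor's id is never the merger identifier, the last predicate should equal `!(isMergedLocal(a, local) || a == local)`. The test cases below exercise exactly that equivalence.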
[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
SparkQA commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-858965222 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44184/
[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark
viirya commented on a change in pull request #32473: URL: https://github.com/apache/spark/pull/32473#discussion_r649471892 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BloomFilterBenchmark.scala ## @@ -81,8 +85,113 @@ object BloomFilterBenchmark extends SqlBasedBenchmark { } } + private def readORCBenchmarkForInSet(): Unit = { +withTempPath { dir => + val path = dir.getCanonicalPath + val samples = df2.sample(0.03, 128).select("value").as[String].collect() + val filter = "value IN (" + samples.map ( x => s"'$x'").mkString(", ") + ")" + + df2.repartition(col("value")).sort(col("value")).write.orc(path + "/withoutBF") + df2.repartition(col("value")).sort(col("value")) +.write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF") + + runBenchmark(s"ORC Read for IN set") { +val benchmark = new Benchmark(s"Read a row from 1M rows", 1000 * 1000, output = output) Review comment: ditto.
[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark
viirya commented on a change in pull request #32473: URL: https://github.com/apache/spark/pull/32473#discussion_r649471780 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BloomFilterBenchmark.scala ## @@ -81,8 +85,113 @@ object BloomFilterBenchmark extends SqlBasedBenchmark { } } + private def readORCBenchmarkForInSet(): Unit = { +withTempPath { dir => + val path = dir.getCanonicalPath + val samples = df2.sample(0.03, 128).select("value").as[String].collect() + val filter = "value IN (" + samples.map ( x => s"'$x'").mkString(", ") + ")" + + df2.repartition(col("value")).sort(col("value")).write.orc(path + "/withoutBF") + df2.repartition(col("value")).sort(col("value")) +.write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF") + + runBenchmark(s"ORC Read for IN set") { Review comment: nit: s"" is not needed.
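The two review notes on `readORCBenchmarkForInSet` involve plain string building, sketched below. The `column` parameter is a hypothetical generalisation; the benchmark hard-codes `value`. The filter is built the same way as in the diff, with each sample wrapped in single quotes and joined with `", "`. The sketch also shows why the `s""` prefix is redundant: without `$` placeholders, the interpolated string equals the plain literal.

```scala
object InSetFilterSketch {
  // Same construction as the benchmark's `filter`, with the column name parameterised
  // (hypothetical). No escaping is done, so a sample containing a single quote would
  // produce an invalid predicate.
  def inSetFilter(column: String, samples: Seq[String]): String =
    s"$column IN (" + samples.map(x => s"'$x'").mkString(", ") + ")"

  // The review nit: an s-interpolator with no placeholders builds the same string
  // as the plain literal, so the prefix can be dropped.
  val withInterpolator: String = s"ORC Read for IN set"
  val plainLiteral: String = "ORC Read for IN set"
}
```

For instance, `inSetFilter("value", Seq("a", "b"))` yields `value IN ('a', 'b')`.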
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
AmplabJenkins removed a comment on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-858958878 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139643/
[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark
viirya commented on a change in pull request #32473: URL: https://github.com/apache/spark/pull/32473#discussion_r649470905 ## File path: sql/core/benchmarks/BloomFilterBenchmark-jdk11-results.txt ## @@ -2,23 +2,179 @@ ORC Write -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz Write 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -Without bloom filter 19503 19621 166 5.1 195.0 1.0X -With bloom filter 22472 22710 335 4.4 224.7 0.9X +Without bloom filter 13568 13645 109 7.4 135.7 1.0X +With bloom filter 16116 16238 172 6.2 161.2 0.8X ORC Read -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz Read a row from 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -Without bloom filter 1981 2040 82 50.5 19.8 1.0X -With bloom filter 1428 1467 54 70.0 14.3 1.4X +Without bloom filter 1572 1605 47 63.6 15.7 1.0X +With bloom filter 1343 1359 23 74.5 13.4 1.2X + + + +ORC Read for IN set + + +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +Read a row from 1M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + +Without bloom filter 51 63 15 19.6 51.1 1.0X +With bloom filter 54 88 23 18.5 54.0 0.9X + + + +Parquet Write + + +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +Write 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + +Without bloom filter 13679 13954 389 7.3 136.8 1.0X +With bloom filter 18260 18284 33 5.5 182.6 0.7X + + + +Parquet Read + + +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +Read a row from 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + +Without bloom filter, blocksize: 2097152 954 984 49 104.8 9.5 1.0X +With bloom filter, blocksize: 2097152 285 307 21 350.4 2.9 3.3X + +
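The derived columns in these result tables appear to follow from the best time and the row count. Rate(M/s) is rows divided by best time, Per Row(ns) is best time divided by rows, and Relative is the baseline's best time divided by each case's best time. The sketch below recomputes some of the new figures under that assumption; it does not reproduce Spark's `Benchmark` utility. Because the table shows rounded millisecond times, a recomputed rate can differ from the printed one in the last digit for some rows.

```scala
import java.util.Locale

object BenchmarkColumnsSketch {
  // Rate(M/s): millions of rows processed per second at the best time (assumed definition).
  def rateMPerSec(rows: Long, bestMs: Double): Double = rows / (bestMs / 1000.0) / 1e6

  // Per Row(ns): best time spread over every row (assumed definition).
  def perRowNs(rows: Long, bestMs: Double): Double = bestMs * 1e6 / rows

  // Relative: baseline (first case) best time divided by this case's best time.
  def relative(baselineBestMs: Double, bestMs: Double): Double = baselineBestMs / bestMs

  // One decimal place with a fixed locale, so the separator is always '.'.
  def fmt(x: Double): String = "%.1f".formatLocal(Locale.ROOT, x)
}
```

With 100M rows, the new ORC write figures (13568 ms and 16116 ms) give 7.4 M/s and 135.7 ns, 6.2 M/s and 161.2 ns, and a relative of 0.8X. The Parquet read best times (954 ms and 285 ms) give 3.3X.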
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier
AmplabJenkins removed a comment on pull request #32854: URL: https://github.com/apache/spark/pull/32854#issuecomment-858958876 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44179/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
AmplabJenkins removed a comment on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-858958880 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44181/
[GitHub] [spark] AmplabJenkins removed a comment on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree
AmplabJenkins removed a comment on pull request #32862: URL: https://github.com/apache/spark/pull/32862#issuecomment-858958887 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44182/
[GitHub] [spark] AmplabJenkins commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
AmplabJenkins commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-858958878 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139643/
[GitHub] [spark] AmplabJenkins commented on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier
AmplabJenkins commented on pull request #32854: URL: https://github.com/apache/spark/pull/32854#issuecomment-858958876 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44179/
[GitHub] [spark] AmplabJenkins commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree
AmplabJenkins commented on pull request #32862: URL: https://github.com/apache/spark/pull/32862#issuecomment-858958887 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44182/
[GitHub] [spark] AmplabJenkins commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
AmplabJenkins commented on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-858958880 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44181/
[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark
viirya commented on a change in pull request #32473: URL: https://github.com/apache/spark/pull/32473#discussion_r649470402 ## File path: sql/core/benchmarks/BloomFilterBenchmark-jdk11-results.txt ## @@ -2,23 +2,179 @@ ORC Write -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz Write 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -Without bloom filter 19503 19621 166 5.1 195.0 1.0X -With bloom filter 22472 22710 335 4.4 224.7 0.9X +Without bloom filter 13568 13645 109 7.4 135.7 1.0X +With bloom filter 16116 16238 172 6.2 161.2 0.8X ORC Read -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz Read a row from 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -Without bloom filter 1981 2040 82 50.5 19.8 1.0X -With bloom filter 1428 1467 54 70.0 14.3 1.4X +Without bloom filter 1572 1605 47 63.6 15.7 1.0X +With bloom filter 1343 1359 23 74.5 13.4 1.2X + + + +ORC Read for IN set + + +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +Read a row from 1M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + +Without bloom filter 51 63 15 19.6 51.1 1.0X +With bloom filter 54 88 23 18.5 54.0 0.9X + + + +Parquet Write + + +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +Write 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + +Without bloom filter 13679 13954 389 7.3 136.8 1.0X +With bloom filter 18260 18284 33 5.5 182.6 0.7X + + + +Parquet Read + + +OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +Read a row from 100M rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + +Without bloom filter, blocksize: 2097152 954 984 49 104.8 9.5 1.0X +With bloom filter, blocksize: 2097152 285 307 21 350.4 2.9 3.3X + +
[GitHub] [spark] SparkQA commented on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based
SparkQA commented on pull request #32847: URL: https://github.com/apache/spark/pull/32847#issuecomment-858957221 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44185/
[GitHub] [spark] viirya commented on pull request #32702: [SPARK-35565][SS] Add config for ignoring metadata directory of FileStreamSink
viirya commented on pull request #32702: URL: https://github.com/apache/spark/pull/32702#issuecomment-858953384 @HeartSaVioR @xuanyuanking Can we move forward with this?
[GitHub] [spark] SparkQA commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
SparkQA commented on pull request #32869: URL: https://github.com/apache/spark/pull/32869#issuecomment-858946087 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44181/
[GitHub] [spark] SparkQA removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
SparkQA removed a comment on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-858674648 **[Test build #139643 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139643/testReport)** for PR 32849 at commit [`7e502b5`](https://github.com/apache/spark/commit/7e502b56afcf6bc3238fae037323e0f0ab78edbf).
[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r649457152 ## File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import org.roaringbitmap.RoaringBitmap + +import org.apache.spark.MapOutputTracker +import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID +import org.apache.spark.internal.Logging +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} +import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER +import org.apache.spark.storage.ShuffleBlockFetcherIterator._ + +/** + * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based + * functionality to fetch merged block meta and merged shuffle block chunks. 
+ */ +private class PushBasedFetchHelper( + private val iterator: ShuffleBlockFetcherIterator, + private val shuffleClient: BlockStoreClient, + private val blockManager: BlockManager, + private val mapOutputTracker: MapOutputTracker) extends Logging { + + private[this] val startTimeNs = System.nanoTime() + + private[this] val localShuffleMergerBlockMgrId = BlockManagerId( +SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host, +blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo) + + /** A map for storing merged block shuffle chunk bitmap */ + private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]() + + /** + * Returns true if the address is for a push-merged block. + */ + def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = { +SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId) + } + + /** + * Returns true if the address is not of executor local or merged local block. false otherwise. + */ + def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = { +(isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) || + (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId) + } + + /** + * Returns true if the address if of merged local block. false otherwise. 
+ */ + def isMergedLocal(address: BlockManagerId): Boolean = { +isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host + } + + def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = { +chunksMetaMap(blockId).getCardinality + } + + def removeChunk(blockId: ShuffleBlockChunkId): Unit = { +chunksMetaMap.remove(blockId) + } + + def createChunkBlockInfosFromMetaResponse( + shuffleId: Int, + reduceId: Int, + blockSize: Long, + numChunks: Int, +bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = { +val approxChunkSize = blockSize / numChunks +val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]() +for (i <- 0 until numChunks) { + val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i) + chunksMetaMap.put(blockChunkId, bitmaps(i)) + logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize") + blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID)) +} +blocksToFetch + } + + def sendFetchMergedStatusRequest(req: FetchRequest): Unit = { +val sizeMap = req.blocks.map { + case FetchBlockInfo(blockId, size, _) => +val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] +((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap +val address = req.address +val mergedBlocksMetaListener = new MergedBlocksMetaListener { + override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = { +logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId) " + + s"from ${req.address.host}:${req.address.port}") +
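The `sizeMap` at the start of `sendFetchMergedStatusRequest` keys each requested block's size by `(shuffleId, reduceId)`. That is the same pair of ids that `MergedBlocksMetaListener.onSuccess` receives, so the listener can presumably use it to look the size back up. The rest of the method is truncated here. The sketch below uses hypothetical stand-ins `ShuffleBlock` and `FetchInfo` in place of `ShuffleBlockId` and `FetchBlockInfo`. Because the block type is concrete, no `asInstanceOf` cast is needed.

```scala
// Hypothetical stand-ins for Spark's ShuffleBlockId and FetchBlockInfo.
final case class ShuffleBlock(shuffleId: Int, mapId: Long, reduceId: Int)
final case class FetchInfo(blockId: ShuffleBlock, size: Long, mapIndex: Int)

object SizeMapSketch {
  // Mirrors the quoted sizeMap: key each requested block's size by (shuffleId, reduceId).
  // If two blocks shared a key, toMap would keep the last one.
  def sizeMap(blocks: Seq[FetchInfo]): Map[(Int, Int), Long] =
    blocks.map { case FetchInfo(id, size, _) => ((id.shuffleId, id.reduceId), size) }.toMap
}
```

A callback that receives only `shuffleId` and `reduceId` can then call `sizeMap((shuffleId, reduceId))` to recover the size it originally requested.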
[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type
SparkQA commented on pull request #32849: URL: https://github.com/apache/spark/pull/32849#issuecomment-858940273 **[Test build #139643 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139643/testReport)** for PR 32849 at commit [`7e502b5`](https://github.com/apache/spark/commit/7e502b56afcf6bc3238fae037323e0f0ab78edbf). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree
SparkQA commented on pull request #32862: URL: https://github.com/apache/spark/pull/32862#issuecomment-858936148 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44182/